#-*- coding: utf-8 -*- # Copyright 2010 Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from cl_utils import typeFile, process, listDirectory import gzip import re import os from os import path from os import access,R_OK from subprocess import PIPE from shutil import rmtree,copy2 class KernelConfig: """Object for work with kernel config Example: # get current config kc = KernelConfig() # get specified config kc = KernelConfig(configFile="/boot/config-2.6.34.1-i686-CLD") """ def __init__(self,configFile=None): if configFile == None: configFile = '/proc/config.gz' self.config = self._processConfig(configFile) def _processConfig(self,configFile): fileType = typeFile().getMType(configFile).partition(';')[0].strip() if fileType == "application/x-gzip": config = gzip.open(configFile) else: config = open(configFile,'r') reEmptyString = re.compile("^(#\s*|\s*)$") # discard empty strings data = filter(lambda x:not reEmptyString.match(x),config.readlines()) # convert param names data = map(lambda x:x.strip().replace("=y","=Y" ).replace("=m","=M" ).replace(" is not set","=N"), data) # fix data data = ["# Root"]+map(lambda x: x[2:] if "=N" in x else x,data)[3:] config = [] curSect = None for line in data: if line.startswith('#'): curSect = line[2:] else: config.append([curSect]+line.split('=')) return config def getSectionParams(self,*regs): """Get params from sections specified by regular Example: params = kernelconfig.getSectionParams("SCSI Transport","File Sys.*") """ params = [] for reg in regs: reMatch = re.compile(reg,re.M) params += filter(lambda x:reMatch.search(x[0]),self.config) return map(lambda x: (x[1],x[2]), params) def getSections(self): return list(set(map(lambda x:x[0],self.config))) class KernelModules: """Object for work with current kernel modules""" def __init__(self): self._initModulesData() def _initModulesData(self): def holderlistdir(dirpath): dirs = listDirectory(path.join(sysModules,dirpath,"holders")) return reduce(lambda x,y:x+[y]+holderlistdir(path.join(dirpath, "holders",y)), dirs, []) sysModules = '/sys/module' holderPath = lambda x: path.join(sysModules,x,"holders") refPath = lambda x: path.join(sysModules,x,"refcnt") self.loadModules = filter(lambda x: access(holderPath(x),R_OK) and access(refPath(x),R_OK), listDirectory(sysModules)) self.useModules = map(lambda x:int(open(refPath(x),'r').read().strip()), self.loadModules) self.depModules = map(lambda x: holderlistdir(x), self.loadModules) def getWorkModules(self,forceRemove=[],forceKeep=[]): """Get work modules""" needModules = filter(lambda x:x[1] > len(x[2]), zip(self.loadModules, self.useModules, self.depModules)) needModulesName = set(map(lambda x:x[0],needModules)+forceKeep) allModules = map(lambda x:x[0], filter(lambda x:set(x[1])&needModulesName, zip(self.loadModules, self.depModules))) reRemove = re.compile("|".join(forceRemove)) return filter(lambda x: not reRemove.search(x), map(lambda x:x.replace("-","_"), allModules+list(needModulesName))) class InitRamFs: def __init__(self,initRamFsFile): self.initrdFile = initRamFsFile self.tmpPath = path.join('/tmp',path.basename(self.initrdFile)) def _unpackInitRamfs(self): """Unpack initramfs""" self.prevDir = os.getcwd() if not path.exists(self.tmpPath): os.mkdir(self.tmpPath) os.chdir(self.tmpPath) cpioProcess = process("cpio", "-di") gzipInitrd = gzip.open(self.initrdFile,'r') cpioProcess.write(gzipInitrd.read()) res = cpioProcess.success() os.chdir(self.prevDir) return res def searchInside(self,searchFunc): """Search data in file list of initramfs""" cpioProcess = process("cpio", "-tf", stdin=process("gzip", "-dc", self.initrdFile)) return filter(searchFunc,cpioProcess) def isModuleInside(self,moduleName): """Search module in initramfs""" return bool(self.searchInside(lambda x: "%s.ko"%moduleName in x)) def _packInitRamfs(self,newInitramfsFile=None): """Pack initramfs""" self.prevDir = os.getcwd() try: # get file list for pack fileList = reduce(lambda x,y: x +map(lambda z:path.join(y[0],z),y[1]) +map(lambda z:path.join(y[0],z),y[2]), os.walk(self.tmpPath), []) fileList = map(lambda x:x[len(self.tmpPath)+1:],fileList) # change dir for cpio os.chdir(self.tmpPath) # create cpio process cpioProcess = process("cpio", "-o", "--quiet", "-H","newc") # send file list to cpio process cpioProcess.write("\n".join(fileList)) cpioProcess.write("\n") # get result of cpio cpioData = cpioProcess.read() cpioRes = cpioProcess.returncode() except KeyboardInterrupt: os.chdir(self.prevDir) raise KeyboardInterrupt except Exception,e: print e.__repr__() cpioRes = 255 finally: os.chdir(self.prevDir) if cpioRes != 0: return False # remove old initrd file if not newInitramfsFile: newInitramfsFile = self.initrdFile if path.exists(newInitramfsFile): os.unlink(newInitramfsFile) # pack and write new initrd file initrd = gzip.open(newInitramfsFile,'w') initrd.write(cpioData) initrd.close() return True def cleanInitRamFs(self,newinitrd=None,video=None): """Unpack initramfs clean and pack""" copy2(self.initrdFile,newinitrd) return True