#-*- coding: utf-8 -*- # Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import string from random import choice import os import types import subprocess from subprocess import Popen,PIPE,STDOUT import stat from shutil import copytree, rmtree import cl_overriding import re import sys import getpass from types import StringType try: from magic import open as type_file, MAGIC_NONE as MAGIC_NONE except ImportError: try: from magic import open as type_file, NONE as MAGIC_NONE except: type_file = None MAGIC_NONE = None import cl_lang tr = cl_lang.lang() tr.setLocalDomain('cl_lib') tr.setLanguage(sys.modules[__name__]) class _error: # Здесь ошибки, если они есть error = [] def getError(self): """Выдать ошибки""" if not self.error: return False error = "" for e in self.error: error += e + "\n" return error def setError(self, error): """Установка ошибки""" self.error.append(error) return True class _warning: # Здесь предупреждения warning = [] def getWarning(self): """Выдать ошибки""" if not self.warning: return False warning = "" for w in self.warning: warning += w + "\n" return warning def setWarning(self, warning): """Установка ошибки""" self.warning.append(warning) return True class proxy_type_file: def __init__(self,flags): self.flags = flags def load(self): pass def setflags(self,flags): self.flags = flags def close(self): pass def file(self,filename): if os.path.exists(filename): if self.flags == 0x410: processFile = process("file","-bi",filename) if processFile.success(): return processFile.read().rstrip() else: processFile = process("file","-bz",filename) if processFile.success(): return processFile.read().rstrip() return None class typeFile: """Получение типа файла""" def __init__(self, magic=0x410): if type_file is None: self.magicObject = proxy_type_file(MAGIC_NONE) else: self.magicObject = type_file(MAGIC_NONE) self.magicObject.load() self.magicObject.setflags(magic) def __del__(self): """Закрываем magic""" self.magicObject.close() def getMType(self, filename): """Информация о типе файла""" return self.magicObject.file(filename) def isBinary(self, filename): """является ли файл бинарным""" mime = self.getMType(filename) # В случае ошибки if mime.count("`"): return mime elif mime.count("binary"): return True return False class scanDirectory: """Класс для cканирования директории""" def processingFile(self, path, prefix): """Обработка в случае файла""" return True def processingDirectory(self, path, prefix): """Обработка в случае директории если возвращаем None то пропуск дир.""" return True def scanningDirectory(self, scanDir, skipFile=[], skipDir=[], prefix=None, flagDir=False): """Сканирование и обработка шаблонов в директории scanDir""" ret = True if not prefix: prefix = os.path.join(scanDir,"")[:-1] if not flagDir: # проверка корневой директории retDir = self.processingDirectory(scanDir, scanDir) if retDir is None: return None elif retDir is False: return False if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]): for fileOrDir in sorted(os.listdir(scanDir)): absPath = os.path.join(scanDir,fileOrDir) relPath = absPath.split(prefix)[1] stInfo = os.lstat(absPath) statInfo = stInfo[stat.ST_MODE] if stat.S_ISREG(statInfo): # Обработка файла if relPath in skipFile: continue if not self.processingFile(absPath, prefix): ret = False break elif stat.S_ISDIR(statInfo): # Обработка директории if relPath in skipDir: continue retDir = self.processingDirectory(absPath, prefix) if retDir is None: continue elif retDir is False: ret = False break ret = self.scanningDirectory(absPath, skipFile, skipDir, prefix, True) if ret is False: break return ret class process: """Execute system command by Popen Examples: execute program and get result: if process("/bin/gzip","/boot/somefile").success(): print "Gzip success" unzip and process unzip data by cpio (list files): processGzip = process("/bin/gzip","-dc","/boot/initrd") processCpio = process("/bin/cpio","-tf",stdin=processGzip) filelist = processCpio.readlines() execute command and send data: processGrub = process("/sbin/grub") processGrub.write("root (hd0,0)\n") processGrub.write("setup (hd0)\n") processGrub.write("quit\n") isok = processGrub.success() union stdout and stderr: process("/bin/ls","/",stderr=STDOUT) result to stdout: process("/bin/ls",stdout=None) get data from keyboard: process("/bin/cat",stdin=None) """ def __init__(self,command,*params,**kwarg): if not "stdin" in kwarg: stdin=self._defaultStdin else: if kwarg["stdin"] == None: stdin = self._keyboardStdin else: stdin=kwarg["stdin"].getStdout self.stdout = kwarg.get("stdout",PIPE) self.envdict = kwarg.get("envdict",None) self.stderr = kwarg.get("stderr",PIPE) self.command = [command] + list(params) self.stdin = stdin self.iter = None self.pipe = None self.cacheresult = None self.communicated = False def _open(self): """Open pipe if it not open""" if not self.pipe: self.pipe = Popen(self.command, stdout=self.stdout, stdin=self.stdin(), stderr=self.stderr, env=self.envdict) def _defaultStdin(self): """Return default stdin""" return PIPE def _keyboardStdin(self): """Return keyboard stdin""" return None def getStdout(self): """Get current stdout""" self.close() self._open() return self.pipe.stdout def write(self,data): """Write to process stdin""" self._open() self.pipe.stdin.write(data) self.pipe.stdin.flush() def close(self): """Close stdin""" if self.pipe: self.pipe.stdin.close() def read(self): """Read all data""" try: self._open() if self.cacheresult is None: self.cacheresult = self.pipe.communicate()[0] return self.cacheresult except KeyboardInterrupt: self.kill() raise KeyboardInterrupt def readlines(self): """Read lines""" return self.read().split('\n') def __iter__(self): """Get iterator""" if not self.iter: self.iter = iter(self.readlines()) return self.iter def kill(self): """Kill this process""" if self.pipe: self.pipe.kill() def next(self): """Next string from stdout""" return self.__iter__().next() def returncode(self): """Get return code""" self.read() return self.pipe.returncode def success(self): """Success or not""" return self.returncode() == 0 def failed(self): """Failed or not""" return self.returncode() != 0 class processProgress(process): """Execute system command by Popen for parse stdout.""" def __init__(self,command,*params,**kwarg): process.__init__(self,command,*params,**kwarg) self.readsize = kwarg.get("readsize",10) self.init(**kwarg) def init(self,**kwarg): pass def read(self): """Read data with parsing ability""" try: self.processInit() self._open() if self.cacheresult is None: self.cacheresult = [] self.buf = "" part = self.pipe.stdout.read(1) while part: if self.buf: self.buf += part else: self.buf = part if self.processStdout(): self.processDraw() self.cacheresult.append(part) part = self.pipe.stdout.read(self.readsize) self.pipe.poll() self.processEnd(self.success()) except KeyboardInterrupt: self.cacheresult = "".join(self.cacheresult) self.pipe.kill() self.processEnd(False) raise KeyboardInterrupt() self.cacheresult = "".join(self.cacheresult) return self.cacheresult def processInit(self): """Called when read first byte""" pass def processDraw(self): """Called when processStdout return True""" pass def processStdout(self): """Called when read readsize byte from stdout""" return True def processEnd(self,res=True): """Called when process end""" pass def runOsCommand(cmd,in_str=None, env_dict=None): """Выполняет внешнюю программу Параметры: cmd внешняя программа in_str данные передаваемые программе на страндартный вход. env_dict словарь переменных окружения Возвращает (код возврата, список stdout+stderr) """ pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env_dict, close_fds=True, shell=True) fout, fin, ferr = (pipe.stdout, pipe.stdin, pipe.stderr) # если есть данные на вход, передать их if in_str: fin.write(in_str) fin.close() res = map(lambda x: x.rstrip(), fout.readlines()) fout.close() res += map(lambda x: x.rstrip(), ferr.readlines()) ferr.close() # Код возврата retcode = pipe.wait() return retcode, res def genpassword(passlen=9): '''Вернуть случайный набор символов указанной длины Параметры: passlen длина пароля который нужно сгенерировать Возвращаемые параметры: Сгенерированный пароль указанной длины ''' res=''.join([choice(string.ascii_letters+string.digits)\ for i in xrange(passlen)]) return res def fillstr(char, width): '''Заполнить строку указанным числом символов. Псеводоним символ*кол-во''' return str(char) * width def getpathenv(): """Вернуть пути для запуска утилит""" bindir=['/sbin','/bin','/usr/sbin','/usr/bin'] env=os.environ if env and env.has_key('PATH'): lpath=env['PATH'].split(":") npath=[] for dirname in bindir: if os.path.exists(dirname) and dirname not in lpath: npath.append(dirname) lpath=npath+lpath return ":".join(lpath) class MultiReplace: """MultiReplace function object Usage: replacer = MultiReplace({'str':'efg','in':'asd'}) s = replacer("string") """ def __init__(self, repl_dict): # string to string mapping; use a regular expression keys = repl_dict.keys() keys.sort(reverse=True) # lexical order pattern = u"|".join([re.escape(key) for key in keys]) self.pattern = re.compile(pattern) self.dict = repl_dict def replace(self, s): # apply replacement dictionary to string def repl(match, get=self.dict.get): item = match.group(0) return get(item, item) return self.pattern.sub(repl, s) __call__ = replace def str2dict(s): """Convert string to dictionary: String format: {'key1':'value1','key2':'val\'ue2'} """ value = r'(?:\\\\|\\\'|[^\'])' pair = r""" \s*' # begin key (%(v)s*) ' # end key \s*:\s* # delimeter key/value ' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reDict = re.compile(pair, re.X) reMatchDict = re.compile(""" ^{ # begin dict ((%(v)s,)* # many pair with comma at end %(v)s)? # pair without comma }$ # end dict """ % {'v':pair}, re.X) if reMatchDict.match(s.strip()): d = dict(reDict.findall(s)) replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) for i in d.keys(): d[i] = replaceSlash(d[i]) return d else: cl_overriding.printERROR(_("wrong dict value: %s"%s)) cl_overriding.exit(1) def str2list(s): """Convert string to list: String format: ['value1','val\'ue2'] """ value = r'(?:\\\\|\\\'|[^\'])' element = r""" \s*' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reList = re.compile(element, re.X) reMatchList = re.compile(""" ^\[ # begin dict ((%(v)s,)* # many elements with comma at end %(v)s)? # element without comma \]$ # end dict """ % {'v':element}, re.X) if reMatchList.match(s.strip()): replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) return [replaceSlash(i) for i in reList.findall(s)] else: cl_overriding.printERROR(_("wrong list value: %s"%s)) cl_overriding.exit(1) def list2str(list): """Convert list to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return "[%s]" % ','.join(["'%s'"%replaceSlash(str(i)) for i in list ]) def dict2str(dict): """Convert dictionry to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return '{%s}' % ','.join(["'%s':'%s'" % (str(k),replaceSlash(str(v))) \ for (k,v) in dict.items()]) def convertStrListDict(val): """Convert data between string, list, dict""" # if val is list if type(val) == types.ListType: return list2str(val) # if val is dictionary elif type(val) == types.DictType: return dict2str(val) # else it is string else: # detect dictionary if re.match("^{.*}$",val): return str2dict(val) # detect list elif re.match("^\[.*\]$",val): return str2list(val) # else is simple string else: return val def _toUNICODE(val): """Convert text to unicode""" if type(val) == types.UnicodeType: return val else: return str(val).decode('UTF-8') def getModeFile(nameFile, mode="all"): """Выдает информацию о файле mode=="all" права файла, владелец, группа файла mode=="mode" права файла mode=="owner" владелец, группа файла """ fst = os.lstat(nameFile) if mode == "all": return (stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid) if mode == "mode": return stat.S_IMODE(fst.st_mode) if mode == "owner": return fst.st_uid, fst.st_gid def chownR(directory, uid, gid): """изменяет владельца и группу для всех файлов и директорий внутри directory """ def chownPaths(rootPath, listPath, uid, gid): for chPath in listPath: chownPath = os.path.join(rootPath, chPath) statInfo = os.lstat(chownPath)[stat.ST_MODE] if stat.S_ISLNK(statInfo): os.lchown(chownPath, uid, gid) else: os.chown(chownPath, uid, gid) for root, dirs, files in os.walk(directory): # меняем владельца директории os.chown(root, uid, gid) # Меняем владельца директорий chownPaths(root, dirs, uid, gid) # Меняем владельца файлов chownPaths(root, files, uid, gid) return True def copyDir(srcDir, destDir): """Копируем директорию srcDir в destDir При копировании сохраняются владелец, группа, права """ def ignoreFile(path, names): """Игнорирование сокетов при копировании""" ignore = [] for name in names: if stat.S_ISSOCK(os.lstat(os.path.join(path, name))[stat.ST_MODE]): ignore.append(name) return ignore copytree(srcDir, destDir, ignore=ignoreFile) return True def removeDir(rmDir): """Рекурсивное удаление директории""" rmtree(rmDir) return True def getRunCommands(): """List run program""" def getCmd(procNum): cmdLineFile = '/proc/%s/cmdline'%procNum try: if os.path.exists(cmdLineFile): return open(cmdLineFile,'r').read().strip() except: pass return "" if not os.access('/proc',os.R_OK): return [] return map(getCmd,filter(lambda x:x.isdigit(), os.listdir('/proc'))) def isFstabMount(pathname,mapDevUuid={}): """Get mount point or device from fstab""" def removeQuotes(s): return s.replace('"','').replace("'","") if pathname == "swap": absPath = "swap" else: absPath = os.path.abspath(pathname) devuuid = '/dev/disk/by-uuid' if not mapDevUuid and os.path.exists(devuuid): mapDevUuid.update( map(lambda x:("UUID=%s"%os.path.basename(x), os.path.normpath(os.path.join(devuuid,os.readlink(x)))), filter(os.path.islink, map(lambda x:os.path.join(devuuid,x), os.listdir(devuuid))))) # convert fstab to # [['/dev/sda3', '/', '', 'reiserfs', 'noatime', '', '', '0', '2\n'], # ['/dev/sda5', '/var/calculate', 'reiserfs', 'noatime', '0', '0\n']] listFstab = filter(lambda x: len(x) >= 4, map(lambda x: filter(lambda x: x, x.replace('\t',' ').split(' ')), filter(lambda x: not x.startswith('#') and x.strip(), open("/etc/fstab")))) # get mount point or device or dir return filter(lambda x: x!=absPath, reduce(lambda x,y: y, filter(lambda x: absPath in x and x[1] != "none", map(lambda x: [mapDevUuid.get(removeQuotes(x[0]),x[0]), x[1] if x[2] != "swap" else "swap"], listFstab)),[""]))[0] def isMount(path): """В случае монтирования директории выдает другой примонтированный путь""" absPath = os.path.abspath(path) mtabFile = '/etc/mtab' if not os.access(mtabFile,os.R_OK): return "" return filter(lambda x: x!=absPath, reduce(lambda x,y: y, filter(lambda x: absPath in x, map(lambda x: [x[0], x[1]], map(lambda x: x.split(" "), open(mtabFile)))),[""]))[0] def commonPath(*paths): """Return common path from list of paths""" paths = map(lambda x:os.path.normpath(x).split('/'),paths) res = map(lambda x:x[0], filter(lambda x:filter(lambda y:x[0]==y,x[1:]),zip(*paths))) return "/".join(res) def childMounts(path): """Get all mount points which contain path""" if path != "none": absPath = os.path.abspath(path) else: absPath = path mtabFile = '/etc/mtab' if not os.access(mtabFile,os.R_OK): return "" return reduce(lambda x,y: x + [y], filter(lambda x: commonPath(absPath,x[0])==absPath or \ commonPath(absPath,x[1])==absPath, map(lambda x: [x[0], x[1]], map(lambda x: x.split(" "), open(mtabFile)))), []) def pathJoin(*paths): """Складывает пути, в отличии от os.path.join, складывает абсолютные пути""" if len(paths)==1: return paths[0] return reduce(os.path.join, filter(lambda x:x and x != "/", map(lambda x: x.startswith("/") and x[1:] or x, paths[1:])),paths[0]) def getUserPassword(flag="dialog", pwDialog=False): """Получить пароль у пользователя flag - опция "dalog" или "stdin" - откуда получаем пароль pwDialog - структура для вывода приглашения в режиме диалога """ userPwd = "" if flag == "dialog": if not pwDialog: pwDialog = [_("New password"), _("Retype new password")] pwdA = getpass.getpass(pwDialog[0]+":") pwdB = getpass.getpass(pwDialog[1]+":") elif flag == "stdin": pwdA = sys.stdin.readline().rstrip() pwdB = sys.stdin.readline().rstrip() else: cl_overriding.printERROR(_("ERROR in function getUserPassword, \ incorrect option 'flag=%s'")%flag) return False if not pwdA or not (pwdA == pwdB): cl_overriding.printERROR(_("ERROR") + ": " +\ _("password incorrect")+ ": " + _("try again")) return False userPwd = pwdA return userPwd def cmpVersion(v1,v2): """Compare versions specified by tuple or string""" if isinstance(v1,StringType): v1 = getTupleVersion(v1) if isinstance(v2,StringType): v2 = getTupleVersion(v2) return cmp((v1[0]+[0,]*(len(v2[0])-len(v1[0])),v1[1]), (v2[0]+[0,]*(len(v1[0])-len(v2[0])),v2[1])) def getTupleVersion(ver): """Get version specified by string as list: Example: 2.6.30 [(2,6,30),('r',0)] 2.6.31-r1 [(2,6,31),('r',1)] """ suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3, "rc": -1} def toTuple(v): return map(lambda x: suffix_value[x] if x in suffix_value else x, map(lambda x: int(x) if x.isdigit() else x, re.findall("r\d+$|\d+|[a-zA-Z+]+", v.replace('-SNAPSHOT','')))) vers, revision = re.search("(^.*?)(-r\d+)?$",ver,re.S).groups() vers = toTuple(vers) revision = toTuple(revision or "r0") return [vers,revision] def appendProgramToEnvFile(nameProg, objVar): """Append name program to variable cl-merges and save /etc/calculate/calculate.env """ if not objVar.AppendToList("cl_merges", nameProg, force=True): return False if not objVar.WriteList("cl_merges", force=True): return False return True def removeProgramToEnvFile(nameProg, objVar): """Remove name program from variable cl-merges and save /etc/calculate/calculate.env""" if not objVar.RemoveToList("cl_merges", nameProg, force=True): return False if not objVar.WriteList("cl_merges", force=True): return False return True def checkDigestFile(digestfile): """Check digest by digestfile""" reEntry = re.compile(r"# (\S+) HASH\n(\S+) (\S+)",re.S) result = [] for alg,hashdata,filename in \ reEntry.findall(open(digestfile,'r').read()): if hasattr(hashlib,alg.lower()): hashobj = getattr(hashlib,alg.lower()) filename = os.path.join(os.path.dirname(digestfile),filename) if os.path.exists(filename): digest = hashobj(open(filename,'r').read()) result.append((alg, digest.hexdigest().upper() == hashdata.upper())) return result def getFilesCount(directory): """Get files count from directory""" if os.path.exists(directory): return len(reduce(lambda x,y:x+y,map(lambda x:x[1]+x[2], os.walk(directory)),[])) return 0 def listDirectory(directory): """Get files from directory, if it exists""" try: return os.listdir(directory) except OSError: pass return [] def detectDeviceForPartition(dev): """Detect parent device for partition by /sys/block (sysfs)""" reDeviceSplit = re.compile("^(.*/)?(.*?)(\d+)$") device = map(lambda x:filter(lambda x:x in dev, x[1]), os.walk('/sys/block')) if device: device = device[0] parentdevices = \ filter(lambda x: os.path.split(dev)[-1] in \ reduce(lambda y,z:y+z[1], os.walk(os.path.join('/sys/block',x)),[]), device) if parentdevices: return parentdevices[0] res = reDeviceSplit.search(dev) if res: return res.groups()[1] return None def getProgPath(progname): """Get full path of program or False""" baseprogname = os.path.basename(progname) env = {"LANG":"C"} env.update(os.environ.items() + [("PATH",getpathenv())] +\ env.items()) res = runOsCommand("which %s"%progname,env_dict=env) if res[0] == 0: return res[1][0].strip() elif os.path.isabs(progname) and os.path.exists(progname): return progname else: return False reVerSplit = re.compile(r"^(.*?)-(([^-]+?)(?:-(r\d+))?)(?:.(tbz2))?$",re.S) def reVerSplitToPV(x): """Convert match from reVerSplit to PV hash""" if x: match = x.groups() return {'PN':match[0], 'PF':"%s-%s"%(match[0],match[1]), 'P':"%s-%s"%(match[0],match[2]), 'PV':match[2], 'PR':match[3] or "r0", 'PVR':match[1]} return {'PN':"", 'PF':"", 'P':"", 'PV':"", 'PR':"", 'PVR':""}.copy() def getPkgUses(fullpkg): """Get USE and IUSE from package""" category,slash,pkg = fullpkg.partition('/') pkgCategory = '/var/db/pkg/{0}'.format(category) packages = filter(lambda x:x['PN'] == pkg, map(reVerSplitToPV, filter(lambda x:x, map(lambda x:reVerSplit.search(x), listDirectory(pkgCategory))))) if not packages: return None usePath = os.path.join(pkgCategory,packages[-1]['PF'],"USE") iusePath = os.path.join(pkgCategory,packages[-1]['PF'],"IUSE") return (map(lambda x:x[1:] if x.startswith("+") else x, filter(lambda x:x, open(usePath,'r').read().strip().split())), map(lambda x:x[1:] if x.startswith("+") else x, filter(lambda x:x, open(iusePath,'r').read().strip().split()))) def getPkgActiveUses(fullpkg): """Get active uses from package""" res = getPkgUses(fullpkg) if not res: return None return list(set(res[0]) & set(res[1])) def getSquashList(): """Get supprted squashfs compressions method""" wantMethod = set(["lzo","lzma","xz","gzip"]) usesSquashFs = getPkgActiveUses("sys-fs/squashfs-tools") if not usesSquashFs: return ["gzip"] else: return map(lambda x:{"lzma":"xz"}.get(x,x), list(set(usesSquashFs) & wantMethod)) def getAvailableX11Drivers(): """Get available x11 drivers""" xorg_modules_dir = '/usr/lib/xorg/modules/drivers' return map(lambda x: x[:-7], filter(lambda x:x.endswith('_drv.so'), listDirectory(xorg_modules_dir)))