#-*- coding: utf-8 -*- # Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import string from random import choice import os import types import subprocess from subprocess import Popen,PIPE,STDOUT import stat from shutil import copytree, rmtree import ctypes import cl_overriding import re import sys import getpass from types import StringType import cl_lang tr = cl_lang.lang() tr.setLocalDomain('cl_lib') tr.setLanguage(sys.modules[__name__]) class _error: # Здесь ошибки, если они есть error = [] def getError(self): """Выдать ошибки""" if not self.error: return False error = "" for e in self.error: error += e + "\n" return error def setError(self, error): """Установка ошибки""" self.error.append(error) return True class typeFile: """Получение типа файла""" def __init__(self, flag=0x410): # Загружаем libmagic.so self.lib = ctypes.cdll.LoadLibrary("libmagic.so") self.lib.error.restype = ctypes.c_char_p self.lib.magic_file.restype = ctypes.c_char_p # 0x10 | 0x400 = MAGIC_MIME (константа, декларируется в magic.h) self.magic = self.lib.magic_open(flag) self.lib.magic_load(self.magic, None) def __del__(self): """Закрываем magic""" self.lib.magic_close(self.magic) def getMType(self, filename): """Информация о типе файла""" return self.lib.magic_file(self.magic, filename) def isBinary(self, filename): """является ли файл бинарным""" mime = self.getMType(filename) # В случае ошибки if mime.count("`"): return mime elif mime.count("binary"): return True return False class scanDirectory: """Класс для cканирования директории""" def processingFile(self, path, prefix): """Обработка в случае файла""" return True def processingDirectory(self, path, prefix): """Обработка в случае директории если возвращаем None то пропуск дир.""" return True def scanningDirectory(self, scanDir, skipFile=[], skipDir=[], prefix=None, flagDir=False): """Сканирование и обработка шаблонов в директории scanDir""" ret = True if not prefix: prefix = os.path.join(scanDir,"")[:-1] if not flagDir: # проверка корневой директории retDir = self.processingDirectory(scanDir, scanDir) if retDir is None: return None elif retDir is False: return False if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]): for fileOrDir in sorted(os.listdir(scanDir)): absPath = os.path.join(scanDir,fileOrDir) relPath = absPath.split(prefix)[1] stInfo = os.lstat(absPath) statInfo = stInfo[stat.ST_MODE] if stat.S_ISREG(statInfo): # Обработка файла if relPath in skipFile: continue if not self.processingFile(absPath, prefix): ret = False break elif stat.S_ISDIR(statInfo): # Обработка директории if relPath in skipDir: continue retDir = self.processingDirectory(absPath, prefix) if retDir is None: continue elif retDir is False: ret = False break ret = self.scanningDirectory(absPath, skipFile, skipDir, prefix, True) if ret is False: break return ret class process: """Execute system command by Popen Example: #execute program and get result if process("/bin/gzip","/boot/somefile").success(): print "Gzip success" #unzip and process unzip data by cpio (list files) processGzip = process("/bin/gzip","-dc","/boot/initrd") processCpio = process("/bin/cpio","-tf",stdin=processGzip) filelist = processCpio.readlines() #execute command and send data processGrub = process("/sbin/grub") processGrub.write("root (hd0,0)\n") processGrub.write("setup (hd0)\n") processGrub.write("quit\n") isok = processGrub.success() #union stdout and stderr process("/bin/ls","/",stderr=STDOUT) """ def __init__(self,command,*params,**kwarg): if not "stdin" in kwarg: stdin=self._defaultStdin else: stdin=kwarg["stdin"].getStdout self.stderr = kwarg.get("stderr",PIPE) self.command = [command] + list(params) self.stdin = stdin self.iter = None self.pipe = None self.cacheresult = None self.communicated = False def _open(self): """Open pipe if it not open""" if not self.pipe: self.pipe = Popen(self.command, stdout=PIPE, stdin=self.stdin(), stderr=self.stderr) def _defaultStdin(self): """Return default stdin""" return PIPE def getStdout(self): """Get current stdout""" self.close() return self.pipe.stdout def write(self,data): """Write to process stdin""" self._open() self.pipe.stdin.write(data) self.pipe.stdin.flush() def close(self): """Close stdin""" if self.pipe: self.pipe.stdin.close() def read(self): """Read all data""" self._open() if not self.cacheresult: self.cacheresult = self.pipe.communicate()[0] return self.cacheresult def readlines(self): """Read lines""" return self.read().split('\n') def __iter__(self): """Get iterator""" if not self.iter: self.iter = iter(self.readlines()) return self.iter def next(self): """Next string from stdout""" return self.__iter__().next() def returncode(self): """Get return code""" self._open() if self.pipe.returncode == None: self.cacheresult = self.pipe.communicate() return self.pipe.returncode def success(self): """Success or not""" return self.returncode() == 0 def failed(self): """Failed or not""" return self.returncode() != 0 def runOsCommand(cmd,inStr=None,ret_first=None,env_dict=None,ret_list=False): """Выполняет внешнюю программу Параметры: cmd внешняя программа inStr данные передаваемые программе на страндартный вход. ret_first вернуть только первую строку env_dict словарь переменных окружения Возвращаемые параметры: строка/строки которую выведет внешняя программа Возвращает код возврата, stdout+stderr """ pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env_dict, close_fds=True, shell=True) fout, fin, ferr = (pipe.stdout, pipe.stdin, pipe.stderr) # если есть данные на вход, передать их if inStr: fin.write(inStr) fin.close() # Код возврата retcode = pipe.wait() res = fout.readlines() fout.close() res += ferr.readlines() ferr.close() if res: if ret_list: if ret_first: return retcode, res[:1] return retcode, res elif len(res) == 1 or ret_first: return retcode, res[0].strip() else: return retcode, res return retcode, None def genpassword(passlen=9): '''Вернуть случайный набор символов указанной длины Параметры: passlen длина пароля который нужно сгенерировать Возвращаемые параметры: Сгенерированный пароль указанной длины ''' res=''.join([choice(string.ascii_letters+string.digits)\ for i in xrange(passlen)]) return res def fillstr(char, width): '''Заполнить строку указанным числом символов. Псеводоним символ*кол-во''' return str(char) * width def getpathenv(): """Вернуть пути для запуска утилит""" bindir=['/sbin','/bin','/usr/sbin','/usr/bin'] env=os.environ if env and env.has_key('PATH'): lpath=env['PATH'].split(":") npath=[] for dirname in bindir: if os.path.exists(dirname) and dirname not in lpath: npath.append(dirname) lpath=npath+lpath return ":".join(lpath) class MultiReplace: """MultiReplace function object Usage: replacer = MultiReplace({'str':'efg','in':'asd'}) s = replacer("string") """ def __init__(self, repl_dict): # string to string mapping; use a regular expression keys = repl_dict.keys() keys.sort(reverse=True) # lexical order pattern = u"|".join([re.escape(key) for key in keys]) self.pattern = re.compile(pattern) self.dict = repl_dict def replace(self, s): # apply replacement dictionary to string def repl(match, get=self.dict.get): item = match.group(0) return get(item, item) return self.pattern.sub(repl, s) __call__ = replace def str2dict(s): """Convert string to dictionary: String format: {'key1':'value1','key2':'val\'ue2'} """ value = r'(?:\\\\|\\\'|[^\'])' pair = r""" \s*' # begin key (%(v)s*) ' # end key \s*:\s* # delimeter key/value ' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reDict = re.compile(pair, re.X) reMatchDict = re.compile(""" ^{ # begin dict ((%(v)s,)* # many pair with comma at end %(v)s)? # pair without comma }$ # end dict """ % {'v':pair}, re.X) if reMatchDict.match(s.strip()): d = dict(reDict.findall(s)) replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) for i in d.keys(): d[i] = replaceSlash(d[i]) return d else: cl_overriding.printERROR(_("wrong dict value: %s"%s)) cl_overriding.exit(1) def str2list(s): """Convert string to list: String format: ['value1','val\'ue2'] """ value = r'(?:\\\\|\\\'|[^\'])' element = r""" \s*' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reList = re.compile(element, re.X) reMatchList = re.compile(""" ^\[ # begin dict ((%(v)s,)* # many elements with comma at end %(v)s)? # element without comma \]$ # end dict """ % {'v':element}, re.X) if reMatchList.match(s.strip()): replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) return [replaceSlash(i) for i in reList.findall(s)] else: cl_overriding.printERROR(_("wrong list value: %s"%s)) cl_overriding.exit(1) def list2str(list): """Convert list to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return "[%s]" % ','.join(["'%s'"%replaceSlash(str(i)) for i in list ]) def dict2str(dict): """Convert dictionry to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return '{%s}' % ','.join(["'%s':'%s'" % (str(k),replaceSlash(str(v))) \ for (k,v) in dict.items()]) def convertStrListDict(val): """Convert data between string, list, dict""" # if val is list if type(val) == types.ListType: return list2str(val) # if val is dictionary elif type(val) == types.DictType: return dict2str(val) # else it is string else: # detect dictionary if re.match("^{.*}$",val): return str2dict(val) # detect list elif re.match("^[.*]$",val): return str2list(val) # else is simple string else: return val def _toUNICODE(val): """Convert text to unicode""" if type(val) == types.UnicodeType: return val else: return str(val).decode('UTF-8') def getModeFile(nameFile, mode="all"): """Выдает информацию о файле mode=="all" права файла, владелец, группа файла mode=="mode" права файла mode=="owner" владелец, группа файла """ fst = os.lstat(nameFile) if mode == "all": return (stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid) if mode == "mode": return stat.S_IMODE(fst.st_mode) if mode == "owner": return fst.st_uid, fst.st_gid def chownR(directory, uid, gid): """изменяет владельца и группу для всех файлов и директорий внутри directory """ def chownPaths(rootPath, listPath, uid, gid): for chPath in listPath: chownPath = os.path.join(rootPath, chPath) statInfo = os.lstat(chownPath)[stat.ST_MODE] if stat.S_ISLNK(statInfo): os.lchown(chownPath, uid, gid) else: os.chown(chownPath, uid, gid) for root, dirs, files in os.walk(directory): # меняем владельца директории os.chown(root, uid, gid) # Меняем владельца директорий chownPaths(root, dirs, uid, gid) # Меняем владельца файлов chownPaths(root, files, uid, gid) return True def copyDir(srcDir, destDir): """Копируем директорию srcDir в destDir При копировании сохраняются владелец, группа, права """ def ignoreFile(path, names): """Игнорирование сокетов при копировании""" ignore = [] for name in names: if stat.S_ISSOCK(os.lstat(os.path.join(path, name))[stat.ST_MODE]): ignore.append(name) return ignore copytree(srcDir, destDir, ignore=ignoreFile) return True def removeDir(rmDir): """Рекурсивное удаление директории""" rmtree(rmDir) return True def isMount(path): """В случае монтирования директории выдает другой примонтированный путь""" absPath = os.path.abspath(path) return filter(lambda x: x!=absPath, reduce(lambda x,y: y, filter(lambda x: absPath in x, map(lambda x: [x[0], x[1]], map(lambda x: x.split(" "), open("/etc/mtab")))),[""]))[0] def pathJoin(*paths): """Складывает пути, в отличии от os.path.join, складывает абсолютные пути""" if len(paths)==1: return paths[0] return os.path.join(paths[0], reduce(os.path.join, map(lambda x: x.startswith("/") and x[1:] or x, paths[1:]),'')) def getUserPassword(flag="dialog", pwDialog=False): """Получить пароль у пользователя flag - опция "dalog" или "stdin" - откуда получаем пароль pwDialog - структура для вывода приглашения в режиме диалога """ userPwd = "" if flag == "dialog": if not pwDialog: pwDialog = [_("New password"), _("Retype new password")] pwdA = getpass.getpass(pwDialog[0]+":") pwdB = getpass.getpass(pwDialog[1]+":") elif flag == "stdin": pwdA = sys.stdin.readline().rstrip() pwdB = sys.stdin.readline().rstrip() else: cl_overriding.printERROR(_("ERROR in function getUserPassword, \ incorrect option 'flag=%s'")%flag) return False if not pwdA or not (pwdA == pwdB): cl_overriding.printERROR(_("ERROR") + ": " +\ _("password incorrect")+ ": " + _("try again")) return False userPwd = pwdA return userPwd def cmpVersion(v1,v2): """Compare versions specified by tuple or string""" if isinstance(v1,StringType): v1 = getTupleVersion(v1) if isinstance(v2,StringType): v2 = getTupleVersion(v2) return cmp((v1[0]+[0,]*(len(v2[0])-len(v1[0])),v1[1]), (v2[0]+[0,]*(len(v1[0])-len(v2[0])),v2[1])) def getTupleVersion(ver): """Get version specified by string as list: Example: 2.6.30 [(2,6,30),('r',0)] 2.6.31-r1 [(2,6,31),('r',1)] """ suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3, "rc": -1} def toTuple(v): return map(lambda x: suffix_value[x] if x in suffix_value else x, map(lambda x: int(x) if x.isdigit() else x, re.findall("r\d+$|\d+|[a-zA-Z+]+", v.replace('-SNAPSHOT','')))) vers, revision = re.search("(^.*?)(-r\d+)?$",ver,re.S).groups() vers = toTuple(vers) revision = toTuple(revision or "r0") return [vers,revision] def appendProgramToEnvFile(nameProg, objVar): """Append name program to variable cl-merges and save /etc/calculate/calculate.env """ if not objVar.AppendToList("cl_merges", nameProg, force=True): return False if not objVar.WriteList("cl_merges", force=True): return False return True def removeProgramToEnvFile(nameProg, objVar): """Remove name program from variable cl-merges and save /etc/calculate/calculate.env""" if not objVar.RemoveToList("cl_merges", nameProg, force=True): return False if not objVar.WriteList("cl_merges", force=True): return False return True