#-*- coding: utf-8 -*- # Copyright 2008-2010 Mir Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import string from random import choice import os import types import subprocess import stat from shutil import copytree, rmtree import ctypes import cl_overriding import re class _error: # Здесь ошибки, если они есть error = [] def getError(self): """Выдать ошибки""" if not self.error: return False error = "" for e in self.error: error += e + "\n" return error def setError(self, error): """Установка ошибки""" self.error.append(error) return True class typeFile: """Получение типа файла""" __mgc = None __cookie = None def __init__(self,magic=0x410): # Загружаем libmagic.so self.__mgc = ctypes.cdll.LoadLibrary("libmagic.so") # Создаем новый cookie (требуется для # работы с magic-последовательностями) # 0x10 | 0x400 = MAGIC_MIME (константа # декларируется в magic.h) self.__cookie = self.__mgc.magic_open(magic) # Загружаем в __cookie # /etc/file/magic.mime (т.к. указано None) self.__mgc.magic_load(self.__cookie, None) def __del__(self): """Закрываем __cookie""" self.__mgc.magic_close(self.__cookie) def getMType(self,filename): """информация о типе файла""" result = self.__mgc.magic_file(self.__cookie,filename) # magic_file возвращает указатель const char*, # mimetype.value - это строка по указателю mimetype = ctypes.c_char_p(result) rez = mimetype.value return rez def isBinary(self,filename): """является ли файл бинарным""" mime = self.getMType(filename) # В случае ошибки if mime.count("`"): return mime elif mime.count("binary"): return True return False class scanDirectory: """Класс для cканирования директории""" def processingFile(self, path, prefix): """Обработка в случае файла""" return True def processingDirectory(self, path, prefix): """Обработка в случае директории если возвращаем None то пропуск дир.""" return True def scanningDirectory(self, scanDir, skipFile=[], skipDir=[], prefix=None, flagDir=False): """Сканирование и обработка шаблонов в директории scanDir""" ret = True if not prefix: prefix = os.path.join(scanDir,"")[:-1] if not flagDir: # проверка корневой директории retDir = self.processingDirectory(scanDir, scanDir) if retDir is None: return None elif retDir is False: return False if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]): for fileOrDir in sorted(os.listdir(scanDir)): absPath = os.path.join(scanDir,fileOrDir) relPath = absPath.split(prefix)[1] stInfo = os.lstat(absPath) statInfo = stInfo[stat.ST_MODE] if stat.S_ISREG(statInfo): # Обработка файла if relPath in skipFile: continue if not self.processingFile(absPath, prefix): ret = False break elif stat.S_ISDIR(statInfo): # Обработка директории if relPath in skipDir: continue retDir = self.processingDirectory(absPath, prefix) if retDir is None: continue elif retDir is False: ret = False break ret = self.scanningDirectory(absPath, skipFile, skipDir, prefix, True) if ret is False: break return ret def runOsCommand(cmd, inStr=None, ret_first=None, env_dict=None): """Выполняет внешнюю программу Параметры: cmd внешняя программа inStr данные передаваемые программе на страндартный вход. ret_first вернуть только первую строку env_dict словарь переменных окружения Возвращаемые параметры: строка/строки которую выведет внешняя программа Возвращает код возврата, stdout+stderr """ pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env_dict, close_fds=True, shell=True) fout, fin, ferr = (pipe.stdout, pipe.stdin, pipe.stderr) # если есть данные на вход, передать их if inStr: fin.write(inStr) fin.close() # Код возврата retcode = pipe.wait() res = fout.readlines() fout.close() res += ferr.readlines() ferr.close() if res: if len(res) == 1 or ret_first: return retcode, res[0].strip() else: return retcode, res return retcode, None def genpassword(passlen=9): '''Вернуть случайный набор символов указанной длины Параметры: passlen длина пароля который нужно сгенерировать Возвращаемые параметры: Сгенерированный пароль указанной длины ''' res=''.join([choice(string.ascii_letters+string.digits)\ for i in xrange(passlen)]) return res def fillstr(char, width): '''Заполнить строку указанным числом символов. Псеводоним символ*кол-во''' return str(char) * width def getpathenv(): """Вернуть пути для запуска утилит""" bindir=['/sbin','/bin','/usr/sbin','/usr/bin'] env=os.environ if env and env.has_key('PATH'): lpath=env['PATH'].split(":") npath=[] for dirname in bindir: if os.path.exists(dirname) and dirname not in lpath: npath.append(dirname) lpath=npath+lpath return ":".join(lpath) class MultiReplace: """MultiReplace function object Usage: replacer = MultiReplace({'str':'efg','in':'asd'}) s = replacer("string") """ def __init__(self, repl_dict): # string to string mapping; use a regular expression keys = repl_dict.keys() keys.sort(reverse=True) # lexical order pattern = u"|".join([re.escape(key) for key in keys]) self.pattern = re.compile(pattern) self.dict = repl_dict def replace(self, s): # apply replacement dictionary to string def repl(match, get=self.dict.get): item = match.group(0) return get(item, item) return self.pattern.sub(repl, s) __call__ = replace def str2dict(s): """Convert string to dictionary: String format: {'key1':'value1','key2':'val\'ue2'} """ value = r'(?:\\\\|\\\'|[^\'])' pair = r""" \s*' # begin key (%(v)s*) ' # end key \s*:\s* # delimeter key/value ' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reDict = re.compile(pair, re.X) reMatchDict = re.compile(""" ^{ # begin dict ((%(v)s,)* # many pair with comma at end %(v)s)? # pair without comma }$ # end dict """ % {'v':pair}, re.X) if reMatchDict.match(s.strip()): d = dict(reDict.findall(s)) replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) for i in d.keys(): d[i] = replaceSlash(d[i]) return d else: cl_overriding.printERROR(_("wrong dict value: %s"%s)) cl_overriding.exit(1) def str2list(s): """Convert string to list: String format: ['value1','val\'ue2'] """ value = r'(?:\\\\|\\\'|[^\'])' element = r""" \s*' # begin value (%(v)s*) '\s* # end value """ % {"v":value} reList = re.compile(element, re.X) reMatchList = re.compile(""" ^\[ # begin dict ((%(v)s,)* # many elements with comma at end %(v)s)? # element without comma \]$ # end dict """ % {'v':element}, re.X) if reMatchList.match(s.strip()): replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''}) return [replaceSlash(i) for i in reList.findall(s)] else: cl_overriding.printERROR(_("wrong list value: %s"%s)) cl_overriding.exit(1) def list2str(list): """Convert list to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return "[%s]" % ','.join(["'%s'"%replaceSlash(str(i)) for i in list ]) def dict2str(dict): """Convert dictionry to string Return string with escaped \ and '. """ replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''}) return '{%s}' % ','.join(["'%s':'%s'" % (str(k),replaceSlash(str(v))) \ for (k,v) in dict.items()]) def convertStrListDict(val): """Convert data between string, list, dict""" # if val is list if type(val) == types.ListType: return list2str(val) # if val is dictionary elif type(val) == types.DictType: return dict2str(val) # else it is string else: # detect dictionary if re.match("^{.*}$",val): return str2dict(val) # detect list elif re.match("^[.*]$",val): return str2list(val) # else is simple string else: return val def _toUNICODE(val): """Convert text to unicode""" if type(val) == types.UnicodeType: return val else: return str(val).decode('UTF-8') def getModeFile(nameFile, mode="all"): """Выдает информацию о файле mode=="all" права файла, владелец, группа файла mode=="mode" права файла mode=="owner" владелец, группа файла """ fst = os.lstat(nameFile) if mode == "all": return (stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid) if mode == "mode": return stat.S_IMODE(fst.st_mode) if mode == "owner": return fst.st_uid, fst.st_gid def chownR(directory, uid, gid): """изменяет владельца и группу для всех файлов и директорий внутри directory """ def chownPaths(rootPath, listPath, uid, gid): for chPath in listPath: chownPath = os.path.join(rootPath, chPath) statInfo = os.lstat(chownPath)[stat.ST_MODE] if stat.S_ISLNK(statInfo): os.lchown(chownPath, uid, gid) else: os.chown(chownPath, uid, gid) for root, dirs, files in os.walk(directory): # меняем владельца директории os.chown(root, uid, gid) # Меняем владельца директорий chownPaths(root, dirs, uid, gid) # Меняем владельца файлов chownPaths(root, files, uid, gid) return True def copyDir(srcDir, destDir): """Копируем директорию srcDir в destDir При копировании сохраняются владелец, группа, права """ def ignoreFile(path, names): """Игнорирование сокетов при копировании""" ignore = [] for name in names: if stat.S_ISSOCK(os.lstat(os.path.join(path, name))[stat.ST_MODE]): ignore.append(name) return ignore copytree(srcDir, destDir, ignore=ignoreFile) return True def removeDir(rmDir): """Рекурсивное удаление директории""" rmtree(rmDir) return True def isMount(path): """В случае монтирования директории выдает другой примонтированный путь""" absPath = os.path.abspath(path) return filter(lambda x: x!=absPath, reduce(lambda x,y: y, filter(lambda x: absPath in x, map(lambda x: [x[0], x[1]], map(lambda x: x.split(" "), open("/etc/mtab")))),[""]))[0]