#-*- coding: utf-8 -*- # Copyright 2008-2010 Mir Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import string from random import choice import os import types import subprocess import stat from shutil import copytree, rmtree import ctypes class _error: # Здесь ошибки, если они есть error = [] def getError(self): """Выдать ошибки""" if not self.error: return False error = "" for e in self.error: error += e + "\n" return error def setError(self, error): """Установка ошибки""" self.error.append(error) return True class typeFile: """Получение типа файла""" __mgc = None __cookie = None def __init__(self): # Загружаем libmagic.so self.__mgc = ctypes.cdll.LoadLibrary("libmagic.so") # Создаем новый cookie (требуется для # работы с magic-последовательностями) # 0x10 | 0x400 = MAGIC_MIME (константа # декларируется в magic.h) self.__cookie = self.__mgc.magic_open(0x10 | 0x400) # Загружаем в __cookie # /etc/file/magic.mime (т.к. указано None) self.__mgc.magic_load(self.__cookie, None) def __del__(self): """Закрываем __cookie""" self.__mgc.magic_close(self.__cookie) def getMType(self,filename): """информация о типе файла""" result = self.__mgc.magic_file(self.__cookie,filename) # magic_file возвращает указатель const char*, # mimetype.value - это строка по указателю mimetype = ctypes.c_char_p(result) rez = mimetype.value return rez def isBinary(self,filename): """является ли файл бинарным""" mime = self.getMType(filename) # В случае ошибки if mime.count("`"): return mime elif mime.count("binary"): return True return False class scanDirectory: """Класс для cканирования директории""" def processingFile(self, path, prefix): """Обработка в случае файла""" return True def processingDirectory(self, path, prefix): """Обработка в случае директории если возвращаем None то пропуск дир.""" return True def scanningDirectory(self, scanDir, skipFile=[], skipDir=[], prefix=None, flagDir=False): """Сканирование и обработка шаблонов в директории scanDir""" ret = True if not prefix: prefix = os.path.join(scanDir,"")[:-1] if not flagDir: # проверка корневой директории retDir = self.processingDirectory(scanDir, scanDir) if retDir is None: return None elif retDir is False: return False if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]): for fileOrDir in sorted(os.listdir(scanDir)): absPath = os.path.join(scanDir,fileOrDir) relPath = absPath.split(prefix)[1] stInfo = os.lstat(absPath) statInfo = stInfo[stat.ST_MODE] if stat.S_ISREG(statInfo): # Обработка файла if relPath in skipFile: continue if not self.processingFile(absPath, prefix): ret = False break elif stat.S_ISDIR(statInfo): # Обработка директории if relPath in skipDir: continue retDir = self.processingDirectory(absPath, prefix) if retDir is None: continue elif retDir is False: ret = False break ret = self.scanningDirectory(absPath, skipFile, skipDir, prefix, True) if ret is False: break return ret def runOsCommand(cmd, inStr=None, ret_first=None, env_dict=None): """Выполняет внешнюю программу Параметры: cmd внешняя программа inStr данные передаваемые программе на страндартный вход. ret_first вернуть только первую строку env_dict словарь переменных окружения Возвращаемые параметры: строка/строки которую выведет внешняя программа Возвращает код возврата, stdout+stderr """ pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env_dict, close_fds=True, shell=True) fout, fin, ferr = (pipe.stdout, pipe.stdin, pipe.stderr) # если есть данные на вход, передать их if inStr: fin.write(inStr) fin.close() # Код возврата retcode = pipe.wait() res = fout.readlines() fout.close() res += ferr.readlines() ferr.close() if res: if len(res) == 1 or ret_first: return retcode, res[0].strip() else: return retcode, res return retcode, None def genpassword(passlen=9): '''Вернуть случайный пассворд указанной длины Параметры: passlen длина пароля который нужно сгенерировать Возвращаемые параметры: Сгенерированный пароль указанной длины ''' res=''.join([choice(string.ascii_letters+string.digits)\ for i in xrange(passlen)]) return res def fillstr(char, width): '''Заполнить строку указанным числом символов. Псеводоним символ*кол-во''' return str(char) * width def getpathenv(): """Вернуть пути для запуска утилит""" bindir=['/sbin','/bin','/usr/sbin','/usr/bin'] env=os.environ if env and env.has_key('PATH'): lpath=env['PATH'].split(":") npath=[] for dirname in bindir: if os.path.exists(dirname) and dirname not in lpath: npath.append(dirname) lpath=npath+lpath return ":".join(lpath) def list2str(list): '''Функция переводит список в строку''' return '['+','.join(list)+']' def str2list(s): '''Функция переводит строку в список''' return s[1:-1].split(',') def dict2str(dict): '''Функция перводит словарь в строку''' return '{'+','.join(["%s:%s" % (str(k),str(v)) \ for (k,v) in dict.items()])+'}' #: def str2dict(s): '''Функция переводит строку в словарь''' dict = {} for i in s[1:-1].split(','): k,v = i.split(':') dict[k] = v return dict def convertStrListDict(val): '''Функция определеяется что на входе (строка, список, словарь) и переводит их в строку и обратно''' # если подан список if type(val) == types.ListType: return list2str(val) # если подан словарь elif type(val) == types.DictType: return dict2str(val) # если подана строка else: # если поданная строка содержит словарь if ':' in val and '{' in val: return str2dict(val) # если поданная строка содержит список elif ',' in val and '[' in val: return str2list(val) # если это просто строка else: return val def _toUNICODE(val): """перевод текста в юникод""" if type(val) == types.UnicodeType: return val else: return str(val).decode('UTF-8') def getModeFile(nameFile, mode="all"): """Выдает информацию о файле mode=="all" права файла, владелец, группа файла mode=="mode" права файла mode=="owner" владелец, группа файла """ fst = os.lstat(nameFile) if mode == "all": return (stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid) if mode == "mode": return stat.S_IMODE(fst.st_mode) if mode == "owner": return fst.st_uid, fst.st_gid def chownR(directory, uid, gid): """изменяет владельца и группу для всех файлов и директорий внутри directory """ def chownPaths(rootPath, listPath, uid, gid): for chPath in listPath: chownPath = os.path.join(rootPath, chPath) statInfo = os.lstat(chownPath)[stat.ST_MODE] if stat.S_ISLNK(statInfo): os.lchown(chownPath, uid, gid) else: os.chown(chownPath, uid, gid) for root, dirs, files in os.walk(directory): # меняем владельца директории os.chown(root, uid, gid) # Меняем владельца директорий chownPaths(root, dirs, uid, gid) # Меняем владельца файлов chownPaths(root, files, uid, gid) return True def copyDir(srcDir, destDir): """Копируем директорию srcDir в destDir При копировании сохраняются владелец, группа, права """ def ignoreFile(path, names): """Игнорирование сокетов при копировании""" ignore = [] for name in names: if stat.S_ISSOCK(os.lstat(os.path.join(path, name))[stat.ST_MODE]): ignore.append(name) return ignore copytree(srcDir, destDir, ignore=ignoreFile) return True def removeDir(rmDir): """Рекурсивное удаление директории""" rmtree(rmDir) return True