25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/cl_utils.py

1523 lines
50 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import string
from random import choice
import os
13 년 전
from os import path
import types
import subprocess
from subprocess import Popen,PIPE,STDOUT
14 년 전
import stat
from shutil import copytree, rmtree
import cl_overriding
import re
14 년 전
import sys
import getpass
from types import StringType
import tarfile
# Bind the libmagic Python API under stable local names.  Two naming schemes
# for the flag constants are tried (MAGIC_NONE/MAGIC_CONTINUE first, then
# NONE/CONTINUE).  When neither import works every name is set to None and
# typeFile falls back to proxy_type_file.
try:
    from magic import (open as type_file, MAGIC_NONE as MAGIC_NONE,
                       MAGIC_CONTINUE)
except ImportError:
    try:
        from magic import (open as type_file, NONE as MAGIC_NONE,
                           CONTINUE as MAGIC_CONTINUE)
    except Exception:
        # Deliberately broad (any failure means "no usable bindings"), but
        # no longer a bare except: SystemExit and KeyboardInterrupt must
        # not be swallowed during import.
        type_file = None
        MAGIC_NONE = None
        MAGIC_CONTINUE = None
14 년 전
# Module-level translation setup: create the language object and bind it to
# the 'cl_lib' message domain.
import cl_lang
tr = cl_lang.lang()
tr.setLocalDomain('cl_lib')
# NOTE(review): presumably this call is what makes the `_()` translation
# helper (used later in this module without a local definition) available -
# confirm in cl_lang.
tr.setLanguage(sys.modules[__name__])
class _error:
# Здесь ошибки, если они есть
error = []
def getError(self):
"""Выдать ошибки"""
if not self.error:
return False
error = ""
for e in self.error:
error += e + "\n"
return error
def setError(self, error):
"""Установка ошибки"""
self.error.append(error)
return True
class _warning:
# Здесь предупреждения
warning = []
def getWarning(self):
"""Выдать ошибки"""
if not self.warning:
return False
warning = ""
for w in self.warning:
warning += w + "\n"
return warning
def setWarning(self, warning):
"""Установка ошибки"""
self.warning.append(warning)
return True
class proxy_type_file:
    """Stand-in for the libmagic object that shells out to file(1).

    It exposes the subset of the magic API that typeFile uses: load(),
    setflags(), close(), file() and errno().
    """

    def __init__(self, flags):
        self.flags = flags

    def load(self):
        """Nothing to load for the command-line fallback."""
        pass

    def setflags(self, flags):
        """Remember the magic flags; they select the file(1) options."""
        self.flags = flags

    def close(self):
        """Nothing to release for the command-line fallback."""
        pass

    def errno(self):
        """Return 0 (no error code is tracked by this fallback).

        typeFile.getMType() calls errno() whenever file() returned None;
        without this method that path raised AttributeError.
        """
        return 0

    def file(self, filename):
        """Return the output of file(1) for *filename*, or None.

        None is returned when the path does not exist or the command fails.
        With flags 0x410 the '-bi' options are used, otherwise '-bkz'.
        """
        if not path.exists(filename):
            return None
        option = "-bi" if self.flags == 0x410 else "-bkz"
        processFile = process("file", option, filename)
        if processFile.success():
            return processFile.read().rstrip()
        return None
14 년 전
class typeFile:
    """Determine the type of a file via libmagic or the file(1) fallback."""

    def __init__(self, magic=0x410):
        """Create the magic object and apply the *magic* flags."""
        if type_file is None:
            self.magicObject = proxy_type_file(MAGIC_NONE)
        else:
            self.magicObject = type_file(MAGIC_NONE)
            self.magicObject.load()
        self.magicObject.setflags(magic)

    def __del__(self):
        """Close the magic object."""
        # __init__ may have failed before magicObject was assigned.
        magicObject = getattr(self, "magicObject", None)
        if magicObject is not None:
            magicObject.close()

    def getMType(self, filename):
        """Return the type description of *filename*, or None.

        None is returned when the name cannot be decoded as UTF-8 or the
        backend reports nothing for the file.
        """
        try:
            ret = self.magicObject.file(filename)
        except UnicodeDecodeError:
            try:
                ret = self.magicObject.file(filename.decode('utf-8'))
            except UnicodeDecodeError:
                return None
        # fix for kernel 3.7.7 (bad work samba with big files)
        # errno() is looked up defensively: a backend without it must not
        # turn "no result" into an AttributeError.
        errnoFunc = getattr(self.magicObject, "errno", None)
        if ret is None and errnoFunc is not None and errnoFunc() == 5:
            # Feed the file content to the magic object through a pipe.
            # NOTE(review): the read end `r` is never closed here and the
            # write end is closed only after descriptor() returns; the
            # original order is kept because it is unclear whether the
            # library closes the descriptor itself - confirm.
            r, w = os.pipe()
            devnull = os.open(os.devnull, os.O_WRONLY)
            cat = subprocess.Popen(['/bin/cat', filename], stdout=w,
                                   stderr=devnull, close_fds=True)
            ret = self.magicObject.descriptor(r)
            os.close(w)
            os.close(devnull)
            cat.wait()
        return ret

    def isBinary(self, filename):
        """Tell whether *filename* is a binary file.

        Returns True when the type description contains "binary", the
        description itself when it contains a backquote (treated by the
        original code as an error report), and False otherwise, including
        when no description could be obtained.
        """
        mime = self.getMType(filename)
        if mime is None:
            # Previously this raised AttributeError on None.count().
            return False
        # In case of an error
        if mime.count("`"):
            return mime
        elif mime.count("binary"):
            return True
        return False
14 년 전
class scanDirectory:
    """Recursive directory walker with overridable per-entry hooks."""

    def processingFile(self, pathname, prefix):
        """Hook called for every regular file; a false result aborts."""
        return True

    def processingDirectory(self, pathname, prefix):
        """Hook called for every directory.

        Returning None skips the directory, False aborts the whole scan.
        """
        return True

    def scanningDirectory(self, scanDir, skipFile=[], skipDir=[],
                          prefix=None, flagDir=False):
        """Scan *scanDir* recursively and call the hooks for its entries.

        skipFile / skipDir hold paths relative to the scan root (they start
        with '/') that must be ignored; the lists are only read.  *prefix*
        is the scan root used to compute those relative paths and *flagDir*
        marks recursive calls (the root check is then skipped).

        Returns None when the root directory is skipped by its hook, False
        when any hook aborted the scan, and the last truthy result
        otherwise.
        """
        ret = True
        if not prefix:
            prefix = path.join(scanDir, "")[:-1]
        if not flagDir:
            # check the root directory
            retDir = self.processingDirectory(scanDir, scanDir)
            if retDir is None:
                return None
            elif retDir is False:
                return False
        if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]):
            for absPath in sorted(listDirectory(scanDir, fullPath=True)):
                # Strip the prefix only once and only from the start:
                # split(prefix)[1] broke when the prefix text re-appeared
                # deeper in the path and raised ValueError for an empty
                # prefix (scanning "/").
                if absPath.startswith(prefix):
                    relPath = absPath[len(prefix):]
                else:
                    relPath = absPath.split(prefix, 1)[1]
                statInfo = os.lstat(absPath)[stat.ST_MODE]
                if stat.S_ISREG(statInfo):
                    # regular file
                    if relPath in skipFile:
                        continue
                    if not self.processingFile(absPath, prefix):
                        return False
                elif stat.S_ISDIR(statInfo):
                    # directory
                    if relPath in skipDir:
                        continue
                    retDir = self.processingDirectory(absPath, prefix)
                    if retDir is None:
                        continue
                    elif retDir is False:
                        return False
                    ret = self.scanningDirectory(absPath, skipFile,
                                                 skipDir, prefix, True)
                    if ret is False:
                        return False
        return ret
14 년 전
class process:
    """Execute system command by Popen

    Examples:

    execute program and get result:
        if process("/bin/gzip","/boot/somefile").success():
            print "Gzip success"

    unzip and process unzip data by cpio (list files):
        processGzip = process("/bin/gzip","-dc","/boot/initrd")
        processCpio = process("/bin/cpio","-tf",stdin=processGzip)
        filelist = processCpio.readlines()

    execute command and send data:
        processGrub = process("/sbin/grub")
        processGrub.write("root (hd0,0)\n")
        processGrub.write("setup (hd0)\n")
        processGrub.write("quit\n")
        isok = processGrub.success()

    union stdout and stderr:
        process("/bin/ls","/",stderr=STDOUT)

    result to stdout:
        process("/bin/ls",stdout=None)

    get data from keyboard:
        process("/bin/cat",stdin=None)

    Keyword arguments: stdin (another process object or None), stdout,
    stderr, envdict (environment passed to the child; an empty one by
    default) and lang (value stored as LANG in that environment).
    The child is started lazily, on the first read/write.
    """

    def __init__(self, command, *params, **kwarg):
        if "stdin" not in kwarg:
            stdin = self._defaultStdin
        elif kwarg["stdin"] is None:
            stdin = self._keyboardStdin
        else:
            # chain: our stdin is the stdout of another process object
            stdin = kwarg["stdin"].getStdout
        self.stdout = kwarg.get("stdout", PIPE)
        # Work on a copy: setting LANG below used to modify the dictionary
        # owned by the caller.
        self.envdict = dict(kwarg.get("envdict", {}))
        if "lang" in kwarg:
            self.envdict["LANG"] = kwarg.get('lang')
        self.langc = "langc" in kwarg
        self.stderr = kwarg.get("stderr", PIPE)
        self.command = [command] + list(params)
        self.stdin = stdin
        self.iter = None
        self.pipe = None
        self.cacheresult = None
        self.communicated = False

    def _open(self):
        """Open pipe if it not open"""
        if not self.pipe:
            self.pipe = Popen(self.command,
                              stdout=self.stdout,
                              stdin=self.stdin(),
                              stderr=self.stderr,
                              env=self.envdict)

    def _defaultStdin(self):
        """Return default stdin"""
        return PIPE

    def _keyboardStdin(self):
        """Return keyboard stdin"""
        return None

    def getStdout(self):
        """Get current stdout"""
        self.close()
        self._open()
        return self.pipe.stdout

    def write(self, data):
        """Write to process stdin"""
        self._open()
        self.pipe.stdin.write(data)
        self.pipe.stdin.flush()

    def close(self):
        """Close stdin"""
        # pipe.stdin is None when the child reads from the keyboard or from
        # another process, so there is nothing to close in that case.
        if self.pipe and self.pipe.stdin:
            self.pipe.stdin.close()

    def read(self):
        """Read all data (cached after the first call)"""
        try:
            self._open()
            if self.cacheresult is None:
                self.cacheresult = self.pipe.communicate()[0]
            return self.cacheresult
        except KeyboardInterrupt:
            self.kill()
            raise

    def readlines(self):
        """Read lines"""
        return self.read().split('\n')

    def __iter__(self):
        """Get iterator"""
        if not self.iter:
            self.iter = iter(self.readlines())
        return self.iter

    def kill(self):
        """Kill this process"""
        if self.pipe:
            self.pipe.kill()

    def next(self):
        """Next string from stdout"""
        return next(self.__iter__())

    # iterator protocol name used by newer interpreters
    __next__ = next

    def returncode(self):
        """Get return code"""
        self.read()
        return self.pipe.returncode

    def success(self):
        """Success or not"""
        return self.returncode() == 0

    def failed(self):
        """Failed or not"""
        return self.returncode() != 0
class processProgress(process):
    """Execute system command by Popen for parse stdout.

    Subclasses override the process* hooks to follow the output while it is
    being produced.
    """

    def __init__(self, command, *params, **kwarg):
        process.__init__(self, command, *params, **kwarg)
        # chunk size for every read after the first byte
        self.readsize = kwarg.get("readsize", 10)
        self.init(**kwarg)

    def init(self, **kwarg):
        """Extra initialisation hook for subclasses."""
        pass

    def read(self):
        """Read data with parsing ability"""
        try:
            self.processInit()
            self._open()
            if self.cacheresult is None:
                self.cacheresult = []
                self.buf = ""
                chunk = self.pipe.stdout.read(1)
                while chunk:
                    # a hook may have emptied the buffer in the meantime
                    self.buf = self.buf + chunk if self.buf else chunk
                    if self.processStdout():
                        self.processDraw()
                    self.cacheresult.append(chunk)
                    chunk = self.pipe.stdout.read(self.readsize)
                self.pipe.poll()
                # success() re-enters read(); cacheresult is already set by
                # then, so the loop above is not run again.
                self.processEnd(self.success())
        except KeyboardInterrupt:
            self.cacheresult = "".join(self.cacheresult)
            self.pipe.kill()
            self.processEnd(False)
            raise KeyboardInterrupt()
        self.cacheresult = "".join(self.cacheresult)
        return self.cacheresult

    def processInit(self):
        """Called when read first byte"""
        pass

    def processDraw(self):
        """Called when processStdout return True"""
        pass

    def processStdout(self):
        """Called when read readsize byte from stdout"""
        return True

    def processEnd(self, res=True):
        """Called when process end"""
        pass
def runOsCommand(cmd, in_str=None, env_dict=None):
    """Run a system command through the shell.

    Parameters:
      cmd       command line of the external program
      in_str    data passed to the program on its standard input
      env_dict  dictionary of environment variables (None inherits ours)

    Returns (return code, list of stdout lines followed by stderr lines),
    each line with trailing whitespace removed.
    """
    pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=env_dict,
                            close_fds=True,
                            shell=True)
    # communicate() feeds stdin and drains stdout and stderr together.
    # Reading stdout to EOF before touching stderr blocked forever once the
    # child had filled the stderr pipe.
    out, err = pipe.communicate(in_str if in_str else None)

    def toLines(data):
        # Same result as file.readlines() + rstrip(): split on "\n" only
        # and drop the empty tail produced by a final newline.
        newline = '\n' if isinstance(data, str) else b'\n'
        lines = data.split(newline)
        if lines and not lines[-1]:
            lines.pop()
        return [line.rstrip() for line in lines]

    return pipe.returncode, toLines(out) + toLines(err)
def genpassword(passlen=9):
    """Return a random string of *passlen* ASCII letters and digits.

    The characters come from the operating system's random source
    (random.SystemRandom) rather than the default pseudo-random generator,
    which is not suitable for generating passwords.
    """
    from random import SystemRandom
    alphabet = string.ascii_letters + string.digits
    pick = SystemRandom().choice
    return ''.join([pick(alphabet) for _index in range(passlen)])
def getpathenv():
    """Return the PATH value to use when running utilities.

    The current PATH entries are kept in their original order (so lookup
    priority is preserved), duplicates are dropped and the standard system
    directories that exist are appended when missing.  Empty entries are
    skipped: an unset PATH used to produce an empty component, which the
    shell interprets as the current directory.
    """
    bindirs = [d for d in ['/sbin', '/bin', '/usr/sbin', '/usr/bin']
               if path.exists(d)]
    result = []
    for directory in os.environ.get('PATH', '').split(":") + bindirs:
        if directory and directory not in result:
            result.append(directory)
    return ":".join(result)
class MultiReplace:
    """Callable that performs several string replacements in one pass.

    Usage:
        replacer = MultiReplace({'str':'efg','in':'asd'})
        s = replacer("string")
    """

    def __init__(self, repl_dict):
        # One alternation of all (escaped) keys; reverse lexical order puts
        # a longer key before any key that is its prefix.
        ordered = sorted(repl_dict.keys(), reverse=True)
        self.pattern = re.compile(u"|".join(map(re.escape, ordered)))
        self.dict = repl_dict

    def replace(self, s):
        """Return *s* with every key occurrence replaced by its value."""
        lookup = self.dict.get

        def substitute(found):
            token = found.group(0)
            return lookup(token, token)

        return self.pattern.sub(substitute, s)

    __call__ = replace
def str2dict(s):
    r"""Convert a string to a dictionary.

    String format:
        {'key1':'value1','key2':'val\'ue2'}

    Escaped backslashes and quotes are unescaped in the values (keys are
    returned as written).  On a malformed string an error is printed and
    cl_overriding.exit(1) is called.
    """
    value = r'(?:\\\\|\\\'|[^\'])'
    pair = r"""
    \s*'      # begin key
    (%(v)s*)
    '         # end key
    \s*:\s*   # delimeter key/value
    '         # begin value
    (%(v)s*)
    '\s*      # end value
    """ % {"v": value}
    reDict = re.compile(pair, re.X)
    reMatchDict = re.compile(r"""
        ^{              # begin dict
        ((%(v)s,)*      # many pair with comma at end
        %(v)s)?         # pair without comma
        }$              # end dict
        """ % {'v': pair}, re.X)
    if reMatchDict.match(s.strip()):
        d = dict(reDict.findall(s))
        replaceSlash = MultiReplace({'\\\\': '\\', '\\\'': '\''})
        for key in list(d.keys()):
            d[key] = replaceSlash(d[key])
        return d
    else:
        # Translate the template first and substitute afterwards: with the
        # value already interpolated the message could never be found in
        # the translation catalogue.
        cl_overriding.printERROR(_("wrong dictionary value: %s") % s)
        cl_overriding.exit(1)
def str2list(s):
    r"""Convert a string to a list.

    String format:
        ['value1','val\'ue2']

    Escaped backslashes and quotes are unescaped in the elements.  On a
    malformed string an error is printed and cl_overriding.exit(1) is
    called.
    """
    value = r'(?:\\\\|\\\'|[^\'])'
    element = r"""
    \s*'      # begin value
    (%(v)s*)
    '\s*      # end value
    """ % {"v": value}
    reList = re.compile(element, re.X)
    reMatchList = re.compile(r"""
        ^\[             # begin list
        ((%(v)s,)*      # many elements with comma at end
        %(v)s)?         # element without comma
        \]$             # end list
        """ % {'v': element}, re.X)
    if reMatchList.match(s.strip()):
        replaceSlash = MultiReplace({'\\\\': '\\', '\\\'': '\''})
        return [replaceSlash(item) for item in reList.findall(s)]
    else:
        # Translate the template first and substitute afterwards: with the
        # value already interpolated the message could never be found in
        # the translation catalogue.
        cl_overriding.printERROR(_("wrong list value: %s") % s)
        cl_overriding.exit(1)
def list2str(list):
    r"""Convert a list to its string form.

    Every element is passed through str(), backslashes and single quotes
    are escaped (\\ and \'), and the result looks like ['a','b'].
    """
    escape = MultiReplace({'\\': '\\\\', '\'': '\\\''})
    quoted = []
    for element in list:
        quoted.append("'%s'" % escape(str(element)))
    return "[%s]" % ','.join(quoted)
def dict2str(dict):
    r"""Convert a dictionary to its string form.

    Keys and values are passed through str(); backslashes and single
    quotes are escaped (\\ and \') in the values only.  The result looks
    like {'key':'value','other':'value'}.
    """
    escape = MultiReplace({'\\': '\\\\', '\'': '\\\''})
    pairs = []
    for key, val in dict.items():
        pairs.append("'%s':'%s'" % (str(key), escape(str(val))))
    return '{%s}' % ','.join(pairs)
def convertStrListDict(val):
    """Convert data between string, list and dict.

    A list or dict is serialised to a string; a string that looks like a
    dict ({...}) or a list ([...]) is parsed; any other string is returned
    unchanged.
    """
    kind = type(val)
    # containers are serialised
    if kind is list:
        return list2str(val)
    if kind is dict:
        return dict2str(val)
    # anything else is treated as a string: detect dictionary, then list
    if re.match(r"^{.*}$", val):
        return str2dict(val)
    if re.match(r"^\[.*\]$", val):
        return str2list(val)
    # a simple string
    return val
def _toUNICODE(val):
"""Convert text to unicode"""
if type(val) == types.UnicodeType:
return val
else:
return str(val).decode('UTF-8')
14 년 전
def getModeFile(nameFile, mode="all"):
    """Return permission/ownership information about a file.

    mode == "all"    (permissions, owner uid, group gid)
    mode == "mode"   permissions
    mode == "owner"  (owner uid, group gid)

    The data comes from lstat(), so a symbolic link is described itself.
    Any other *mode* value yields None.
    """
    info = os.lstat(nameFile)
    permissions = stat.S_IMODE(info.st_mode)
    owner = (info.st_uid, info.st_gid)
    if mode == "all":
        return (permissions,) + owner
    if mode == "mode":
        return permissions
    if mode == "owner":
        return owner
    return None
14 년 전
def chownR(directory, uid, gid):
    """Recursively change the owner and group of *directory*.

    Symbolic links are changed themselves (lchown), they are not followed.
    Always returns True.
    """
    def changeOwner(target):
        # pick the call that does not dereference symbolic links
        if stat.S_ISLNK(os.lstat(target)[stat.ST_MODE]):
            os.lchown(target, uid, gid)
        else:
            os.chown(target, uid, gid)

    for root, dirs, files in os.walk(directory):
        # the directory being walked
        os.chown(root, uid, gid)
        # then its sub-directories, then its files
        for name in dirs + files:
            changeOwner(path.join(root, name))
    return True
def copyDir(srcDir, destDir):
    """Copy the directory tree srcDir to destDir with shutil.copytree.

    Sockets found in the tree are not copied.  Always returns True.
    NOTE(review): the previous description promised that owner, group and
    permissions are preserved; nothing in this function sets ownership, so
    that depends entirely on copytree - confirm.
    """
    def skipSockets(pathname, names):
        """Return the entries of *pathname* that are sockets."""
        return [name for name in names
                if stat.S_ISSOCK(
                    os.lstat(path.join(pathname, name))[stat.ST_MODE])]

    copytree(srcDir, destDir, ignore=skipSockets)
    return True
def removeDir(rmDir):
    """Remove the directory tree *rmDir* recursively.

    Errors raised by shutil.rmtree propagate; True is returned otherwise.
    """
    rmtree(rmDir)
    return True
def getRunCommands():
    """Return the command lines of the running processes.

    One entry is produced for every numeric directory in /proc; an entry is
    "" when the command line cannot be read (for instance because the
    process has already exited).  Returns [] when /proc is not readable.
    """
    def getCmd(procNum):
        cmdLineFile = '/proc/%s/cmdline' % procNum
        try:
            if path.exists(cmdLineFile):
                # the handle is closed deterministically
                with open(cmdLineFile, 'r') as cmdLine:
                    return cmdLine.read().strip()
        except (EnvironmentError, ValueError):
            # Best effort: the process may vanish between the checks or its
            # command line may be undecodable.  Unlike the former bare
            # except this lets KeyboardInterrupt/SystemExit through.
            pass
        return ""

    if not os.access('/proc', os.R_OK):
        return []
    return [getCmd(entry) for entry in listDirectory('/proc')
            if entry.isdigit()]
13 년 전
def isFstabMount(pathname, mapDevUuid={}, listFstab=[]):
    """Get mount point or device from fstab.

    *pathname* is a device, a mount point or the word "swap".  The
    counterpart taken from the last matching fstab entry is returned (the
    device for a mount point and vice versa), or "" when nothing matches.

    The mutable defaults are used on purpose as caches that live for the
    whole program run: mapDevUuid maps "UUID=..." names to devices and
    listFstab holds the parsed [device, mount point] pairs.
    """
    def removeQuotes(s):
        return s.replace('"', '').replace("'", "")

    if pathname == "swap":
        absPath = "swap"
    else:
        absPath = path.abspath(pathname)
    if not mapDevUuid:
        mapDevUuid.update(getUUIDDict())
    # convert fstab to pairs such as
    # [['/dev/sda3', '/'], ['/dev/sda5', '/var/calculate']]
    if not listFstab:
        parsed = []
        # the file is closed as soon as it has been parsed
        with open("/etc/fstab") as fstab:
            for line in fstab:
                if line.startswith('#') or not line.strip():
                    continue
                fields = [field
                          for field in line.replace('\t', ' ').split(' ')
                          if field]
                if len(fields) < 4:
                    continue
                device = mapDevUuid.get(removeQuotes(fields[0]), fields[0])
                # swap entries are addressed by the word "swap"
                mountPoint = fields[1] if fields[2] != "swap" else "swap"
                parsed.append([device, mountPoint])
        listFstab.extend(parsed)
    # get mount point or device or dir: the last matching entry wins
    matched = [entry for entry in listFstab
               if absPath in entry and entry[1] != "none"]
    found = matched[-1] if matched else [""]
    return [item for item in found if item != absPath][0]
class SingletonParam(type):
    """Metaclass that keeps one instance per first constructor argument.

    Calling the class again with the same first positional argument (or
    with no positional arguments, which is keyed as "") returns the
    instance created earlier.
    """

    def __init__(cls, name, bases, dict):
        super(SingletonParam, cls).__init__(name, bases, dict)
        # first positional argument -> instance
        cls.instance = {}

    def __call__(cls, *args, **kw):
        keyarg = args[0] if args else ""
        created = cls.instance
        if keyarg in created:
            return created[keyarg]
        created[keyarg] = super(SingletonParam, cls).__call__(*args, **kw)
        return created[keyarg]
class FStab(object):
    """Data reader for fstab.

    One instance is kept per fstab file name (see the metaclass).
    """
    __metaclass__ = SingletonParam
    fstab_file = '/etc/fstab'
    # column indexes of an fstab entry
    NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(0, 6)

    def __init__(self, fstab_file=None):
        if fstab_file:
            self.fstab_file = fstab_file
        self.cache = []
        self.rotateCache = []
        self.dictUUID = getUUIDDict()
        self.rebuildCache()

    def rebuildCache(self):
        """Rebuild cache from fstab file.

        Comment lines, blank lines and entries with fewer than three fields
        are ignored (the latter two used to raise IndexError).  The device
        column is resolved through the UUID map and udev; the mount point
        of a swap entry is replaced by the word "swap".
        """
        with open(self.fstab_file, 'r') as fstab:
            lines = fstab.read().split('\n')
        self.cache = []
        for line in lines:
            if not line.strip() or line.lstrip().startswith("#"):
                continue
            data = line.split()
            if len(data) < 3:
                continue
            device = self.dictUUID.get(data[0], data[0])
            if device.startswith('/'):
                # follow symbolic links such as /dev/disk/by-*
                device = path.realpath(device)
            data[0] = getUdevDeviceInfo(name=device).get('DEVNAME', '')
            data[1] = data[1] if data[2] != "swap" else "swap"
            self.cache.append(data)
        # column-wise view of the cache, used by getFields()
        self.rotateCache = list(zip(*self.cache))

    def getBy(self, what=DIR, where=NAME, eq=None, noteq=None,
              allentry=False):
        """Get data from fstab.

        Returns column *what* of the entries whose column *where* equals
        *eq* (or differs from *noteq* when *eq* is None): the whole list
        with allentry=True, otherwise the last match or "".
        """
        if eq is not None:
            res = [entry[what] for entry in self.cache
                   if entry[where] == eq]
        else:
            res = [entry[what] for entry in self.cache
                   if entry[where] != noteq]
        if allentry:
            return res
        return res[-1] if res else ""

    def getFields(self, *fields):
        """Get all data by specified fields, one tuple per fstab entry."""
        return list(zip(*[self.rotateCache[field] for field in fields]))

    def isExists(self, what=DIR, eq=None, noteq=None):
        """Tell whether an entry matching the condition exists in fstab."""
        if eq is not None:
            return any(entry[what] == eq for entry in self.cache)
        return any(entry[what] != noteq for entry in self.cache)
13 년 전
def isMount(pathname):
    """Return the counterpart of *pathname* in the mount table.

    /etc/mtab is searched for entries whose device or mount point equals
    the absolute form of *pathname*; for the last such entry the other
    member of the pair is returned (mount point for a device, device for a
    mount point).  "" is returned when nothing matches or /etc/mtab cannot
    be read.
    """
    absPath = path.abspath(pathname)
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    found = [""]
    # the file is closed as soon as it has been scanned
    with open(mtabFile) as mtab:
        for line in mtab:
            fields = line.split(" ")
            entry = [fields[0], fields[1]]
            if absPath in entry:
                # keep the last match
                found = entry
    return [item for item in found if item != absPath][0]
def commonPath(*paths):
    """Return the common leading path of the given paths.

    The paths are normalised and compared component by component; the
    result stops at the first component that is not the same in ALL paths.
    The former implementation kept every level where any other path
    matched the first one, so '/a/b/c' and '/a/x/c' gave '/a/c'.

    A single path is returned normalised (the old code returned ""); no
    paths give "".  Paths that share only the root yield "", as before.
    """
    components = [path.normpath(item).split('/') for item in paths]
    common = []
    for level in zip(*components):
        first = level[0]
        if any(other != first for other in level[1:]):
            break
        common.append(first)
    return "/".join(common)
13 년 전
def childMounts(pathname):
    """Get all mount table entries located at or below *pathname*.

    Returns a list of [device, mount point] pairs from /etc/mtab for which
    *pathname* is a leading path of the device or of the mount point.  The
    special name "none" is compared as it is, without being made absolute.
    NOTE: "" (not an empty list) is returned when /etc/mtab cannot be
    read; this is kept for compatibility with existing callers.
    """
    if pathname != "none":
        absPath = path.abspath(pathname)
    else:
        absPath = pathname
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    result = []
    # the file is closed as soon as it has been scanned
    with open(mtabFile) as mtab:
        for line in mtab:
            fields = line.split(" ")
            entry = [fields[0], fields[1]]
            if commonPath(absPath, entry[0]) == absPath or \
               commonPath(absPath, entry[1]) == absPath:
                result.append(entry)
    return result
def pathJoin(*paths):
    """Join paths; unlike os.path.join, absolute parts are appended too.

    Every part after the first one loses its leading slashes before being
    joined, so pathJoin('/a', '/b') is '/a/b'.  Empty parts and '/' are
    ignored.  All leading slashes are removed now: with only one removed,
    a part such as '//b' stayed absolute and discarded everything joined
    before it.  A single argument is returned unchanged.
    """
    if len(paths) == 1:
        return paths[0]
    result = paths[0]
    for part in paths[1:]:
        relative = part.lstrip("/")
        if relative:
            result = path.join(result, relative)
    return result
14 년 전
def getUserPassword(flag="dialog", pwDialog=False):
    """Ask the user for a password and return it.

    flag     - "dialog" (prompt on the terminal) or "stdin" (read two lines
               from standard input): where the password comes from
    pwDialog - pair of prompt texts used in dialog mode

    The password has to be entered twice.  False is returned (after an
    error message) when the flag is unknown, the password is empty or the
    two entries differ.
    """
    if flag == "dialog":
        prompts = pwDialog or [_("New password"),
                               _("Retype the new password")]
        pwdA = getpass.getpass(prompts[0] + ":")
        pwdB = getpass.getpass(prompts[1] + ":")
    elif flag == "stdin":
        pwdA = sys.stdin.readline().rstrip()
        pwdB = sys.stdin.readline().rstrip()
    else:
        cl_overriding.printERROR(
            _("ERROR in function getUserPassword, "
              "incorrect option 'flag=%s'") % flag)
        return False
    if pwdA and pwdA == pwdB:
        return pwdA
    cl_overriding.printERROR(_("ERROR") + ": " +
                             _("wrong password") + ": " + _("try again"))
    return False
def cmpVersion(v1, v2):
    """Compare two versions given as strings or as parsed version lists.

    Strings are parsed with getTupleVersion().  The shorter version part is
    padded with zeros before the comparison, so "2.6" equals "2.6.0".
    Returns -1, 0 or 1.
    """
    if isinstance(v1, str):
        v1 = getTupleVersion(v1)
    if isinstance(v2, str):
        v2 = getTupleVersion(v2)
    lengthDiff = len(v2[0]) - len(v1[0])
    left = (v1[0] + [0] * lengthDiff, v1[1])
    right = (v2[0] + [0] * -lengthDiff, v2[1])
    # three-way comparison result
    return (left > right) - (left < right)
def getTupleVersion(ver):
    """Parse a version string into [version parts, revision parts].

    Example:
        2.6.30     [[2, 6, 30], ['r0']]
        2.6.31-r1  [[2, 6, 31], ['r1']]

    Numbers become integers and the suffixes alpha, beta, pre, rc and p
    are replaced by -4, -3, -2, -1 and 0.  A '-SNAPSHOT' marker is ignored.
    NOTE(review): the revision is kept as one string ('r10'), so revisions
    are ordered as text ('r10' sorts before 'r9') - confirm whether that
    is intended.
    """
    suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3,
                    "rc": -1}

    def toTuple(text):
        parts = []
        for token in re.findall(r"r\d+$|\d+|[a-zA-Z+]+",
                                text.replace('-SNAPSHOT', '')):
            if token.isdigit():
                token = int(token)
            parts.append(suffix_value[token] if token in suffix_value
                         else token)
        return parts

    vers, revision = re.search(r"(^.*?)(-r\d+)?$", ver, re.S).groups()
    return [toTuple(vers), toTuple(revision or "r0")]
def appendProgramToEnvFile(nameProg, objVar):
    """Append the program name to the cl_merges variable and save
    /etc/calculate/calculate.env

    Returns True only when both the append and the write succeeded; the
    write is not attempted after a failed append.
    """
    appended = objVar.AppendToList("cl_merges", nameProg, force=True)
    if appended and objVar.WriteList("cl_merges", force=True):
        return True
    return False
def removeProgramToEnvFile(nameProg, objVar):
    """Remove the program name from the cl_merges variable and save
    /etc/calculate/calculate.env

    Returns True only when both the removal and the write succeeded; the
    write is not attempted after a failed removal.
    """
    removed = objVar.RemoveToList("cl_merges", nameProg, force=True)
    if removed and objVar.WriteList("cl_merges", force=True):
        return True
    return False
def checkDigestFile(digestfile):
    """Check digest by digestfile.

    The digest file consists of entries of the form

        # <ALGORITHM> HASH
        <hex digest> <file name>

    with file names relative to the directory of the digest file.
    Returns a list of (algorithm, matches) pairs; entries with an algorithm
    unknown to hashlib or with a missing file are skipped.
    """
    # imported here because the module header does not import hashlib
    import hashlib
    reEntry = re.compile(r"# (\S+) HASH\n(\S+) (\S+)", re.S)
    with open(digestfile, 'r') as digests:
        entries = reEntry.findall(digests.read())
    baseDir = path.dirname(digestfile)
    result = []
    for alg, hashdata, filename in entries:
        hashobj = getattr(hashlib, alg.lower(), None)
        if not callable(hashobj):
            continue
        filename = path.join(baseDir, filename)
        if path.exists(filename):
            # binary mode: the digest must be computed over the raw bytes
            with open(filename, 'rb') as data:
                digest = hashobj(data.read())
            result.append((alg,
                           digest.hexdigest().upper() == hashdata.upper()))
    return result
def getFilesCount(directory):
    """Count the entries (sub-directories and files) below *directory*.

    Returns 0 when the directory does not exist.
    """
    if not path.exists(directory):
        return 0
    total = 0
    for _root, dirs, files in os.walk(directory):
        total += len(dirs) + len(files)
    return total
13 년 전
def listDirectory(directory, fullPath=False):
    """List the entries of *directory*.

    Returns [] when the path does not exist or cannot be listed.  With
    fullPath=True every name is joined with *directory*.
    """
    if not path.exists(directory):
        return []
    try:
        names = os.listdir(directory)
    except OSError:
        return []
    if fullPath:
        return [path.join(directory, name) for name in names]
    return names
def getInstalledVideo(prefix="/"):
    """Get installed video drivers.

    The names are taken from the *_drv.so files in
    usr/lib/xorg/modules/drivers under *prefix*, without that suffix.
    """
    x11Drivers = path.join(prefix, "usr/lib/xorg/modules/drivers")
    suffix = '_drv.so'
    return [name[:-len(suffix)] for name in listDirectory(x11Drivers)
            if name.endswith(suffix)]
def getDistfilesVideo(prefix="/"):
    """Get video drivers from distfiles.

    A file in usr/portage/distfiles under *prefix* whose name contains
    "amd" stands for 'fglrx'; one whose name starts with 'NVIDIA-Linux'
    stands for 'nvidia'.  Each driver is listed once.
    """
    distFiles = path.join(prefix, "usr/portage/distfiles")
    drivers = []
    for name in listDirectory(distFiles):
        if "amd" in name:
            drivers.append('fglrx')
        elif name.startswith('NVIDIA-Linux'):
            drivers.append("nvidia")
    return list(set(drivers))
def getAvailableVideo(prefix="/"):
    """Get available video drivers: the installed ones plus those whose
    sources are present in distfiles, without duplicates."""
    installed = getInstalledVideo(prefix=prefix)
    installable = getDistfilesVideo(prefix=prefix)
    return list(set(installed + installable))
def readLinesFile(filename):
    """Iterate over the lines of a file, without the trailing newline.

    Nothing is yielded when the file does not exist or cannot be read.

    The generator now simply ends: the former "finally: raise
    StopIteration" replaced every exception raised inside the generator
    and is reported as RuntimeError by interpreters implementing PEP 479.
    """
    try:
        if path.exists(filename):
            with open(filename, 'r') as lines:
                for line in lines:
                    yield line.rstrip('\n')
    except (OSError, IOError):
        pass
def readFile(filename):
    """Read whole file or return empty string.

    "" is returned when the file does not exist or cannot be read.  The
    file is closed before returning instead of being left to the garbage
    collector.
    """
    try:
        if path.exists(filename):
            with open(filename, 'r') as source:
                return source.read()
    except (OSError, IOError):
        pass
    return ""
13 년 전
def getUUIDDict(revers=False):
    """Get dict "UUID=<uuid>" -> device name (device -> UUID if *revers*).

    The data comes from the output of blkid; device names are resolved
    through udev (DEVNAME), falling back to the name printed by blkid.
    """
    blkidProcess = process("/sbin/blkid", "-s", "UUID", "-c", "/dev/null")
    reSplit = re.compile('^([^:]+):.*UUID="([^"]+)"', re.S)
    pairs = []
    for line in blkidProcess:
        found = reSplit.search(line)
        if not found:
            continue
        dev, uuid = found.groups()
        uuidKey = "UUID=%s" % uuid
        devName = getUdevDeviceInfo(name=dev).get('DEVNAME', dev)
        if revers:
            pairs.append((devName, uuidKey))
        else:
            pairs.append((uuidKey, devName))
    return dict(pairs)
13 년 전
def detectDeviceForPartition(dev):
    """Detect the parent device of a partition through udev.

    Returns the DEVNAME of the parent ('' when udev reports none), '' when
    *dev* is not a partition and None when the partition has no usable
    DEVPATH.
    """
    prop = getUdevDeviceInfo(name=dev)
    if prop.get('DEVTYPE', '') != 'partition':
        return ''
    parentpath = path.dirname(prop.get('DEVPATH', ''))
    if not parentpath:
        return None
    return getUdevDeviceInfo(path=parentpath).get('DEVNAME', '')
def getProgPath(progname):
    """Get full path of program or False.

    The base name is looked up with which(1), using the PATH from
    getpathenv() and LANG=C.  When that fails and *progname* is an existing
    absolute path, it is returned as given.
    NOTE(review): the name is interpolated into a shell command line
    without quoting - callers must not pass untrusted names.
    """
    baseprogname = path.basename(progname)
    # our environment with PATH and LANG overridden
    env = dict(os.environ.items())
    env["PATH"] = getpathenv()
    env["LANG"] = "C"
    retcode, output = runOsCommand("which %s" % baseprogname, env_dict=env)
    if retcode == 0:
        return output[0].strip()
    if path.isabs(progname) and path.exists(progname):
        return progname
    return False
def checkUtils(*utils):
    """Check that the utilities exist and return their full paths.

    For a missing utility an error is printed and cl_overriding.exit(1) is
    called.  A single path is returned for one utility, otherwise a list
    in the order of the arguments.
    """
    found = []
    for util in utils:
        utilPath = getProgPath(util)
        if not utilPath:
            cl_overriding.printERROR(_("Command not found '%s'") %
                                     path.basename(util))
            cl_overriding.exit(1)
        found.append(utilPath)
    return found[0] if len(found) == 1 else found
# Splits "<name>-<version>[-r<N>][.tbz2]" into name, version with revision,
# version and revision.  The dot before "tbz2" is escaped now: unescaped it
# matched any character.
reVerSplit = re.compile(r"^(.*?)-(([^-]+?)(?:-(r\d+))?)(?:\.(tbz2))?$", re.S)


def reVerSplitToPV(x):
    """Convert a package name (or a match of reVerSplit) to a PV hash.

    The result has the keys PN (name), PV (version), PR (revision, "r0"
    when absent), PVR (version with revision), P (name-version) and PF
    (name-version-revision).  All values are "" when the name cannot be
    parsed.
    """
    if isinstance(x, str):
        x = reVerSplit.search(x)
    if not x:
        return {'PN': "",
                'PF': "",
                'P': "",
                'PV': "",
                'PR': "",
                'PVR': ""}
    name, fullVersion, version, revision = x.groups()[:4]
    return {'PN': name,
            'PF': "%s-%s" % (name, fullVersion),
            'P': "%s-%s" % (name, version),
            'PV': version,
            'PR': revision or "r0",
            'PVR': fullVersion}
def getPkgUses(fullpkg):
    """Get USE and IUSE of an installed package.

    *fullpkg* is "category/name".  Returns (USE flags, IUSE flags) read
    from the package directory in /var/db/pkg, with a leading "+" removed
    from every flag, or None when the package is not installed.  When
    several versions are installed the last listed one is used.
    """
    category, slash, pkg = fullpkg.partition('/')
    pkgCategory = '/var/db/pkg/{0}'.format(category)
    packages = []
    for entry in listDirectory(pkgCategory):
        found = reVerSplit.search(entry)
        if not found:
            continue
        pv = reVerSplitToPV(found)
        if pv['PN'] == pkg:
            packages.append(pv)
    if not packages:
        return None

    def readFlags(fileName):
        # a missing file means "no flags"; the handle is closed right away
        if not path.exists(fileName):
            return []
        with open(fileName, 'r') as flagsFile:
            words = flagsFile.read().strip().split()
        return [word[1:] if word.startswith("+") else word
                for word in words if word]

    pkgDir = path.join(pkgCategory, packages[-1]['PF'])
    usePath = path.join(pkgDir, "USE")
    iusePath = path.join(pkgDir, "IUSE")
    return (readFlags(usePath), readFlags(iusePath))
def isPkgInstalled(pkg, prefix='/'):
    """Check whether a package is installed.

    *pkg* is either "category/name" or a bare name, in which case every
    category of var/db/pkg under *prefix* is searched.
    """
    pkgDir = path.join(prefix, 'var/db/pkg')

    def categoryHas(categoryDir, name):
        # true when a package directory of that name exists in the category
        matches = [pv for pv in map(reVerSplitToPV,
                                    listDirectory(categoryDir))
                   if pv['PN'] == name]
        return bool(matches)

    if "/" in pkg:
        category, op, pkg = pkg.partition('/')
        return categoryHas(path.join(pkgDir, category), pkg)
    categories = [categoryDir
                  for categoryDir in listDirectory(pkgDir, fullPath=True)
                  if categoryHas(categoryDir, pkg)]
    return bool(categories)
def getPkgInstalled(pkg, prefix='/'):
    """Find the installed versions of a package.

    For "category/name" a list of PV hashes (see reVerSplitToPV) of the
    installed versions is returned.
    NOTE: for a bare name the result is different in kind - the list of
    category directories (full paths) that contain such a package.
    """
    pkgDir = path.join(prefix, 'var/db/pkg')

    def versionsIn(categoryDir, name):
        return [pv for pv in map(reVerSplitToPV, listDirectory(categoryDir))
                if pv['PN'] == name]

    if "/" in pkg:
        category, op, pkg = pkg.partition('/')
        return versionsIn(path.join(pkgDir, category), pkg)
    return [categoryDir
            for categoryDir in listDirectory(pkgDir, fullPath=True)
            if versionsIn(categoryDir, pkg)]
def getPkgActiveUses(fullpkg):
    """Get the active USE flags of a package.

    These are the flags present both in USE and in IUSE of the installed
    package.  Returns None when the package data is not available.
    """
    uses = getPkgUses(fullpkg)
    if not uses:
        return None
    use, iuse = uses[0], uses[1]
    return list(set(use) & set(iuse))
def getSquashList():
    """Get the supported squashfs compression methods.

    The list is derived from the active USE flags of sys-fs/squashfs-tools
    (lzma is reported as xz); gzip is added for version 4.2 and newer.
    Only ["gzip"] is returned when no active flags are found.
    """
    wantMethod = set(["lzo", "lzma", "xz", "gzip"])
    usesSquashFs = getPkgActiveUses("sys-fs/squashfs-tools")
    if not usesSquashFs:
        return ["gzip"]
    pkgInfo = getPkgInstalled('sys-fs/squashfs-tools')
    if pkgInfo and pkgInfo[0]['PV']:
        installedVer = getTupleVersion(pkgInfo[0]['PV'])
        if installedVer >= getTupleVersion('4.2'):
            usesSquashFs.append('gzip')
    rename = {"lzma": "xz"}
    return [rename.get(method, method)
            for method in list(set(usesSquashFs) & wantMethod)]
def countPartitions(devname):
    """Count the partitions of the specified device.

    The entries of the device's sysfs directory whose names start with the
    device name are counted; 0 is returned when udev reports no DEVPATH.
    """
    syspath = getUdevDeviceInfo(name=devname).get('DEVPATH', '')
    if not syspath:
        return 0
    deviceName = path.basename(syspath)
    if not syspath.startswith("/sys"):
        syspath = pathJoin("/sys", syspath)
    partitions = [entry for entry in listDirectory(syspath)
                  if entry.startswith(deviceName)]
    return len(partitions)
def getLvmGroups():
    """Get LVM groups: the volume group names printed by pvdisplay."""
    pvdisplayCmd = getProgPath('/sbin/pvdisplay')
    pvdata = process(pvdisplayCmd, "-C", "-o", "vg_name", "--noh")
    return [name for name in pvdata.read().split() if name]
def getLvmPartitions(vg_name, lv_name, cache=[]):
    """Get the physical volumes that back an LVM logical volume.

    The mutable default *cache* is used on purpose: it keeps the
    [vg_name, lv_name, pv_name] rows reported by pvdisplay, so the command
    is run only while the cache is empty.  Returns a list of pv names
    (empty when nothing matches).
    """
    if not cache:
        pvdisplayCmd = getProgPath('/sbin/pvdisplay')
        pvdata = process(pvdisplayCmd, "-C", "-o",
                         "vg_name,lv_name,pv_name", "--noh")
        if pvdata.success():
            rows = [line.split() for line in pvdata.read().split('\n')]
            cache.extend([row for row in rows if row and len(row) == 3])
    return [row[2] for row in cache
            if row[0] == vg_name and row[1] == lv_name]
def getPartitionDevice(syspath):
    """Get the real parent device of a partition, LVM volume or MD RAID.

    *syspath* is the sysfs path of the device.  Returns the DEVNAME of the
    underlying disk or "" when it cannot be determined.
    """
    prop = getUdevDeviceInfo(path=syspath)
    # real device
    isDisk = prop.get('ID_TYPE', "") == "disk" and \
        prop.get('DEVTYPE', "") == "disk"
    if isDisk:
        return prop.get('DEVNAME', "")
    # partition: look at the parent
    if prop.get('DEVTYPE') == "partition":
        return getPartitionDevice(path.dirname(syspath))
    # md raid: follow the first member that has a block device
    if prop.get('MD_LEVEL', "").startswith("raid"):
        if not syspath.startswith('/sys'):
            syspath = pathJoin('/sys', syspath)
        syspath = pathJoin(syspath, "md")
        for rd in listDirectory(syspath, fullPath=True):
            if not path.basename(rd).startswith('rd'):
                continue
            rdBlockPath = path.join(rd, "block")
            if path.exists(rdBlockPath):
                return getPartitionDevice(path.realpath(rdBlockPath))
        # NOTE(review): the fallback is taken to belong to the loop as a
        # whole (no member had a block device) - confirm.
        return ""
    # lvm: follow the first physical volume
    if prop.get('DM_LV_NAME', "") != "":
        parts = getLvmPartitions(prop.get('DM_VG_NAME', ''),
                                 prop.get('DM_LV_NAME', ''))
        if parts:
            propPartLvm = getUdevDeviceInfo(name=parts[0])
            if 'DEVPATH' in propPartLvm:
                return getPartitionDevice(propPartLvm['DEVPATH'])
    return ""
def getAvailableX11Drivers(prefix="/"):
    """Get available x11 drivers (deprecated function).

    The names are taken from the *_drv.so files in
    usr/lib/xorg/modules/drivers under *prefix*, without that suffix.
    """
    xorg_modules_dir = path.join(prefix, 'usr/lib/xorg/modules/drivers')
    drivers = []
    for module in listDirectory(xorg_modules_dir):
        if module.endswith('_drv.so'):
            drivers.append(module[:-7])
    return drivers
def lspci(filtername=None, shortInfo=False):
    """Get the devices reported by "lspci -m" as a dictionary.

    The keys are the first field of every lspci line (the pci address);
    the values are dictionaries with the keys
        'type'
        'vendor'
        'name'
    Only lines that contain *filtername* are used when it is given.  With
    shortInfo=True only the first word of type, vendor and name is kept.
    """
    reData = re.compile(r'(\S+)\s"([^"]+)"\s+"([^"]+)"\s+"([^"]+)"', re.S)
    lspciProg = checkUtils('/usr/sbin/lspci')
    processLsPci = process(lspciProg, "-m")

    def shorten(text):
        return text.partition(" ")[0] if shortInfo else text

    retData = {}
    for line in processLsPci:
        if filtername:
            if filtername not in line:
                continue
        elif not line:
            # without a filter only empty lines are dropped
            continue
        found = reData.search(line)
        if not found:
            continue
        address, devType, vendor, name = found.groups()
        retData[address] = {'type': shorten(devType),
                            'vendor': shorten(vendor),
                            'name': shorten(name)}
    return retData
def getUdevDeviceInfo(path="", name=""):
    """Get the udev properties of a device by sysfs path or by name.

    *path* takes precedence over *name*.  Returns the KEY=VALUE lines
    printed by "udevadm info --query property" as a dictionary.
    """
    udevadmCmd = getProgPath('/sbin/udevadm')
    if path:
        typeQuery, value = "--path", path
    else:
        typeQuery, value = "--name", name
    output = process(udevadmCmd, "info", "--query", "property",
                     typeQuery, value).read()
    properties = {}
    for line in output.split("\n"):
        key, separator, val = line.partition("=")
        if key:
            properties[key] = val
    return properties
def getPartitionSize(dev):
    """Get the size of a partition as a human readable string.

    *dev* is the sysfs directory of the device; its "size" file holds the
    number of 512-byte sectors.  The result uses the largest unit that is
    strictly smaller than the size; one decimal digit is shown for G, T
    and P when it is not zero.  "" is returned when the size file does not
    exist or the size is zero.
    """
    SECTORSIZE = 512
    sizeFile = pathJoin(dev, "size")
    if not path.exists(sizeFile):
        return ""
    size = int(open(sizeFile, 'r').read().strip()) * SECTORSIZE
    # (unit size, letter, show one decimal digit)
    units = ((1024 ** 0, "", False),
             (1024 ** 1, "K", False),
             (1024 ** 2, "M", False),
             (1024 ** 3, "G", True),
             (1024 ** 4, "T", True),
             (1024 ** 5, "P", True))
    fitting = [unit for unit in units if size > unit[0]]
    if not fitting:
        return ""
    unitSize, letter, showTail = fitting[-1]
    # size in tenths of the unit, truncated
    tenths = int(size / (float(unitSize) / 10))
    whole, tail = divmod(tenths, 10)
    if showTail and tail:
        return "%d.%d%s" % (whole, tail, letter)
    return "%d%s" % (whole, letter)
def getDeviceType(syspath):
    """Get device type (cdrom, disk, partition, raid, lvm or loop).

    The type is derived from the udev properties of *syspath*. For
    partitions, md raids and LVM volumes the type of the underlying device
    is resolved recursively and suffixed with "-partition", "-raid" or
    "-lvm". "loop" is returned when nothing else matches.
    """
    props = getUdevDeviceInfo(path=syspath)
    # real device
    if props.get('ID_CDROM', ""):
        return "cdrom"
    isDiskType = props.get('ID_TYPE', "") == "disk"
    if isDiskType and props.get('DEVTYPE', "") == "disk":
        return "disk"
    # partition: the type of the parent sys directory plus a suffix
    if props.get('DEVTYPE') == "partition":
        parentType = getDeviceType(path.dirname(syspath))
        return parentType + "-partition"
    # md raid: use the first member that exposes a "block" entry
    if props.get('MD_LEVEL', "").startswith("raid"):
        if not syspath.startswith('/sys'):
            syspath = pathJoin('/sys', syspath)
        syspath = pathJoin(syspath, "md")
        for member in listDirectory(syspath, fullPath=True):
            if not path.basename(member).startswith('rd'):
                continue
            rdBlockPath = path.join(member, "block")
            if path.exists(rdBlockPath):
                memberType = getDeviceType(path.realpath(rdBlockPath))
                return memberType + "-raid"
        # no member with a block entry was found
        return "loop"
    # lvm: use the first partition backing the logical volume
    if props.get('DM_LV_NAME', "") != "":
        parts = getLvmPartitions(props.get('DM_VG_NAME', ''),
                                 props.get('DM_LV_NAME', ''))
        if parts:
            propPartLvm = getUdevDeviceInfo(name=parts[0])
            if 'DEVPATH' in propPartLvm:
                backingType = getDeviceType(propPartLvm['DEVPATH'])
                return backingType + "-lvm"
    return "loop"
def getRaidPartitions(raidpath):
    """Get the device names of the members of an md raid.

    Returns a list with the non-empty DEVNAME udev property of every
    ``rd*`` entry in the ``md`` directory of *raidpath* that has a
    ``block`` entry. Returns an empty list when the MD_LEVEL property of
    *raidpath* does not start with "raid".
    """
    props = getUdevDeviceInfo(path=raidpath)
    if not props.get('MD_LEVEL', "").startswith("raid"):
        return []
    if not raidpath.startswith('/sys'):
        raidpath = pathJoin('/sys', raidpath)
    raidpath = pathJoin(raidpath, "md")
    devNames = []
    for rd in listDirectory(raidpath, fullPath=True):
        if not path.basename(rd).startswith('rd'):
            continue
        rdpath = path.join(raidpath, rd, "block")
        if not path.exists(rdpath):
            continue
        memberProps = getUdevDeviceInfo(path=path.realpath(rdpath))
        devName = memberProps.get("DEVNAME", '')
        if devName:
            devNames.append(devName)
    return devNames
def getPartitionType(prop):
    """Get type of a dos partition (primary, extended or logical).

    prop -- dict of udev properties of the partition

    For a partition of a "dos" table with both ID_PART_ENTRY_TYPE and
    ID_PART_ENTRY_NUMBER set, returns "extended", "logical" (partition
    number above 4) or "primary". In every other case the value of
    ID_PART_TABLE_TYPE (or "" when missing) is returned.
    """
    # MBR type ids of extended partitions: 0x05 (CHS), 0x0f (W95 LBA)
    # and 0x85 (Linux extended); previously only 0x5 was recognised and
    # the others were reported as "primary"
    extendedIds = ("0x5", "0xf", "0x85")
    if prop.get('ID_PART_ENTRY_SCHEME') == 'dos':
        partId = prop.get('ID_PART_ENTRY_TYPE', '')
        partNumber = prop.get('ID_PART_ENTRY_NUMBER', '')
        if partId and partNumber:
            if partId.lower() in extendedIds:
                return "extended"
            elif int(partNumber) > 4:
                return "logical"
            else:
                return "primary"
    return prop.get('ID_PART_TABLE_TYPE', '')
def detectBuild(pathname,dictInfo):
    """Detect the kind of system installed under pathname.

    Sets dictInfo['type'] to ' assemble' when etc/shadow under pathname
    contains one specific root password entry, to " builder" when both
    "delta" and "workspace" exist under pathname, and to '' otherwise.
    For a builder, dictInfo['name'] is additionally derived from the
    content of etc/gentoo-release when it is recognised.

    NOTE(review): the original indentation of this block was lost; the
    gentoo-release inspection is assumed to belong to the "builder"
    branch -- confirm against the upstream file.

    Returns the (modified in place) dictInfo.
    """
    shadowPath = pathJoin(pathname,'/etc/shadow')
    # a fixed root entry in the shadow file marks the system as "assemble"
    if r"root:$1$JMvNh5xg$VnV1DyJdTcwuZ0hp5YiJG0:14349:0:::::" in \
        readFile(shadowPath):
        dictInfo['type'] = ' assemble'
    elif path.exists(pathJoin(pathname,"delta")) and \
        path.exists(pathJoin(pathname,"workspace")):
        dictInfo['type'] = " builder"
        # choose the short distribution name by keywords found in the
        # release file; 'name' is left untouched when nothing matches
        issue = readFile(pathJoin(pathname,'etc/gentoo-release'))
        if "Server" in issue:
            if "Scratch" in issue:
                dictInfo['name'] = "CSS"
            else:
                dictInfo['name'] = "CDS"
        elif "Desktop" in issue:
            if "XFCE" in issue:
                dictInfo['name'] = "CLDX"
            elif "KDE" in issue:
                dictInfo['name'] = "CLD"
            elif "GNOME" in issue:
                dictInfo['name'] = "CLDG"
        elif "Scratch" in issue:
            dictInfo['name'] = "CLS"
    else:
        dictInfo['type'] = ''
    return dictInfo
def getOsProberHash(getContentFunc=None):
    """Get partition content as reported by os-prober.

    getContentFunc -- optional callable invoked as
                      ``getContentFunc(device, addFunc=detectBuild)`` for
                      every record whose short name contains "Gentoo"; it
                      is expected to return a dict describing the system

    Returns a dict mapping the DEVNAME udev property of each reported
    device to a short system name. Returns an empty dict when os-prober is
    not available.
    """
    os_prober = getProgPath('/usr/bin/os-prober')
    if not os_prober:
        return {}
    DEV, LONG, SHORT, TYPE = 0, 1, 2, 3
    # os-prober prints colon separated records; ignore incomplete ones
    rows = [line.split(":") for line in process(os_prober)]
    rows = [row for row in rows if len(row) >= 4]
    osProberList = [[getUdevDeviceInfo(name=row[DEV]).get('DEVNAME', ''),
                     row[LONG], row[SHORT], row[TYPE]]
                    for row in rows]
    requiredKeys = ("name", "march", "build", "ver")
    for osRecord in osProberList:
        if "Gentoo" not in osRecord[SHORT]:
            continue
        if getContentFunc:
            osDescr = getContentFunc(osRecord[DEV], addFunc=detectBuild)
            described = all(key in osDescr for key in requiredKeys)
            if described and (osDescr["ver"] != "0" or osDescr["build"]):
                # prefer the build number, fall back to the version
                osDescr['build'] = "-%s" % (osDescr['build'] or
                                            osDescr['ver'])
                osRecord[SHORT] = \
                    "{name}-{march}{build}{type}".format(**osDescr)
            else:
                osRecord[SHORT] = "Gentoo"
        elif "Calculate" in osRecord[LONG]:
            osRecord[SHORT] = "Calculate"
    return dict((record[DEV], record[SHORT]) for record in osProberList)
def refreshLVM():
    """Run the commands which refresh information about LVM.

    Does nothing unless vgscan, vgchange and lvchange are all found.
    Otherwise runs ``vgscan``, ``vgchange -ay --refresh`` and then
    ``lvchange -ay --refresh <group>`` for every group returned by
    getLvmGroups().
    """
    vgscan = getProgPath('/sbin/vgscan')
    vgchange = getProgPath('/sbin/vgchange')
    lvchange = getProgPath('/sbin/lvchange')
    if not (vgscan and vgchange and lvchange):
        return
    process(vgscan).success()
    process(vgchange, '-ay', '--refresh').success()
    for group in getLvmGroups():
        process(lvchange, '-ay', '--refresh', group).success()
def refreshUdev():
    """Run the command which refreshes information about block devices
    in udev.

    Does nothing when udevadm is not found. Otherwise removes
    /etc/blkid.tab if it exists (a failure to remove it is ignored) and
    runs ``udevadm trigger --subsystem-match block``.
    """
    udevadm = getProgPath('/sbin/udevadm')
    if not udevadm:
        return
    blkidFile = '/etc/blkid.tab'
    if path.exists(blkidFile):
        try:
            os.unlink(blkidFile)
        except OSError:
            # best effort: the trigger below is run anyway
            pass
    process(udevadm, "trigger", "--subsystem-match", "block").success()
def getPasswdUsers(minId=1000, maxId=65000):
    """Get users from /etc/passwd whose uid is between minId and maxId.

    Returns ["root"] followed by the sorted names of all users whose
    numeric uid lies in the inclusive range minId..maxId. Only lines with
    exactly 7 colon separated fields are considered. Returns an empty list
    when /etc/passwd is not readable.
    """
    fileName = "/etc/passwd"
    if not os.access(fileName, os.R_OK):
        return []
    reNumb = re.compile(r"^\d+$")
    lenData = 7
    users = []
    # "with" closes the file instead of leaking the handle
    with open(fileName) as passwd:
        for line in passwd:
            fields = line.rstrip().split(":")
            if len(fields) != lenData:
                continue
            uid = fields[2]
            if reNumb.match(uid) and minId <= int(uid) <= maxId:
                users.append(fields[0])
    users.sort()
    return ["root"] + users
def getCmdLineParam(paramName):
    """Get the value of a parameter from /proc/cmdline.

    The content of /proc/cmdline is split on single spaces. When the
    parameter is given several times the last value wins. Returns an empty
    string when the parameter is not found.
    """
    prefix = "%s=" % paramName
    value = ""
    for token in readFile('/proc/cmdline').split(' '):
        if token.startswith(prefix):
            # everything after the first "=" is the value
            value = token.partition('=')[2]
    return value
def getSupportArch():
    """Get the architectures supported by the processor.

    Returns ['i686', 'x86_64'] when a line of /proc/cpuinfo starts with
    "flags" and contains " lm ", otherwise only ['i686'].
    """
    longModeLines = [line for line in readLinesFile('/proc/cpuinfo')
                     if line.startswith('flags') and " lm " in line]
    return ['i686', 'x86_64'] if longModeLines else ['i686']
def makeDirectory(pathname):
    """Make a directory together with its missing parents.

    Returns True when the directory was created. Returns False when the
    path already exists or when the directory could not be created; no
    exception is propagated to the caller.
    """
    try:
        if path.exists(pathname):
            return False
        parent = path.dirname(path.normpath(pathname))
        # an empty parent means "current directory": it always exists, and
        # recursing on "" would never terminate
        if parent and not path.exists(parent):
            makeDirectory(parent)
        os.mkdir(pathname)
        return True
    except Exception:
        # deliberate best effort: callers only test the boolean result
        return False
def tarLinks(rootpath, archpath, skip=()):
    """Add the symbolic links found under rootpath to a new archive.

    rootpath -- directory tree which is searched for symbolic links
    archpath -- archive file to create (bzip2 compressed tar); an existing
                file is removed first
    skip     -- patterns of link paths to leave out; "*" stands for any
                sequence of characters, the rest of each pattern is used
                as a regular expression searched in the full link path

    Links are stored with names relative to rootpath. Directories which
    are symbolic links or mount points are not descended into.

    Returns the list of full paths of the links added to the archive.
    """
    if skip:
        reSkip = re.compile("|".join(pattern.replace("*", ".*")
                                     for pattern in skip)).search
    else:
        reSkip = lambda x: False
    if path.exists(archpath):
        os.unlink(archpath)
    links = []
    # create arch
    tar = tarfile.open(archpath, "w:bz2")
    try:
        # find links
        for root, dirs, files in os.walk(rootpath):
            for name in dirs + files:
                link = path.join(root, name)
                if not path.islink(link) or reSkip(link):
                    continue
                # try add link
                try:
                    ti = tar.gettarinfo(link)
                    # relpath also works for rootpath "/" and for roots
                    # which are not normalized, unlike a fixed-length slice
                    ti.name = path.relpath(link, rootpath)
                    tar.addfile(ti)
                    links.append(link)
                except OSError:
                    pass
            # discard linked and mounted paths: pruning dirs in place
            # keeps os.walk from descending into them
            dirs[:] = [name for name in dirs
                       if not (path.islink(path.join(root, name)) or
                               path.ismount(path.join(root, name)))]
    finally:
        # close the archive even when walking or adding fails
        tar.close()
    return links
def countFiles(dirpath, onefilesystem=True):
    """Count the entries of the directory tree at dirpath.

    The result is 1 (for dirpath itself) plus the number of distinct
    directory and file names found in every visited directory. When
    onefilesystem is true, subdirectories which are mount points are
    counted but not descended into.
    """
    total = 1
    for current, subdirs, files in os.walk(dirpath):
        total += len(set(subdirs).union(files))
        if not onefilesystem:
            continue
        mounted = [name for name in subdirs
                   if path.ismount(path.join(current, name))]
        # removing from the list in place stops os.walk from entering it
        for name in mounted:
            subdirs.remove(name)
    return total
class InitrdFile(object):
    """Access to the list of file names stored in an initrd image.

    The image is decompressed by an external program (lz4, xz or gzip,
    chosen by the detected file type) and listed by ``cpio -tf``.
    """
    # captures the directory name between "/modules/" and "/kernel"
    re_kver_path = re.compile("/modules/([^/]+)/kernel")

    def __init__(self, _file):
        """Remember the path of the initrd image; nothing is read yet."""
        self._file = _file

    def get_kernel_version(self):
        """Get the kernel version from the path of the kernel modules.

        Only the first listed name containing both "/modules/" and
        "/kernel" is examined. Returns "" when there is no such name or
        when it does not match the expected layout.
        """
        names = self.get_names()
        try:
            for fn in names:
                if "/modules/" in fn and "/kernel" in fn:
                    m = self.re_kver_path.search(fn)
                    if m:
                        return m.group(1)
                    break
            return ""
        finally:
            # stop the helper processes right away instead of waiting for
            # the generator to be garbage collected
            close = getattr(names, "close", None)
            if close is not None:
                close()

    def get_names(self):
        """Generate the file names stored in the image.

        Raises IOError when the image file does not exist. Because this is
        a generator, the check happens when the first name is requested.
        """
        if not path.exists(self._file):
            # raise IOError
            open(self._file)
        ftype = typeFile(magic=0x4).getMType
        rdtype = ftype(self._file)
        if "LZ4" in rdtype:
            arch_cmd = '/usr/bin/lz4'
        elif "XZ" in rdtype:
            arch_cmd = '/usr/bin/xz'
        else:
            arch_cmd = '/bin/gzip'
        # stderr is never read, so send it to the null device: an unread
        # pipe could block the children once its buffer is full
        devnull = open(os.devnull, "w")
        gz = cpio = None
        try:
            gz = Popen([arch_cmd, "-dc", self._file], stdout=PIPE,
                       stderr=devnull, close_fds=True)
            cpio = Popen(["/bin/cpio", "-tf"], stdout=PIPE, stdin=gz.stdout,
                         stderr=devnull, close_fds=True)
            # only cpio reads this pipe; closing our copy lets the
            # decompressor get SIGPIPE when cpio exits first
            gz.stdout.close()
            for fn in cpio.stdout:
                yield fn.rstrip()
        finally:
            for proc in (cpio, gz):
                if proc is None:
                    continue
                try:
                    if proc.poll() is None:
                        proc.terminate()
                except OSError:
                    # the process has exited in the meantime
                    pass
                if proc.stdout is not None:
                    proc.stdout.close()
                # reap the child so that no zombie is left behind
                proc.wait()
            devnull.close()

    def __iter__(self):
        """Iterate over the file names stored in the image."""
        return iter(self.get_names())