You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/cl_utils.py

940 lines
30 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import string
from random import choice
import os
13 years ago
from os import path
import types
import subprocess
from subprocess import Popen,PIPE,STDOUT
14 years ago
import stat
from shutil import copytree, rmtree
import cl_overriding
import re
14 years ago
import sys
import getpass
from types import StringType
try:
13 years ago
from magic import open as type_file, MAGIC_NONE as MAGIC_NONE
except ImportError:
try:
13 years ago
from magic import open as type_file, NONE as MAGIC_NONE
except:
type_file = None
MAGIC_NONE = None
14 years ago
import cl_lang
tr = cl_lang.lang()
tr.setLocalDomain('cl_lib')
tr.setLanguage(sys.modules[__name__])
class _error:
    """Mixin that accumulates error messages.

    ``error`` is a class attribute, so the list is shared by every class
    and instance that does not rebind it.
    """
    # Collected error messages
    error = []

    def getError(self):
        """Return the stored errors, one per line, or False when empty."""
        if self.error:
            return "".join([message + "\n" for message in self.error])
        return False

    def setError(self, error):
        """Append an error message; always returns True."""
        self.error.append(error)
        return True
class _warning:
    """Mixin that accumulates warning messages.

    ``warning`` is a class attribute, so the list is shared by every class
    and instance that does not rebind it.
    """
    # Collected warning messages
    warning = []

    def getWarning(self):
        """Return the stored warnings, one per line, or False when empty."""
        if self.warning:
            return "".join([message + "\n" for message in self.warning])
        return False

    def setWarning(self, warning):
        """Append a warning message; always returns True."""
        self.warning.append(warning)
        return True
class proxy_type_file:
    """Stand-in for a magic object that shells out to the ``file`` utility.

    It offers the same methods typeFile calls on a real magic object:
    load(), setflags(), close() and file().
    """

    def __init__(self, flags):
        self.flags = flags

    def load(self):
        """Do nothing; present only for interface parity."""

    def setflags(self, flags):
        """Remember the flags that select the ``file`` options."""
        self.flags = flags

    def close(self):
        """Do nothing; present only for interface parity."""

    def file(self, filename):
        """Return the output of ``file`` for filename, or None.

        None is returned when the path does not exist or the command fails.
        Flags equal to 0x410 run ``file -bi``; anything else runs
        ``file -bz``.
        """
        if not path.exists(filename):
            return None
        option = "-bi" if self.flags == 0x410 else "-bz"
        processFile = process("file", option, filename)
        if processFile.success():
            return processFile.read().rstrip()
        return None
14 years ago
class typeFile:
    """Query the type of a file through a magic object."""

    def __init__(self, magic=0x410):
        """Create the magic object and apply the ``magic`` flags.

        When the magic module could not be imported (type_file is None)
        the proxy_type_file fallback is used instead.
        """
        if type_file is not None:
            self.magicObject = type_file(MAGIC_NONE)
        else:
            self.magicObject = proxy_type_file(MAGIC_NONE)
        self.magicObject.load()
        self.magicObject.setflags(magic)

    def __del__(self):
        """Close the magic object."""
        self.magicObject.close()

    def getMType(self, filename):
        """Return the type description reported for filename."""
        return self.magicObject.file(filename)

    def isBinary(self, filename):
        """Tell whether the file is binary.

        Returns the raw description string when it contains a backquote
        (treated as an error message), True when it mentions "binary",
        otherwise False.
        """
        description = self.getMType(filename)
        # A backquote marks an error message instead of a type
        if description.count("`"):
            return description
        return bool(description.count("binary"))
14 years ago
class scanDirectory:
    """Recursive directory scanner with overridable per-entry hooks."""

    def processingFile(self, pathname, prefix):
        """Hook called for each regular file; a false result aborts the scan."""
        return True

    def processingDirectory(self, pathname, prefix):
        """Hook called for each directory.

        Return None to skip the directory, False to abort the scan.
        """
        return True

    def scanningDirectory(self, scanDir, skipFile=[], skipDir=[],
                          prefix=None, flagDir=False):
        """Walk scanDir in sorted order and call the hooks on its entries.

        Parameters:
        scanDir   directory to scan
        skipFile  paths relative to prefix of files to skip
        skipDir   paths relative to prefix of directories to skip
        prefix    leading part stripped from absolute paths to build the
                  relative paths; defaults to scanDir without its final
                  character after path.join(scanDir, "")
        flagDir   True on recursive calls: the directory was already
                  passed to processingDirectory by the parent call

        Returns None when the root directory is skipped, False when a hook
        aborted the scan, otherwise the result of the last nested scan
        (True if there was none).

        skipFile and skipDir are only read, never modified.
        """
        ret = True
        if not prefix:
            prefix = path.join(scanDir, "")[:-1]
        if not flagDir:
            # Check the root directory itself
            retDir = self.processingDirectory(scanDir, scanDir)
            if retDir is None:
                return None
            elif retDir is False:
                return False
        if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]):
            for absPath in sorted(listDirectory(scanDir, fullPath=True)):
                # Split only once: the prefix text may occur again deeper
                # in the path, which used to truncate the relative path.
                relPath = absPath.split(prefix, 1)[1]
                statInfo = os.lstat(absPath)[stat.ST_MODE]
                if stat.S_ISREG(statInfo):
                    # Regular file
                    if relPath in skipFile:
                        continue
                    if not self.processingFile(absPath, prefix):
                        return False
                elif stat.S_ISDIR(statInfo):
                    # Directory
                    if relPath in skipDir:
                        continue
                    retDir = self.processingDirectory(absPath, prefix)
                    if retDir is None:
                        continue
                    elif retDir is False:
                        return False
                    ret = self.scanningDirectory(absPath, skipFile,
                                                 skipDir, prefix, True)
                    if ret is False:
                        return False
        return ret
14 years ago
class process:
    r"""Execute a system command through Popen.

    Examples:
    execute program and get result:
        if process("/bin/gzip","/boot/somefile").success():
            print "Gzip success"
    unzip and process unzip data by cpio (list files):
        processGzip = process("/bin/gzip","-dc","/boot/initrd")
        processCpio = process("/bin/cpio","-tf",stdin=processGzip)
        filelist = processCpio.readlines()
    execute command and send data:
        processGrub = process("/sbin/grub")
        processGrub.write("root (hd0,0)\n")
        processGrub.write("setup (hd0)\n")
        processGrub.write("quit\n")
        isok = processGrub.success()
    union stdout and stderr:
        process("/bin/ls","/",stderr=STDOUT)
    result to stdout:
        process("/bin/ls",stdout=None)
    get data from keyboard:
        process("/bin/cat",stdin=None)

    Keyword arguments:
    stdin    omitted -> a new pipe; None -> passed to Popen as None;
             another process object -> its stdout is used
    stdout   passed to Popen (default PIPE)
    stderr   passed to Popen (default PIPE)
    envdict  environment passed to Popen (default None)

    The child is started lazily, on the first call that needs it.
    """

    def __init__(self, command, *params, **kwarg):
        if "stdin" not in kwarg:
            stdin = self._defaultStdin
        elif kwarg["stdin"] is None:
            stdin = self._keyboardStdin
        else:
            stdin = kwarg["stdin"].getStdout
        self.stdout = kwarg.get("stdout", PIPE)
        self.envdict = kwarg.get("envdict", None)
        self.stderr = kwarg.get("stderr", PIPE)
        self.command = [command] + list(params)
        # Callable producing the stdin argument for Popen
        self.stdin = stdin
        self.iter = None
        self.pipe = None
        self.cacheresult = None
        self.communicated = False

    def _open(self):
        """Start the child process if it is not started yet."""
        if not self.pipe:
            self.pipe = Popen(self.command,
                              stdout=self.stdout,
                              stdin=self.stdin(),
                              stderr=self.stderr,
                              env=self.envdict)

    def _defaultStdin(self):
        """Return the default stdin (a new pipe)."""
        return PIPE

    def _keyboardStdin(self):
        """Return None so that Popen receives stdin=None."""
        return None

    def getStdout(self):
        """Close own stdin, start the child if needed, return its stdout."""
        self.close()
        self._open()
        return self.pipe.stdout

    def write(self, data):
        """Write data to the child's stdin and flush it."""
        self._open()
        self.pipe.stdin.write(data)
        self.pipe.stdin.flush()

    def close(self):
        """Close the child's stdin if there is one.

        The child has no stdin pipe when it reads from the keyboard or
        from another process, so that case is skipped instead of raising
        AttributeError.
        """
        if self.pipe and self.pipe.stdin:
            self.pipe.stdin.close()

    def read(self):
        """Return everything the child wrote to stdout (cached)."""
        try:
            self._open()
            if self.cacheresult is None:
                self.cacheresult = self.pipe.communicate()[0]
            return self.cacheresult
        except KeyboardInterrupt:
            self.kill()
            raise KeyboardInterrupt

    def readlines(self):
        """Return the child's output split on newlines."""
        return self.read().split('\n')

    def __iter__(self):
        """Return the (single, shared) iterator over output lines."""
        if not self.iter:
            self.iter = iter(self.readlines())
        return self.iter

    def kill(self):
        """Kill the child process if it was started."""
        if self.pipe:
            self.pipe.kill()

    def next(self):
        """Return the next line of output."""
        return next(self.__iter__())

    def returncode(self):
        """Read all output, then return the child's return code."""
        self.read()
        return self.pipe.returncode

    def success(self):
        """Return True when the return code is 0."""
        return self.returncode() == 0

    def failed(self):
        """Return True when the return code is not 0."""
        return self.returncode() != 0
class processProgress(process):
    """Execute a system command through Popen and parse stdout while reading.

    Subclasses override init(), processInit(), processStdout(),
    processDraw() and processEnd() to react to the output stream.
    The additional keyword ``readsize`` (default 10) is the chunk size used
    after the first byte.
    """

    def __init__(self, command, *params, **kwarg):
        process.__init__(self, command, *params, **kwarg)
        self.readsize = kwarg.get("readsize", 10)
        self.init(**kwarg)

    def init(self, **kwarg):
        """Hook called at the end of __init__ with the keyword arguments."""
        pass

    def read(self):
        """Read stdout chunk by chunk, calling the hooks, and return it all.

        The accumulated text is available as self.buf inside the hooks.
        On KeyboardInterrupt the child is killed, processEnd(False) is
        called and the interrupt is raised again.
        """
        try:
            self.processInit()
            self._open()
            if self.cacheresult is None:
                self.cacheresult = []
                self.buf = ""
                part = self.pipe.stdout.read(1)
                while part:
                    if self.buf:
                        self.buf += part
                    else:
                        self.buf = part
                    if self.processStdout():
                        self.processDraw()
                    self.cacheresult.append(part)
                    part = self.pipe.stdout.read(self.readsize)
                # NOTE(review): poll() does not wait, so returncode may
                # still be None if the child has not exited yet -- confirm
                # whether wait() is wanted here.
                self.pipe.poll()
                self.processEnd(self.success())
        except KeyboardInterrupt:
            # The interrupt may arrive before reading started: cacheresult
            # can still be None and the pipe may not exist yet.
            self.cacheresult = "".join(self.cacheresult or [])
            self.kill()
            self.processEnd(False)
            raise KeyboardInterrupt()
        self.cacheresult = "".join(self.cacheresult)
        return self.cacheresult

    def processInit(self):
        """Hook called at the start of every read()."""
        pass

    def processDraw(self):
        """Hook called when processStdout() returns a true value."""
        pass

    def processStdout(self):
        """Hook called after each chunk is appended to self.buf."""
        return True

    def processEnd(self, res=True):
        """Hook called when reading ends; res tells whether it succeeded."""
        pass
def runOsCommand(cmd, in_str=None, env_dict=None):
    """Run an external program through the shell.

    Parameters:
    cmd       command line, interpreted by the shell (shell=True), so
              callers must not build it from untrusted text
    in_str    data sent to the program's standard input
    env_dict  dictionary of environment variables

    Returns (return code, list of stdout lines followed by stderr lines),
    each line stripped of trailing whitespace.
    """
    pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=env_dict,
                            close_fds=True,
                            shell=True)
    # communicate() drains stdout and stderr together; reading stdout to
    # EOF first deadlocks once the child fills the stderr pipe.
    out, err = pipe.communicate(in_str if in_str else None)

    def toLines(data):
        # Split on "\n" only, like file.readlines() did
        lines = data.split(b"\n")
        if lines[-1] == b"":
            lines.pop()
        return [line.rstrip() for line in lines]

    return pipe.returncode, toLines(out) + toLines(err)
def genpassword(passlen=9):
    """Return a random string of ASCII letters and digits.

    Parameters:
    passlen   length of the password to generate

    Returns the generated password of the requested length.
    """
    # Passwords need an unpredictable source; the default random generator
    # is not meant for security purposes.
    from random import SystemRandom
    alphabet = string.ascii_letters + string.digits
    pick = SystemRandom().choice
    return ''.join([pick(alphabet) for _i in range(passlen)])
def getpathenv():
    """Return a PATH value for running utilities.

    The existing system binary directories that are missing from the
    current PATH are put in front of it.  When PATH is not set at all,
    only those directories are returned, so callers always get a string.
    """
    bindir = ['/sbin', '/bin', '/usr/sbin', '/usr/bin']
    env = os.environ
    if 'PATH' in env:
        lpath = env['PATH'].split(":")
    else:
        lpath = []
    npath = [dirname for dirname in bindir
             if path.exists(dirname) and dirname not in lpath]
    return ":".join(npath + lpath)
class MultiReplace:
    """Callable that replaces several substrings in one pass.

    Usage:
    replacer = MultiReplace({'str':'efg','in':'asd'})
    s = replacer("string")
    """

    def __init__(self, repl_dict):
        # Build one alternation from the escaped keys, taken in reverse
        # lexical order.
        ordered = sorted(repl_dict.keys(), reverse=True)
        alternation = u"|".join([re.escape(key) for key in ordered])
        self.pattern = re.compile(alternation)
        self.dict = repl_dict

    def replace(self, s):
        """Return s with every matched key replaced by its mapped value."""
        lookup = self.dict.get

        def substitute(match):
            found = match.group(0)
            return lookup(found, found)

        return self.pattern.sub(substitute, s)

    __call__ = replace
def str2dict(s):
    r"""Convert a string to a dictionary.

    String format:
    {'key1':'value1','key2':'val\'ue2'}

    Escaped backslashes and quotes are unescaped in the values.  On a
    malformed string an error is printed and cl_overriding.exit(1) is
    called.
    """
    value = r'(?:\\\\|\\\'|[^\'])'
    pair = r"""
        \s*'            # begin key
        (%(v)s*)
        '               # end key
        \s*:\s*         # delimiter key/value
        '               # begin value
        (%(v)s*)
        '\s*            # end value
        """ % {"v": value}
    reDict = re.compile(pair, re.X)
    reMatchDict = re.compile("""
        ^{              # begin dict
        ((%(v)s,)*      # many pairs with comma at end
        %(v)s)?         # pair without comma
        }$              # end dict
        """ % {'v': pair}, re.X)
    if reMatchDict.match(s.strip()):
        d = dict(reDict.findall(s))
        replaceSlash = MultiReplace({'\\\\': '\\', '\\\'': '\''})
        for key in list(d.keys()):
            d[key] = replaceSlash(d[key])
        return d
    # Translate the template first, then insert the value; formatting
    # inside _() made the translation lookup miss every time.
    cl_overriding.printERROR(_("wrong dict value: %s") % s)
    cl_overriding.exit(1)
def str2list(s):
    r"""Convert a string to a list.

    String format:
    ['value1','val\'ue2']

    Escaped backslashes and quotes are unescaped in the elements.  On a
    malformed string an error is printed and cl_overriding.exit(1) is
    called.
    """
    value = r'(?:\\\\|\\\'|[^\'])'
    element = r"""
        \s*'            # begin value
        (%(v)s*)
        '\s*            # end value
        """ % {"v": value}
    reList = re.compile(element, re.X)
    reMatchList = re.compile(r"""
        ^\[             # begin list
        ((%(v)s,)*      # many elements with comma at end
        %(v)s)?         # element without comma
        \]$             # end list
        """ % {'v': element}, re.X)
    if reMatchList.match(s.strip()):
        replaceSlash = MultiReplace({'\\\\': '\\', '\\\'': '\''})
        return [replaceSlash(item) for item in reList.findall(s)]
    # Translate the template first, then insert the value; formatting
    # inside _() made the translation lookup miss every time.
    cl_overriding.printERROR(_("wrong list value: %s") % s)
    cl_overriding.exit(1)
def list2str(list):
    r"""Convert a list to its string form.

    Every element is passed through str(), backslashes and single quotes
    are escaped, and the result looks like ['a','b'].
    """
    escape = MultiReplace({'\\':'\\\\','\'':'\\\''})
    quoted = []
    for item in list:
        quoted.append("'%s'" % escape(str(item)))
    return "[%s]" % ','.join(quoted)
def dict2str(dict):
    r"""Convert a dictionary to its string form.

    Values are passed through str() and have backslashes and single quotes
    escaped; the result looks like {'k':'v','k2':'v2'}.
    """
    # NOTE(review): keys are written without escaping, so a key holding a
    # quote or backslash yields text that may not parse back -- confirm.
    escape = MultiReplace({'\\':'\\\\','\'':'\\\''})
    pairs = []
    for key, val in dict.items():
        pairs.append("'%s':'%s'" % (str(key), escape(str(val))))
    return '{%s}' % ','.join(pairs)
def convertStrListDict(val):
    """Convert between the string form and list/dict values.

    A list or dict is turned into its string form; a string that looks
    like a dict or list is parsed; any other string is returned as is.
    """
    kind = type(val)
    if kind == list:
        return list2str(val)
    if kind == dict:
        return dict2str(val)
    # Anything else is treated as a string
    if re.match("^{.*}$", val):
        return str2dict(val)
    if re.match("^\[.*\]$", val):
        return str2list(val)
    return val
def _toUNICODE(val):
    """Return val as unicode text.

    Unicode objects are returned unchanged; anything else is converted
    with str() and decoded as UTF-8.
    """
    if type(val) != types.UnicodeType:
        return str(val).decode('UTF-8')
    return val
14 years ago
def getModeFile(nameFile, mode="all"):
    """Return permission and ownership information about a file.

    mode == "all"    (permission bits, uid, gid)
    mode == "mode"   permission bits
    mode == "owner"  (uid, gid)

    Symbolic links are not followed (os.lstat).  Any other mode value
    yields None.
    """
    info = os.lstat(nameFile)
    permissions = stat.S_IMODE(info.st_mode)
    owner = (info.st_uid, info.st_gid)
    if mode == "all":
        return (permissions,) + owner
    if mode == "mode":
        return permissions
    if mode == "owner":
        return owner
14 years ago
def chownR(directory, uid, gid):
    """Recursively change the owner and group of a directory tree.

    Symbolic links are changed themselves (os.lchown) instead of their
    targets.  Always returns True.
    """
    for root, dirs, files in os.walk(directory):
        # The directory itself first, then its sub-directories and files
        os.chown(root, uid, gid)
        for name in dirs + files:
            target = path.join(root, name)
            if stat.S_ISLNK(os.lstat(target)[stat.ST_MODE]):
                os.lchown(target, uid, gid)
            else:
                os.chown(target, uid, gid)
    return True
def copyDir(srcDir, destDir):
    """Copy the directory srcDir to destDir with shutil.copytree.

    Sockets found in the source tree are not copied.  Always returns True.
    """
    def skipSockets(pathname, names):
        """Return the names in pathname that are sockets."""
        return [name for name in names
                if stat.S_ISSOCK(
                    os.lstat(path.join(pathname, name))[stat.ST_MODE])]

    copytree(srcDir, destDir, ignore=skipSockets)
    return True
def removeDir(rmDir):
    """Remove the directory rmDir with everything in it; returns True."""
    rmtree(rmDir)
    return True
def getRunCommands():
    """Return the command lines of the running processes.

    One entry per numeric directory in /proc, read from its cmdline file
    and stripped; the entry is "" when the file is missing or unreadable.
    Returns [] when /proc is not readable.
    """
    def getCmd(procNum):
        cmdLineFile = '/proc/%s/cmdline' % procNum
        try:
            if path.exists(cmdLineFile):
                with open(cmdLineFile, 'r') as cmdFile:
                    return cmdFile.read().strip()
        except EnvironmentError:
            # The process may vanish between listing and reading; only
            # I/O failures are ignored, not interrupts or exits.
            pass
        return ""

    if not os.access('/proc', os.R_OK):
        return []
    return [getCmd(name) for name in listDirectory('/proc')
            if name.isdigit()]
13 years ago
def isFstabMount(pathname, mapDevUuid={}, listFstab=[]):
    """Return the fstab counterpart of a mount point or device.

    Given a mount point, the device of the last matching /etc/fstab entry
    is returned; given a device, its mount point.  "swap" is matched
    against entries whose third field is "swap".  Returns "" when nothing
    matches.

    mapDevUuid and listFstab are filled on first use; since they are the
    default argument objects, their contents are reused by later calls
    that rely on the defaults.
    """
    def removeQuotes(s):
        return s.replace('"', '').replace("'", "")

    if pathname == "swap":
        absPath = "swap"
    else:
        absPath = path.abspath(pathname)
    if not mapDevUuid:
        mapDevUuid.update(getUUIDDict())
    if not listFstab:
        # Convert fstab lines into [device, mount point] pairs, e.g.
        # [['/dev/sda3', '/'], ['/dev/sda5', '/var/calculate']]
        entries = []
        with open("/etc/fstab") as fstab:
            for line in fstab:
                if line.startswith('#') or not line.strip():
                    continue
                fields = [field
                          for field in line.replace('\t', ' ').split(' ')
                          if field]
                if len(fields) < 4:
                    continue
                entries.append(
                    [mapDevUuid.get(removeQuotes(fields[0]), fields[0]),
                     fields[1] if fields[2] != "swap" else "swap"])
        listFstab.extend(entries)
    # The last matching entry wins; [""] stands for "not found"
    found = [""]
    for entry in listFstab:
        if absPath in entry and entry[1] != "none":
            found = entry
    return [field for field in found if field != absPath][0]
13 years ago
def isMount(pathname):
    """Return the other side of the mount that pathname belongs to.

    /etc/mtab is searched for the last line whose first or second field
    equals the absolute pathname; the other field of that line is
    returned.  Returns "" when nothing matches or /etc/mtab is not
    readable.
    """
    absPath = path.abspath(pathname)
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    # The last matching entry wins; [""] stands for "not found"
    found = [""]
    with open(mtabFile) as mtab:
        for line in mtab:
            fields = line.split(" ")
            entry = [fields[0], fields[1]]
            if absPath in entry:
                found = entry
    return [field for field in found if field != absPath][0]
def commonPath(*paths):
    """Return the common leading path of the given paths.

    Paths are normalized and compared component by component; the result
    ends at the first component that differs.  Previously a component was
    kept whenever it matched in at least one other path, even after a
    mismatch, so "/a/b/c" and "/a/x/c" produced "/a/c".

    Returns "" when the paths share only the root or no paths are given;
    a single path yields itself in normalized form.
    """
    split = [path.normpath(item).split('/') for item in paths]
    res = []
    for column in zip(*split):
        first = column[0]
        if any(part != first for part in column[1:]):
            break
        res.append(first)
    return "/".join(res)
13 years ago
def childMounts(pathname):
    """Return the /etc/mtab entries located at or below pathname.

    Each result is a [first field, second field] pair of an mtab line for
    which commonPath() of pathname and either field equals pathname.
    The special name "none" is compared as is, without being made
    absolute.

    Returns "" (not a list) when /etc/mtab is not readable.
    """
    if pathname != "none":
        absPath = path.abspath(pathname)
    else:
        absPath = pathname
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    res = []
    with open(mtabFile) as mtab:
        for line in mtab:
            fields = line.split(" ")
            entry = [fields[0], fields[1]]
            if commonPath(absPath, entry[0]) == absPath or \
               commonPath(absPath, entry[1]) == absPath:
                res.append(entry)
    return res
def pathJoin(*paths):
    """Join paths; unlike os.path.join, absolute parts are appended too.

    One leading "/" is removed from every part after the first, and
    empty parts are skipped, before joining onto the first path.
    A single argument is returned unchanged.
    """
    if len(paths) == 1:
        return paths[0]
    joined = paths[0]
    for part in paths[1:]:
        if part.startswith("/"):
            part = part[1:]
        if part and part != "/":
            joined = path.join(joined, part)
    return joined
14 years ago
def getUserPassword(flag="dialog", pwDialog=False):
    """Ask the user for a password and return it.

    flag     - "dialog" to prompt twice with getpass, "stdin" to read two
               lines from standard input
    pwDialog - pair of prompt texts used in dialog mode (defaults to the
               translated "New password" / "Retype new password")

    Returns the password, or False when the flag is unknown, the password
    is empty or the two entries differ (an error is printed).
    """
    if flag == "dialog":
        prompts = pwDialog
        if not prompts:
            prompts = [_("New password"),
                       _("Retype new password")]
        pwdA = getpass.getpass(prompts[0]+":")
        pwdB = getpass.getpass(prompts[1]+":")
    elif flag == "stdin":
        pwdA = sys.stdin.readline().rstrip()
        pwdB = sys.stdin.readline().rstrip()
    else:
        cl_overriding.printERROR(_("ERROR in function getUserPassword, \
incorrect option 'flag=%s'")%flag)
        return False
    if pwdA and pwdA == pwdB:
        return pwdA
    cl_overriding.printERROR(_("ERROR") + ": " +\
        _("password incorrect")+ ": " + _("try again"))
    return False
def cmpVersion(v1,v2):
    """Compare two versions given as strings or parsed version lists.

    Strings are parsed with getTupleVersion().  The shorter version
    number is padded with zeros before comparing, then the revision parts
    decide.  Returns -1, 0 or 1.
    """
    if isinstance(v1, str):
        v1 = getTupleVersion(v1)
    if isinstance(v2, str):
        v2 = getTupleVersion(v2)
    left = (v1[0] + [0] * (len(v2[0]) - len(v1[0])), v1[1])
    right = (v2[0] + [0] * (len(v1[0]) - len(v2[0])), v2[1])
    return (left > right) - (left < right)
def getTupleVersion(ver):
    """Parse a version string into [version parts, revision parts].

    Numbers become ints, the suffixes pre/p/alpha/beta/rc become their
    numeric weights, other words stay strings.  A missing revision is
    treated as "r0".
    Example:
    2.6.30     [[2, 6, 30], ['r0']]
    2.6.31-r1  [[2, 6, 31], ['r1']]
    """
    # NOTE(review): the revision stays a single string such as 'r10', so
    # revisions compare as text ('r10' < 'r9') -- confirm this is intended.
    suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3,
                    "rc": -1}

    def toTuple(v):
        tokens = re.findall(r"r\d+$|\d+|[a-zA-Z+]+",
                            v.replace('-SNAPSHOT', ''))
        numbered = [int(tok) if tok.isdigit() else tok for tok in tokens]
        return [suffix_value[tok] if tok in suffix_value else tok
                for tok in numbered]

    vers, revision = re.search(r"(^.*?)(-r\d+)?$", ver, re.S).groups()
    return [toTuple(vers), toTuple(revision or "r0")]
def appendProgramToEnvFile(nameProg, objVar):
    """Append a program name to the cl_merges variable and write it out.

    Calls objVar.AppendToList and then objVar.WriteList (both with
    force=True); returns True only when both succeed.
    """
    return bool(objVar.AppendToList("cl_merges", nameProg, force=True)
                and objVar.WriteList("cl_merges", force=True))
def removeProgramToEnvFile(nameProg, objVar):
    """Remove a program name from the cl_merges variable and write it out.

    Calls objVar.RemoveToList and then objVar.WriteList (both with
    force=True); returns True only when both succeed.
    """
    return bool(objVar.RemoveToList("cl_merges", nameProg, force=True)
                and objVar.WriteList("cl_merges", force=True))
def checkDigestFile(digestfile):
    """Check the files listed in a digest file.

    The digest file holds entries of the form
        # ALGORITHM HASH
        hexdigest filename
    with file names relative to the digest file's directory.

    Returns a list of (algorithm, matches) pairs.  Entries whose algorithm
    is not provided by hashlib or whose file does not exist are skipped.
    """
    # hashlib is not among the module-level imports, so import it here
    import hashlib
    reEntry = re.compile(r"# (\S+) HASH\n(\S+) (\S+)", re.S)
    with open(digestfile, 'r') as digestFile:
        entries = reEntry.findall(digestFile.read())
    baseDir = path.dirname(digestfile)
    result = []
    for alg, hashdata, filename in entries:
        hashobj = getattr(hashlib, alg.lower(), None)
        if hashobj is None:
            continue
        filename = path.join(baseDir, filename)
        if path.exists(filename):
            # Hash the raw bytes and release the handle right away
            with open(filename, 'rb') as dataFile:
                digest = hashobj(dataFile.read())
            result.append((alg,
                           digest.hexdigest().upper() == hashdata.upper()))
    return result
def getFilesCount(directory):
    """Return the number of entries (files and sub-directories) found
    anywhere below directory, or 0 if it does not exist."""
    if not path.exists(directory):
        return 0
    total = 0
    for _root, dirs, files in os.walk(directory):
        total += len(dirs) + len(files)
    return total
13 years ago
def listDirectory(directory,fullPath=False):
    """Return the entries of directory, or [] if it cannot be listed.

    With fullPath=True every entry is joined with the directory name.
    A missing directory and an OSError from os.listdir both give [].
    """
    if path.exists(directory):
        try:
            names = os.listdir(directory)
        except OSError:
            return []
        if fullPath:
            return [path.join(directory, name) for name in names]
        return names
    return []
13 years ago
def getUUIDDict(revers=False):
    """Return a dict mapping "UUID=<uuid>" to the device path.

    Built from the symbolic links in /dev/disk/by-uuid; with revers=True
    the mapping is device path -> "UUID=<uuid>".
    """
    devuuid = '/dev/disk/by-uuid'
    mapping = {}
    for link in listDirectory(devuuid, fullPath=True):
        if not path.islink(link):
            continue
        uuid = "UUID=%s" % path.basename(link)
        device = path.normpath(path.join(devuuid, os.readlink(link)))
        if revers:
            mapping[device] = uuid
        else:
            mapping[uuid] = device
    return mapping
def detectDeviceForPartition(dev):
    """Detect the parent device of a partition through /sys/block (sysfs).

    Among the /sys/block entries whose name occurs in dev, the first one
    that has a directory named like the last component of dev somewhere
    below it is returned.  Otherwise the name is derived from dev by
    dropping the directory part and the trailing digits; None is returned
    when dev does not end in digits.
    """
    reDeviceSplit = re.compile(r"^(.*/)?(.*?)(\d+)$")
    topLevel = next(os.walk('/sys/block'), None)
    if topLevel is not None:
        partName = path.split(dev)[-1]
        for blockName in topLevel[1]:
            if blockName not in dev:
                continue
            for _root, dirs, _files in os.walk(
                    path.join('/sys/block', blockName)):
                if partName in dirs:
                    return blockName
    res = reDeviceSplit.search(dev)
    if res:
        return res.groups()[1]
    return None
def getProgPath(progname):
    """Return the full path of a program, or False if it is not found.

    ``which`` is run with LANG=C and the PATH from getpathenv(); if it
    fails, an existing absolute progname is returned as is.
    """
    env = dict(os.environ)
    env["PATH"] = getpathenv()
    env["LANG"] = "C"
    retcode, output = runOsCommand("which %s"%progname, env_dict=env)
    if retcode == 0:
        return output[0].strip()
    if path.isabs(progname) and path.exists(progname):
        return progname
    return False
# Splits "name-version[-rN][.tbz2]" into name, version with revision,
# version, revision and the tbz2 extension.
# NOTE(review): the "." before tbz2 is unescaped and matches any
# character -- confirm whether a literal dot was meant.
reVerSplit = re.compile(r"^(.*?)-(([^-]+?)(?:-(r\d+))?)(?:.(tbz2))?$",re.S)

def reVerSplitToPV(x):
    """Convert a reVerSplit match into a dict of package name parts.

    Keys: PN (name), PF (name-version-revision), P (name-version),
    PV (version), PR (revision, "r0" when absent), PVR (version with
    revision).  A false x gives a dict with every value empty.
    """
    if not x:
        return dict.fromkeys(('PN', 'PF', 'P', 'PV', 'PR', 'PVR'), "")
    name, verRev, version, revision = x.groups()[:4]
    return {'PN': name,
            'PF': "%s-%s"%(name, verRev),
            'P': "%s-%s"%(name, version),
            'PV': version,
            'PR': revision or "r0",
            'PVR': verRev}
def getPkgUses(fullpkg):
    """Return the USE and IUSE flags of an installed package.

    fullpkg is "category/name".  The last directory entry of
    /var/db/pkg/<category> whose parsed name equals the package name is
    used.  Returns (use flags, iuse flags) with a leading "+" removed from
    each flag; a missing USE or IUSE file gives an empty list.  Returns
    None when the package is not found.
    """
    category, slash, pkg = fullpkg.partition('/')
    pkgCategory = '/var/db/pkg/{0}'.format(category)
    packages = []
    for name in listDirectory(pkgCategory):
        match = reVerSplit.search(name)
        if not match:
            continue
        info = reVerSplitToPV(match)
        if info['PN'] == pkg:
            packages.append(info)
    if not packages:
        return None
    pkgDir = path.join(pkgCategory, packages[-1]['PF'])

    def readFlags(fileName):
        """Read whitespace-separated flags, closing the file afterwards."""
        if not path.exists(fileName):
            return []
        with open(fileName, 'r') as flagFile:
            flags = flagFile.read().strip().split()
        return [flag[1:] if flag.startswith("+") else flag
                for flag in flags]

    return (readFlags(path.join(pkgDir, "USE")),
            readFlags(path.join(pkgDir, "IUSE")))
def isPkgInstalled(pkg,prefix='/'):
    """Check whether a package is installed under prefix.

    pkg is "category/name" or just "name".  True when some entry of
    <prefix>/var/db/pkg/<category> (or of any category when none is
    given) starts with the package name.
    """
    pkgDir = path.join(prefix,'var/db/pkg')
    if "/" in pkg:
        category, op, pkg = pkg.partition('/')
        categoryDirs = [path.join(pkgDir, category)]
    else:
        categoryDirs = listDirectory(pkgDir, fullPath=True)
    for categoryDir in categoryDirs:
        for name in listDirectory(categoryDir):
            if name.startswith(pkg):
                return True
    return False
def getPkgActiveUses(fullpkg):
    """Return the flags present in both USE and IUSE of a package.

    Returns None when getPkgUses() finds no such package.
    """
    uses = getPkgUses(fullpkg)
    if uses:
        use, iuse = uses[0], uses[1]
        return list(set(use) & set(iuse))
    return None
def getSquashList():
    """Return the supported squashfs compression methods.

    Taken from the active USE flags of sys-fs/squashfs-tools, limited to
    lzo, lzma, xz and gzip, with "lzma" reported as "xz".  Falls back to
    ["gzip"] when no active flags are found.
    """
    wantMethod = set(["lzo","lzma","xz","gzip"])
    usesSquashFs = getPkgActiveUses("sys-fs/squashfs-tools")
    if not usesSquashFs:
        return ["gzip"]
    renamed = {"lzma":"xz"}
    return [renamed.get(method, method)
            for method in list(set(usesSquashFs) & wantMethod)]
def getAvailableX11Drivers(prefix="/"):
    """Return the names of the available X11 drivers, without duplicates.

    "fglrx" / "nvidia" are reported when an ati-driver-installer* /
    NVIDIA-Linux* file is present in <prefix>/usr/portage/distfiles; the
    other names come from the *_drv.so files in
    <prefix>/usr/lib/xorg/modules/drivers.
    """
    distfiles = path.join(prefix,'usr/portage/distfiles')
    xorg_modules_dir = path.join(prefix,'usr/lib/xorg/modules/drivers')
    proprietary = []
    for name in listDirectory(distfiles):
        if name.startswith('ati-driver-installer') or \
           name.startswith('NVIDIA-Linux'):
            proprietary.append(
                'fglrx' if name.startswith('ati-driver') else "nvidia")
    # Strip the 7-character "_drv.so" suffix from the module file names
    modules = [name[:-7] for name in listDirectory(xorg_modules_dir)
               if name.endswith('_drv.so')]
    return list(set(proprietary + modules))