You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/cl_utils.py

680 lines
22 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import string
from random import choice
import os
import types
import subprocess
from subprocess import Popen,PIPE,STDOUT
import stat
from shutil import copytree, rmtree
import cl_overriding
import re
import sys
import getpass
from types import StringType
import magic as type_file
import cl_lang
tr = cl_lang.lang()
tr.setLocalDomain('cl_lib')
tr.setLanguage(sys.modules[__name__])
class _error:
# Здесь ошибки, если они есть
error = []
def getError(self):
"""Выдать ошибки"""
if not self.error:
return False
error = ""
for e in self.error:
error += e + "\n"
return error
def setError(self, error):
"""Установка ошибки"""
self.error.append(error)
return True
class _warning:
# Здесь предупреждения
warning = []
def getWarning(self):
"""Выдать ошибки"""
if not self.warning:
return False
warning = ""
for w in self.warning:
warning += w + "\n"
return warning
def setWarning(self, warning):
"""Установка ошибки"""
self.warning.append(warning)
return True
class typeFile:
"""Получение типа файла"""
def __init__(self, magic=0x410):
self.magicObject = type_file.open(type_file.MAGIC_NONE)
self.magicObject.load()
self.magicObject.setflags(magic)
def __del__(self):
"""Закрываем magic"""
self.magicObject.close()
def getMType(self, filename):
"""Информация о типе файла"""
return self.magicObject.file(filename)
def isBinary(self, filename):
"""является ли файл бинарным"""
mime = self.getMType(filename)
# В случае ошибки
if mime.count("`"):
return mime
elif mime.count("binary"):
return True
return False
class scanDirectory:
"""Класс для cканирования директории"""
def processingFile(self, path, prefix):
"""Обработка в случае файла"""
return True
def processingDirectory(self, path, prefix):
"""Обработка в случае директории если возвращаем None то пропуск дир."""
return True
def scanningDirectory(self, scanDir, skipFile=[], skipDir=[],
prefix=None, flagDir=False):
"""Сканирование и обработка шаблонов в директории scanDir"""
ret = True
if not prefix:
prefix = os.path.join(scanDir,"")[:-1]
if not flagDir:
# проверка корневой директории
retDir = self.processingDirectory(scanDir, scanDir)
if retDir is None:
return None
elif retDir is False:
return False
if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]):
for fileOrDir in sorted(os.listdir(scanDir)):
absPath = os.path.join(scanDir,fileOrDir)
relPath = absPath.split(prefix)[1]
stInfo = os.lstat(absPath)
statInfo = stInfo[stat.ST_MODE]
if stat.S_ISREG(statInfo):
# Обработка файла
if relPath in skipFile:
continue
if not self.processingFile(absPath, prefix):
ret = False
break
elif stat.S_ISDIR(statInfo):
# Обработка директории
if relPath in skipDir:
continue
retDir = self.processingDirectory(absPath, prefix)
if retDir is None:
continue
elif retDir is False:
ret = False
break
ret = self.scanningDirectory(absPath, skipFile,
skipDir, prefix, True)
if ret is False:
break
return ret
class process:
"""Execute system command by Popen
Examples:
execute program and get result:
if process("/bin/gzip","/boot/somefile").success():
print "Gzip success"
unzip and process unzip data by cpio (list files):
processGzip = process("/bin/gzip","-dc","/boot/initrd")
processCpio = process("/bin/cpio","-tf",stdin=processGzip)
filelist = processCpio.readlines()
execute command and send data:
processGrub = process("/sbin/grub")
processGrub.write("root (hd0,0)\n")
processGrub.write("setup (hd0)\n")
processGrub.write("quit\n")
isok = processGrub.success()
union stdout and stderr:
process("/bin/ls","/",stderr=STDOUT)
result to stdout:
process("/bin/ls",stdout=None)
get data from keyboard:
process("/bin/cat",stdin=None)
"""
def __init__(self,command,*params,**kwarg):
if not "stdin" in kwarg:
stdin=self._defaultStdin
else:
if kwarg["stdin"] == None:
stdin = self._keyboardStdin
else:
stdin=kwarg["stdin"].getStdout
self.stdout = kwarg.get("stdout",PIPE)
self.envdict = kwarg.get("envdict",None)
self.stderr = kwarg.get("stderr",PIPE)
self.command = [command] + list(params)
self.stdin = stdin
self.iter = None
self.pipe = None
self.cacheresult = None
self.communicated = False
def _open(self):
"""Open pipe if it not open"""
if not self.pipe:
self.pipe = Popen(self.command,
stdout=self.stdout,
stdin=self.stdin(),
stderr=self.stderr,
env=self.envdict)
def _defaultStdin(self):
"""Return default stdin"""
return PIPE
def _keyboardStdin(self):
"""Return keyboard stdin"""
return None
def getStdout(self):
"""Get current stdout"""
self.close()
return self.pipe.stdout
def write(self,data):
"""Write to process stdin"""
self._open()
self.pipe.stdin.write(data)
self.pipe.stdin.flush()
def close(self):
"""Close stdin"""
if self.pipe:
self.pipe.stdin.close()
def read(self):
"""Read all data"""
try:
self._open()
if self.cacheresult is None:
self.cacheresult = self.pipe.communicate()[0]
return self.cacheresult
except KeyboardInterrupt:
self.kill()
raise KeyboardInterrupt
def readlines(self):
"""Read lines"""
return self.read().split('\n')
def __iter__(self):
"""Get iterator"""
if not self.iter:
self.iter = iter(self.readlines())
return self.iter
def kill(self):
"""Kill this process"""
if self.pipe:
self.pipe.kill()
def next(self):
"""Next string from stdout"""
return self.__iter__().next()
def returncode(self):
"""Get return code"""
self.read()
return self.pipe.returncode
def success(self):
"""Success or not"""
return self.returncode() == 0
def failed(self):
"""Failed or not"""
return self.returncode() != 0
class processProgress(process):
"""Execute system command by Popen for parse stdout."""
def __init__(self,command,*params,**kwarg):
process.__init__(self,command,*params,**kwarg)
self.readsize = kwarg.get("readsize",10)
self.init(**kwarg)
def init(self,**kwarg):
pass
def read(self):
"""Read data with parsing ability"""
try:
self.processInit()
self._open()
if self.cacheresult is None:
self.cacheresult = []
self.buf = ""
part = self.pipe.stdout.read(1)
while part:
if self.buf:
self.buf += part
else:
self.buf = part
if self.processStdout():
self.processDraw()
self.cacheresult.append(part)
part = self.pipe.stdout.read(self.readsize)
self.pipe.poll()
self.processEnd(self.success())
except KeyboardInterrupt:
self.cacheresult = "".join(self.cacheresult)
self.pipe.kill()
self.processEnd(False)
raise KeyboardInterrupt()
self.cacheresult = "".join(self.cacheresult)
return self.cacheresult
def processInit(self):
"""Called when read first byte"""
pass
def processDraw(self):
"""Called when processStdout return True"""
pass
def processStdout(self):
"""Called when read readsize byte from stdout"""
return True
def processEnd(self,res=True):
"""Called when process end"""
pass
def runOsCommand(cmd,in_str=None, env_dict=None):
"""Выполняет внешнюю программу
Параметры:
cmd внешняя программа
in_str данные передаваемые программе на страндартный вход.
env_dict словарь переменных окружения
Возвращает (код возврата, список stdout+stderr)
"""
pipe = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_dict,
close_fds=True,
shell=True)
fout, fin, ferr = (pipe.stdout, pipe.stdin, pipe.stderr)
# если есть данные на вход, передать их
if in_str:
fin.write(in_str)
fin.close()
res = map(lambda x: x.rstrip(), fout.readlines())
fout.close()
res += map(lambda x: x.rstrip(), ferr.readlines())
ferr.close()
# Код возврата
retcode = pipe.wait()
return retcode, res
def genpassword(passlen=9):
'''Вернуть случайный набор символов указанной длины
Параметры:
passlen длина пароля который нужно сгенерировать
Возвращаемые параметры:
Сгенерированный пароль указанной длины
'''
res=''.join([choice(string.ascii_letters+string.digits)\
for i in xrange(passlen)])
return res
def fillstr(char, width):
'''Заполнить строку указанным числом символов. Псеводоним символ*кол-во'''
return str(char) * width
def getpathenv():
"""Вернуть пути для запуска утилит"""
bindir=['/sbin','/bin','/usr/sbin','/usr/bin']
env=os.environ
if env and env.has_key('PATH'):
lpath=env['PATH'].split(":")
npath=[]
for dirname in bindir:
if os.path.exists(dirname) and dirname not in lpath:
npath.append(dirname)
lpath=npath+lpath
return ":".join(lpath)
class MultiReplace:
"""MultiReplace function object
Usage:
replacer = MultiReplace({'str':'efg','in':'asd'})
s = replacer("string")
"""
def __init__(self, repl_dict):
# string to string mapping; use a regular expression
keys = repl_dict.keys()
keys.sort(reverse=True) # lexical order
pattern = u"|".join([re.escape(key) for key in keys])
self.pattern = re.compile(pattern)
self.dict = repl_dict
def replace(self, s):
# apply replacement dictionary to string
def repl(match, get=self.dict.get):
item = match.group(0)
return get(item, item)
return self.pattern.sub(repl, s)
__call__ = replace
def str2dict(s):
"""Convert string to dictionary:
String format:
{'key1':'value1','key2':'val\'ue2'}
"""
value = r'(?:\\\\|\\\'|[^\'])'
pair = r"""
\s*' # begin key
(%(v)s*)
' # end key
\s*:\s* # delimeter key/value
' # begin value
(%(v)s*)
'\s* # end value
""" % {"v":value}
reDict = re.compile(pair, re.X)
reMatchDict = re.compile("""
^{ # begin dict
((%(v)s,)* # many pair with comma at end
%(v)s)? # pair without comma
}$ # end dict
""" % {'v':pair}, re.X)
if reMatchDict.match(s.strip()):
d = dict(reDict.findall(s))
replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''})
for i in d.keys():
d[i] = replaceSlash(d[i])
return d
else:
cl_overriding.printERROR(_("wrong dict value: %s"%s))
cl_overriding.exit(1)
def str2list(s):
"""Convert string to list:
String format:
['value1','val\'ue2']
"""
value = r'(?:\\\\|\\\'|[^\'])'
element = r"""
\s*' # begin value
(%(v)s*)
'\s* # end value
""" % {"v":value}
reList = re.compile(element, re.X)
reMatchList = re.compile("""
^\[ # begin dict
((%(v)s,)* # many elements with comma at end
%(v)s)? # element without comma
\]$ # end dict
""" % {'v':element}, re.X)
if reMatchList.match(s.strip()):
replaceSlash = MultiReplace({'\\\\':'\\','\\\'':'\''})
return [replaceSlash(i) for i in reList.findall(s)]
else:
cl_overriding.printERROR(_("wrong list value: %s"%s))
cl_overriding.exit(1)
def list2str(list):
"""Convert list to string
Return string with escaped \ and '.
"""
replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''})
return "[%s]" % ','.join(["'%s'"%replaceSlash(str(i))
for i in list ])
def dict2str(dict):
"""Convert dictionry to string
Return string with escaped \ and '.
"""
replaceSlash = MultiReplace({'\\':'\\\\','\'':'\\\''})
return '{%s}' % ','.join(["'%s':'%s'" % (str(k),replaceSlash(str(v))) \
for (k,v) in dict.items()])
def convertStrListDict(val):
"""Convert data between string, list, dict"""
# if val is list
if type(val) == types.ListType:
return list2str(val)
# if val is dictionary
elif type(val) == types.DictType:
return dict2str(val)
# else it is string
else:
# detect dictionary
if re.match("^{.*}$",val):
return str2dict(val)
# detect list
elif re.match("^[.*]$",val):
return str2list(val)
# else is simple string
else:
return val
def _toUNICODE(val):
"""Convert text to unicode"""
if type(val) == types.UnicodeType:
return val
else:
return str(val).decode('UTF-8')
def getModeFile(nameFile, mode="all"):
"""Выдает информацию о файле
mode=="all"
права файла, владелец, группа файла
mode=="mode"
права файла
mode=="owner"
владелец, группа файла
"""
fst = os.lstat(nameFile)
if mode == "all":
return (stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid)
if mode == "mode":
return stat.S_IMODE(fst.st_mode)
if mode == "owner":
return fst.st_uid, fst.st_gid
def chownR(directory, uid, gid):
"""изменяет владельца и группу
для всех файлов и директорий внутри directory
"""
def chownPaths(rootPath, listPath, uid, gid):
for chPath in listPath:
chownPath = os.path.join(rootPath, chPath)
statInfo = os.lstat(chownPath)[stat.ST_MODE]
if stat.S_ISLNK(statInfo):
os.lchown(chownPath, uid, gid)
else:
os.chown(chownPath, uid, gid)
for root, dirs, files in os.walk(directory):
# меняем владельца директории
os.chown(root, uid, gid)
# Меняем владельца директорий
chownPaths(root, dirs, uid, gid)
# Меняем владельца файлов
chownPaths(root, files, uid, gid)
return True
def copyDir(srcDir, destDir):
"""Копируем директорию srcDir в destDir
При копировании сохраняются владелец, группа, права
"""
def ignoreFile(path, names):
"""Игнорирование сокетов при копировании"""
ignore = []
for name in names:
if stat.S_ISSOCK(os.lstat(os.path.join(path, name))[stat.ST_MODE]):
ignore.append(name)
return ignore
copytree(srcDir, destDir, ignore=ignoreFile)
return True
def removeDir(rmDir):
"""Рекурсивное удаление директории"""
rmtree(rmDir)
return True
def getRunCommands():
"""List run program"""
if not os.access('/proc',os.R_OK):
return []
return map(lambda x:open('/proc/%s/cmdline'%x).read().strip(),
filter(lambda x:x.isdigit(),
os.listdir('/proc')))
def isMount(path):
"""В случае монтирования директории выдает другой примонтированный путь"""
absPath = os.path.abspath(path)
mtabFile = '/etc/mtab'
if not os.access(mtabFile,os.R_OK):
return ""
return filter(lambda x: x!=absPath,
reduce(lambda x,y: y, filter(lambda x: absPath in x,
map(lambda x: [x[0], x[1]], map(lambda x: x.split(" "),
open(mtabFile)))),[""]))[0]
def pathJoin(*paths):
"""Складывает пути, в отличии от os.path.join, складывает абсолютные пути"""
if len(paths)==1:
return paths[0]
return reduce(os.path.join,
filter(lambda x:x and x != "/",
map(lambda x: x.startswith("/") and x[1:] or x,
paths[1:])),paths[0])
def getUserPassword(flag="dialog", pwDialog=False):
"""Получить пароль у пользователя
flag - опция "dalog" или "stdin" - откуда получаем пароль
pwDialog - структура для вывода приглашения в режиме диалога
"""
userPwd = ""
if flag == "dialog":
if not pwDialog:
pwDialog = [_("New password"),
_("Retype new password")]
pwdA = getpass.getpass(pwDialog[0]+":")
pwdB = getpass.getpass(pwDialog[1]+":")
elif flag == "stdin":
pwdA = sys.stdin.readline().rstrip()
pwdB = sys.stdin.readline().rstrip()
else:
cl_overriding.printERROR(_("ERROR in function getUserPassword, \
incorrect option 'flag=%s'")%flag)
return False
if not pwdA or not (pwdA == pwdB):
cl_overriding.printERROR(_("ERROR") + ": " +\
_("password incorrect")+ ": " + _("try again"))
return False
userPwd = pwdA
return userPwd
def cmpVersion(v1,v2):
"""Compare versions specified by tuple or string"""
if isinstance(v1,StringType):
v1 = getTupleVersion(v1)
if isinstance(v2,StringType):
v2 = getTupleVersion(v2)
return cmp((v1[0]+[0,]*(len(v2[0])-len(v1[0])),v1[1]),
(v2[0]+[0,]*(len(v1[0])-len(v2[0])),v2[1]))
def getTupleVersion(ver):
"""Get version specified by string as list:
Example:
2.6.30 [(2,6,30),('r',0)]
2.6.31-r1 [(2,6,31),('r',1)]
"""
suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3,
"rc": -1}
def toTuple(v):
return map(lambda x: suffix_value[x] if x in suffix_value else x,
map(lambda x: int(x) if x.isdigit() else x,
re.findall("r\d+$|\d+|[a-zA-Z+]+",
v.replace('-SNAPSHOT',''))))
vers, revision = re.search("(^.*?)(-r\d+)?$",ver,re.S).groups()
vers = toTuple(vers)
revision = toTuple(revision or "r0")
return [vers,revision]
def appendProgramToEnvFile(nameProg, objVar):
"""Append name program to variable cl-merges and
save /etc/calculate/calculate.env """
if not objVar.AppendToList("cl_merges", nameProg, force=True):
return False
if not objVar.WriteList("cl_merges", force=True):
return False
return True
def removeProgramToEnvFile(nameProg, objVar):
"""Remove name program from variable cl-merges and save
/etc/calculate/calculate.env"""
if not objVar.RemoveToList("cl_merges", nameProg, force=True):
return False
if not objVar.WriteList("cl_merges", force=True):
return False
return True