You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/common.py

508 lines
17 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2008-2013 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from random import choice
import os
import re
import math
from os import path
from calculate.lib import cl_overriding
import sys
import getpass
from types import StringType
import string
import pwd
import grp
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3',sys.modules[__name__])
class CommonError(Exception):
    """Error raised by helpers of this module (for example mountEcryptfs)."""
class _error:
    """Mixin that accumulates error messages."""

    # Collected error messages. The list lives on the class, so it is shared
    # by everything that does not rebind the attribute; the methods below
    # only ever mutate it in place.
    error = []

    def getError(self):
        """Return all errors, one per line, or False when there are none."""
        if not self.error:
            return False
        return "".join(message + "\n" for message in self.error)

    def setError(self, error):
        """Record an error message; always returns True."""
        self.error.append(error)
        return True

    def clearErrors(self):
        """Drop every recorded error (the list object itself is kept)."""
        del self.error[:]
class _warning:
    """Mixin that accumulates warning messages."""

    # Collected warning messages. The list lives on the class, so it is
    # shared by everything that does not rebind the attribute; the methods
    # below only ever mutate it in place.
    warning = []

    def getWarning(self):
        """Return all warnings, one per line, or False when there are none."""
        if not self.warning:
            return False
        return "".join(message + "\n" for message in self.warning)

    def setWarning(self, warning):
        """Record a warning message; always returns True."""
        self.warning.append(warning)
        return True

    def clearWarnings(self):
        """Drop every recorded warning (the list object itself is kept).

        The previous implementation emptied ``self.error`` instead, which
        left the warnings untouched (and failed outright on classes that do
        not also provide an ``error`` list).
        """
        del self.warning[:]
def genpassword(passlen=9):
    """Return a random password of ``passlen`` ASCII letters and digits.

    Characters are drawn from the operating system's entropy source
    (``random.SystemRandom``) rather than the default Mersenne Twister
    generator, whose output is predictable and unsuitable for passwords.
    """
    # Imported locally so the block does not depend on the file-level
    # ``from random import choice`` line.
    from random import SystemRandom
    alphabet = string.ascii_letters + string.digits
    pick = SystemRandom().choice
    # ``_`` is the module's translation function, so use another loop name.
    return ''.join([pick(alphabet) for _i in range(passlen)])
def getpathenv():
    """Return a PATH value for running utilities.

    The result contains the entries of the current ``PATH`` in their
    original order (duplicates removed), followed by those of /sbin, /bin,
    /usr/sbin and /usr/bin that exist and are not listed yet.

    Two problems of the set-based implementation are avoided: the search
    order is no longer scrambled, and an unset or empty ``PATH`` no longer
    produces an empty entry (which shells treat as the current directory).
    """
    systemDirs = [binDir
                  for binDir in ('/sbin', '/bin', '/usr/sbin', '/usr/bin')
                  if path.exists(binDir)]
    ordered = []
    seen = set()
    for entry in os.environ.get('PATH', '').split(":") + systemDirs:
        if entry and entry not in seen:
            seen.add(entry)
            ordered.append(entry)
    return ":".join(ordered)
from files import (process,listDirectory,readLinesFile)
def getUserPassword(flag="dialog", pwDialog=False):
    """Obtain a password from the user.

    flag     - where the password comes from: "dialog" (interactive prompt
               through getpass) or "stdin" (two lines read from sys.stdin)
    pwDialog - pair of prompt strings used in dialog mode; default prompts
               are used when it is empty

    The password has to be supplied twice. Returns the password, or False
    (after printing an error) when the flag is unknown, the password is
    empty or the two entries differ.
    """
    if flag == "dialog":
        prompts = pwDialog or [_("New password"),
                               _("Retype the new password")]
        firstEntry = getpass.getpass(prompts[0] + ":")
        secondEntry = getpass.getpass(prompts[1] + ":")
    elif flag == "stdin":
        firstEntry = sys.stdin.readline().rstrip()
        secondEntry = sys.stdin.readline().rstrip()
    else:
        cl_overriding.printERROR(
            _("ERROR in function getUserPassword, "
              "incorrect option 'flag=%s'") % flag)
        return False
    if firstEntry and firstEntry == secondEntry:
        return firstEntry
    cl_overriding.printERROR(
        _("ERROR") + ": " + _("wrong password") + ": " + _("try again"))
    return False
def cmpVersion(v1, v2):
    """Compare two versions given as strings or getTupleVersion() results.

    Returns -1, 0 or 1 like the classic ``cmp``. The numeric parts are
    padded with zeros to a common length before the comparison.
    """
    # ``str`` is the very object ``types.StringType`` names on Python 2.
    if isinstance(v1, str):
        v1 = getTupleVersion(v1)
    if isinstance(v2, str):
        v2 = getTupleVersion(v2)
    leftNumbers, rightNumbers = v1[0], v2[0]
    # A negative multiplier yields an empty list, so only the shorter
    # side actually grows.
    leftKey = (leftNumbers + [0] * (len(rightNumbers) - len(leftNumbers)),
               v1[1])
    rightKey = (rightNumbers + [0] * (len(leftNumbers) - len(rightNumbers)),
                v2[1])
    # Same -1/0/1 result as cmp(leftKey, rightKey).
    return (leftKey > rightKey) - (leftKey < rightKey)
def getTupleVersion(ver):
    """Split a version string into ``[numbers, revision]``.

    ``numbers`` is a list padded with zeros to at least ten items; the
    suffixes alpha/beta/pre/rc/p are replaced by -4/-3/-2/-1/0.
    ``revision`` is a one-item list holding the revision token.
    Example:
    2.6.30    [[2, 6, 30, 0, 0, 0, 0, 0, 0, 0], ['r0']]
    2.6.31-r1 [[2, 6, 31, 0, 0, 0, 0, 0, 0, 0], ['r1']]

    NOTE(review): the revision token stays a string such as 'r1', so two
    revisions compare lexicographically ('r10' sorts before 'r2').
    """
    suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3,
                    "rc": -1}

    def toTuple(v):
        # Tokens are a trailing revision, digit runs, or letter/plus runs.
        tokens = re.findall(r"r\d+$|\d+|[a-zA-Z+]+",
                            v.replace('-SNAPSHOT', ''))
        return [int(token) if token.isdigit()
                else suffix_value.get(token, token)
                for token in tokens]

    vers, revision = re.search(r"(^.*?)(-r\d+)?$", ver, re.S).groups()
    numbers = toTuple(vers)
    numbers.extend([0] * (10 - len(numbers)))
    return [numbers, toTuple(revision or "r0")]
def appendProgramToEnvFile(nameProg, objVar):
    """Append a program name to the cl_merges variable and write it out.

    The original docstring names /etc/calculate/calculate.env as the file
    being saved; the write itself is delegated to objVar.WriteList.
    Returns True only when both the append and the write report success.
    """
    appended = objVar.AppendToList("cl_merges", nameProg, force=True)
    if appended and objVar.WriteList("cl_merges", force=True):
        return True
    return False
def removeProgramToEnvFile(nameProg, objVar):
    """Remove a program name from the cl_merges variable and write it out.

    The original docstring names /etc/calculate/calculate.env as the file
    being saved; the write itself is delegated to objVar.WriteList.
    Returns True only when both the removal and the write report success.
    """
    removed = objVar.RemoveToList("cl_merges", nameProg, force=True)
    if removed and objVar.WriteList("cl_merges", force=True):
        return True
    return False
def getInstalledVideo(prefix="/"):
    """Return the names of the installed X.Org video drivers.

    Every ``<name>_drv.so`` entry of usr/lib/xorg/modules/drivers below
    ``prefix`` contributes ``<name>``.
    """
    driverSuffix = '_drv.so'
    x11Drivers = path.join(prefix, "usr/lib/xorg/modules/drivers")
    return [fileName[:-len(driverSuffix)]
            for fileName in listDirectory(x11Drivers)
            if fileName.endswith(driverSuffix)]
def getDistfilesVideo(prefix="/"):
    """Return the proprietary video drivers present in distfiles.

    File names of usr/portage/distfiles below ``prefix`` are lower-cased;
    a name containing "ati" or "amd" yields 'fglrx', a name starting with
    'nvidia-linux' yields 'nvidia'. The result has no duplicates and no
    defined order.
    """
    distFiles = path.join(prefix, "usr/portage/distfiles")
    drivers = set()
    for fileName in listDirectory(distFiles):
        lowered = fileName.lower()
        if "ati" in lowered or "amd" in lowered:
            drivers.add('fglrx')
        elif lowered.startswith('nvidia-linux'):
            drivers.add("nvidia")
    return list(drivers)
def getAvailableVideo(prefix="/"):
    """Return installed plus installable (distfiles) video drivers.

    The result has no duplicates and no defined order.
    """
    drivers = set(getInstalledVideo(prefix=prefix))
    drivers.update(getDistfilesVideo(prefix=prefix))
    return list(drivers)
def getPasswdUsers(minId=1000, maxId=65000, prefix="/"):
    """Return "root" plus the users whose UID lies in [minId, maxId].

    The users are read from etc/passwd below ``prefix``; lines that do not
    have exactly seven colon-separated fields or whose UID field is not a
    number are ignored. The names after "root" are sorted.
    Returns an empty list when the file is not readable.
    """
    fileName = path.join(prefix, "etc/passwd")
    if not os.access(fileName, os.R_OK):
        return []
    reNumb = re.compile(r"^\d+$")
    lenData = 7
    userNames = []
    # The context manager closes the file; it used to be left open.
    with open(fileName) as passwdFile:
        for line in passwdFile:
            fields = line.rstrip().split(":")
            if len(fields) != lenData:
                continue
            uid = fields[2]
            if reNumb.match(uid) and minId <= int(uid) <= maxId:
                userNames.append(fields[0])
    return ["root"] + sorted(userNames)
def getSupportArch():
    """Return the architectures supported by the processor.

    ['i686', 'x86_64'] when a "flags" line of /proc/cpuinfo contains the
    " lm " flag, otherwise ['i686'].
    """
    longModeLines = [line for line in readLinesFile('/proc/cpuinfo')
                     if line.startswith('flags') and " lm " in line]
    if longModeLines:
        return ['i686', 'x86_64']
    return ['i686']
def getValueFromCmdLine(option, num):
    """Get a value of a boot parameter from /proc/cmdline.

    Parameters:
    option   name of the boot parameter (for example "calculate")
    num      which part of the value to return: either a field name
             ("lang", "keymap", "timezone", "resolution", "video",
             "composite", "domain", "domain_pw") or its position

    Two value formats are understood:
    new format   option=name:value,name:value  (looked up by field name)
    old format   option=value,value,...        (looked up by position)

    Returns the stripped value, or "" when it is absent, the field name or
    position is unknown, or /proc/cmdline cannot be read.
    """
    cmdLine = "/proc/cmdline"
    names = ("lang", "keymap", "timezone",
             "resolution", "video", "composite",
             "domain", "domain_pw")
    try:
        if isinstance(num, str) and not num.isdigit():
            name = num
            num = names.index(name)
        else:
            # Digit strings are converted so that the positional lookup of
            # the old format works for them as well.
            num = int(num)
            name = names[num]
        with open(cmdLine, "r") as cmdFile:
            bootParams = cmdFile.read().split(" ")
        for param in bootParams:
            parname, op, value = param.partition("=")
            if parname != option or op != "=":
                continue
            if ":" in value:
                # new format
                params = dict(item.partition(':')[0::2]
                              for item in value.split(',') if item)
                return params.get(name, "").strip()
            # old format
            values = value.split(",")
            if len(values) > num and values[num].strip():
                return values[num].strip()
    except (IOError, ValueError, IndexError):
        pass
    # Nothing matched: always answer with a string instead of falling
    # through with an implicit None.
    return ""
def getValueFromConfig(config, name):
    """Get a value of a parameter from a bash-style ``name=value`` file.

    Parameters:
    config   config file name
    name     param name (matched case-insensitively; it is inserted into
             a regular expression as is)

    Returns the value of the first matching line without surrounding
    double quotes and whitespace, or False when the file is missing,
    unreadable or has no such parameter.
    """
    reMatch = re.compile("^%s\\s*=\\s*\"?(.*?)(\"\\s*)?$" % name, re.I)
    try:
        if path.exists(config):
            # The context manager closes the file; it used to be left open.
            with open(config, "r") as configFile:
                for line in configFile:
                    match = reMatch.match(line)
                    if match:
                        return match.group(1).strip()
    except (IOError, OSError, UnicodeError):
        # Best effort: an unreadable file is treated like a missing one.
        pass
    return False
def getVideoFromXorgLog(prefix="/", available_drivers=[]):
    """Return the video driver found last in the Xorg log, or "".

    The log is var/log/Xorg.<N>.log below ``prefix``, where <N> is the
    display number from the DISPLAY environment variable. Only lines
    mentioning ``<driver>_drv.so`` for one of ``available_drivers`` are
    considered. The default list is never modified here.
    """
    display = os.environ.get('DISPLAY', '0')
    if not (display and available_drivers):
        return ""
    reDriver = re.compile('|'.join("%s_drv.so" % driver
                                   for driver in available_drivers))
    display_number = re.search(r':(\d+)(\..*)?', display)
    reDriverName = re.compile(r'([^/]+)_drv.so')
    if not display_number:
        return ""
    xorg_log_file = path.join(
        prefix, 'var/log/Xorg.%s.log' % display_number.group(1))
    if not path.exists(xorg_log_file):
        return ""
    lastDriver = ""
    for logLine in readLinesFile(xorg_log_file):
        if "drv" in logLine and reDriver.search(logLine):
            found = reDriverName.search(logLine)
            if found:
                # Keep overwriting: the last mention in the log wins.
                lastDriver = found.group(1)
    return lastDriver
def getVideoFromXorgConf(prefix="/"):
    """Return the video driver configured in etc/X11/xorg.conf, or "".

    Only the first ``Section "Device"`` of the file below ``prefix`` is
    examined, and the first ``Driver "<name>"`` line in it is returned.
    """
    xorg_conf = path.join(prefix, 'etc/X11/xorg.conf')
    if not path.exists(xorg_conf):
        return ""
    # The context manager closes the file; it used to be left open.
    with open(xorg_conf) as confFile:
        confText = confFile.read()
    matchSect = re.search(r'Section "Device".*?EndSection', confText, re.S)
    if matchSect:
        resDriver = re.search(r'\n\s*Driver\s*"([^"]+)"',
                              matchSect.group(0), re.S)
        if resDriver:
            return resDriver.group(1)
    return ""
def getVideoFromCmdLine():
    """Return the video driver requested in the "calculate" boot parameter.

    The kernel module name 'i915' is translated to the driver name 'intel';
    any other value is returned unchanged.
    """
    driverAliases = {'i915': 'intel'}
    requested = getValueFromCmdLine("calculate", "video")
    return driverAliases.get(requested, requested)
def getVideoFromModules():
    """Return the video driver matching a loaded kernel module, or "".

    A module counts as loaded when the third column of its /proc/modules
    line is a number greater than zero.
    """
    workedModules = []
    for line in readLinesFile('/proc/modules'):
        # Columns one and three of the line: name and the checked counter.
        moduleName, useCount = line.split()[:3:2]
        if useCount.isdigit() and int(useCount) > 0:
            workedModules.append(moduleName)
    mapModules = {'nouveau': 'nouveau',
                  'radeon': 'radeon',
                  'i915': 'intel',
                  'fglrx': 'fglrx',
                  'nvidia': 'nvidia',
                  'via': 'via',
                  'vmware': 'vmware'}
    for module, videodrv in mapModules.items():
        if module in workedModules:
            return videodrv
    return ""
def getVideoFromVendor(hr_video, available_drivers):
    """Return the default driver for a video vendor, or "other".

    hr_video            vendor name ('nvidia', 'ati', 'intel', 'via',
                        'vmware')
    available_drivers   drivers that may be chosen

    For nvidia and ati the proprietary driver (nvidia / fglrx) is preferred
    when available, otherwise nv / radeon is the candidate. "other" is
    returned for an unknown vendor or an unavailable candidate.
    """
    vendorDriver = {
        'nvidia': "nvidia" if "nvidia" in available_drivers else "nv",
        'ati': "fglrx" if "fglrx" in available_drivers else "radeon",
        'intel': 'intel',
        'via': 'via',
        'vmware': 'vmware'}
    candidate = vendorDriver.get(hr_video)
    if candidate is not None and candidate in available_drivers:
        return candidate
    return "other"
def getCompositeFromXorgconf(prefix="/"):
    """Return the Composite setting of xorg.conf: "on", "off" or None.

    The last ``Option "Composite" ...`` line of the first "Extensions"
    section of etc/X11/xorg.conf below ``prefix`` is used, and only when
    that section is closed by EndSection. None is returned when there is
    no such option or its value is not recognised.
    """
    xorgConfig = path.join(prefix,
                           "etc/X11/xorg.conf")
    insideExtensions = False
    pendingOption = ""
    compositeLine = ""
    for rawLine in readLinesFile(xorgConfig):
        text = rawLine.strip()
        if not insideExtensions:
            insideExtensions = (text.startswith('Section') and
                                '"Extensions"' in text)
            continue
        if text.startswith('EndSection'):
            # Section closed properly: accept what was found in it.
            compositeLine = pendingOption
            break
        if text.startswith('Section'):
            # A new section started before EndSection: give up.
            break
        if text.startswith('Option') and '"Composite"' in text:
            pendingOption = text
    if not compositeLine:
        return None
    listOpt = [part for part in compositeLine.split('"') if part.strip()]
    if len(listOpt) != 3:
        return None
    value = listOpt[2].lower()
    if value in ("on", "true", "yes", "1"):
        return "on"
    if value in ("off", "false", "no", "0"):
        return "off"
    return None
def getKernelUid(device):
    """Return the kernel UID of a device.

    This is the first eight characters of the UUID printed by /sbin/blkid,
    or the device path without slashes when blkid prints nothing.
    """
    blkidProcess = process('/sbin/blkid', '-c', '/dev/null', '-s', 'UUID',
                           '-o', 'value', device)
    uuid = blkidProcess.read().strip()
    return uuid[:8] if uuid else device.replace("/", "")
from calculate.lib.utils.files import readLinesFile
def getUserGroups(userName, prefix="/"):
    """Return the groups that list ``userName`` as a member.

    The data comes from etc/group below ``prefix``; only the member list
    (fourth field) is consulted.
    """
    memberOf = []
    for line in readLinesFile(path.join(prefix, 'etc/group')):
        # Fields one and four of the line: group name and member list.
        entry = line.split(':')[0::3]
        if len(entry) > 1 and userName in entry[1].split(','):
            memberOf.append(entry[0])
    return memberOf
def getGroups(prefix="/"):
    """Return the non-empty group names of etc/group below ``prefix``."""
    groupNames = (line.split(':')[0]
                  for line in readLinesFile(path.join(prefix, 'etc/group')))
    return [groupName for groupName in groupNames if groupName]
def getPagesInterval(count, offset, length):
    """Return ``(current page, number of pages)`` for a paged listing.

    count    items per page
    offset   index of the first item of the current page
    length   total number of items

    Example: (5, 5, 15) gives (2, 3), i.e. page 2 of 3.
    """
    itemsBefore = min(length, offset)
    itemsAfter = max(0, length - offset - count)
    pagesBefore = int(math.ceil(float(itemsBefore) / count))
    pagesAfter = int(math.ceil(float(itemsAfter) / count))
    currentPage = pagesBefore + 1
    return (currentPage, currentPage + pagesAfter)
def mountEcryptfs(userName,userPwd,userDir):
    """
    Mount the ecryptfs-encrypted directory of a user.

    Args:
        userName: user login
        userPwd: user password
        userDir: user home directory

    Returns True right away when ".Private" is found in isMount(userDir),
    otherwise the success() result of the mount.ecryptfs process.
    Raises CommonError when the passphrase cannot be unwrapped or added,
    or when Private.sig does not consist of exactly two lines.
    """
    from calculate.lib.utils.files import (process,readLinesFile,STDOUT,
                                           isMount)
    # presumably isMount reports what is mounted on userDir - TODO confirm
    if ".Private" in isMount(userDir):
        return True
    # unwrap the passphrase
    ecryptUserName = path.join('/home/.ecryptfs', userName)
    wrappedPassphraseFile = path.join(ecryptUserName,
        ".ecryptfs/wrapped-passphrase")
    p = process('/usr/bin/ecryptfs-unwrap-passphrase',wrappedPassphraseFile)
    p.write(userPwd)
    try:
        if p.failed():
            raise Exception
        result = p.readlines()
        # expected output: a "Passphrase:" line followed by the passphrase
        if len(result) > 1 and "Passphrase:" in result[0] and result[1]:
            passphrase = result[1]
        else:
            raise Exception
    # NOTE(review): the bare except also turns the two "raise Exception"
    # above (and anything else, e.g. KeyboardInterrupt) into CommonError
    except:
        raise CommonError(_("Failed to unwrap the passphrase"))
    # add the passphrase to the kernel keyring
    p = process('/usr/bin/ecryptfs-add-passphrase', '--fnek', '-',stderr=STDOUT)
    p.write(passphrase)
    if not p.success():
        # NOTE(review): unlike the other messages this one is not passed
        # through _() for translation
        raise CommonError(p.read()+"Failed to add passphrase")
    # get the signatures used to encrypt file contents and file names
    try:
        # unpacking raises ValueError unless the file yields exactly two lines
        ecryptfs_sig, ecryptfs_fnek_sig = \
            readLinesFile(path.join(ecryptUserName,".ecryptfs/Private.sig"))
    except ValueError:
        raise CommonError(_("Failed to parse Private.sig"))
    # mount the encrypted directory
    # (the adjacent string literals form one option string; .format applies
    # to the whole of it)
    mountProcess = process('/sbin/mount.ecryptfs',
        path.join(ecryptUserName,'.Private'),
        userDir,
        '-o','ecryptfs_passthrough=n,'
        'ecryptfs_cipher=aes,'
        'ecryptfs_key_bytes=16,'
        'no_sig_cache,'
        'ecryptfs_enable_filename_crypto=y,'
        'ecryptfs_sig={ecryptfs_sig},'
        'ecryptfs_fnek_sig={ecryptfs_fnek_sig},'
        'passphrase_passwd_fd=0'.format(
        ecryptfs_sig=ecryptfs_sig,
        ecryptfs_fnek_sig=ecryptfs_fnek_sig),stderr=STDOUT)
    # send the password through stdin
    mountProcess.write("passphrase_passwd="+userPwd)
    return mountProcess.success()
def isBootstrapDataOnly(directory):
    """
    Tell whether the directory holds nothing but the bootstrap certificate.

    True when ``directory`` contains only ".calculate" and that in turn
    contains only "client_cert" (per the original note, the certificate
    created by the cl-core command).
    """
    from calculate.lib.utils.files import (process,readLinesFile,STDOUT,
                                           isMount)
    if set(listDirectory(directory)) != set([".calculate"]):
        return False
    certDirectory = path.join(directory, ".calculate")
    return set(listDirectory(certDirectory)) == set(["client_cert"])
def getPortageUidGid():
    """
    Return the (uid, gid) of the "portage" user.

    Falls back to (250, 250) when the system has no such user.
    pwd.getpwnam reports a missing user by raising KeyError, never by
    returning an empty value, so the fallback has to live in the except
    branch to be reachable.
    """
    try:
        data = pwd.getpwnam('portage')
    except KeyError:
        return 250, 250
    return data.pw_uid, data.pw_gid