You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/common.py

628 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from random import choice
import os
import re
import math
from os import path
from .dracut import Dracut
from .system import SystemPath
import sys
import getpass
import string
import glob
import pwd
import itertools
from ..cl_lang import setLocalTranslate
# Identity placeholder for the translation function ``_``. The call below
# hands this module object to setLocalTranslate for the 'cl_lib3' domain --
# presumably so the real translator can be installed here (confirm in cl_lang).
_ = lambda x: x
setLocalTranslate('cl_lib3', sys.modules[__name__])
class CommonError(Exception):
    """Error raised by helpers of this module (for example mountEcryptfs)."""
class _error():
    """Mixin that accumulates error messages.

    The messages are kept in a class-level list, so instances that do not
    rebind ``error`` share the same storage.
    """
    # Accumulated error messages, if any
    error = []

    def getError(self):
        """Return all errors as newline-terminated text, or False if none"""
        if self.error:
            return "".join(message + "\n" for message in self.error)
        return False

    def setError(self, error):
        """Append an error message; always returns True"""
        self.error.append(error)
        return True

    def clearErrors(self):
        """Empty the error list in place"""
        del self.error[:]
class _warning():
    """Mixin that accumulates warning messages.

    The messages are kept in a class-level list, so instances that do not
    rebind ``warning`` share the same storage.
    """
    # Accumulated warning messages, if any
    warning = []

    def getWarning(self):
        """Return all warnings as newline-terminated text, or False if none"""
        if self.warning:
            return "".join(message + "\n" for message in self.warning)
        return False

    def setWarning(self, warning):
        """Append a warning message; always returns True"""
        self.warning.append(warning)
        return True

    def clearWarnings(self):
        """Empty the warning list in place"""
        del self.warning[:]
def genpassword(passlen=9, chars=string.ascii_letters + string.digits):
    """Return a random string of ``passlen`` characters taken from ``chars``.

    Parameters:
    passlen     length of the generated string
    chars       sequence of characters to pick from

    The characters are picked with ``secrets.choice`` instead of
    ``random.choice``: the ``random`` module is not suitable for generating
    secrets such as passwords.
    """
    # local import keeps this block self-contained
    import secrets
    return ''.join(secrets.choice(chars) for _i in range(passlen))
def getpathenv():
    """Return a PATH value for running utilities.

    The result is the current ``PATH`` followed by those of /sbin, /bin,
    /usr/sbin and /usr/bin that exist and are not listed yet.

    The order of the original ``PATH`` is preserved (it decides which binary
    wins), duplicates are removed and empty entries are dropped, because an
    empty PATH entry means "current directory".
    """
    entries = os.environ.get('PATH', '').split(":")
    entries.extend(bindir for bindir
                   in ('/sbin', '/bin', '/usr/sbin', '/usr/bin')
                   if path.exists(bindir))
    seen = set()
    ordered = []
    for entry in entries:
        if entry and entry not in seen:
            seen.add(entry)
            ordered.append(entry)
    return ":".join(ordered)
#decorator
def ensure_unicode(func):
    """Decode every ``bytes`` argument as UTF-8 before calling ``func``.

    Both positional and keyword arguments are converted; values of any other
    type are passed through unchanged. The wrapper keeps the name and
    docstring of ``func``.
    """
    # local import keeps this block self-contained
    from functools import wraps

    def _decode(value):
        return value.decode("UTF-8") if isinstance(value, bytes) else value

    @wraps(func)
    def _ensure_unicode_wrapper(*args, **kwargs):
        args = [_decode(x) for x in args]
        kwargs = {key: _decode(value) for key, value in kwargs.items()}
        return func(*args, **kwargs)
    return _ensure_unicode_wrapper
from .files import process, listDirectory
def cmp(a, b):
    """Three-way comparison: 1 if a > b, -1 if a < b, otherwise 0"""
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
def cmpVersion(v1, v2):
    """Compare two versions and return -1, 0 or 1.

    Each version is either a string (converted with getTupleVersion) or an
    already converted ``[numbers, revision]`` pair. The shorter number list
    is padded with zeros before the comparison.
    """
    left = getTupleVersion(v1) if isinstance(v1, str) else v1
    right = getTupleVersion(v2) if isinstance(v2, str) else v2
    left_numbers, right_numbers = left[0], right[0]
    # a negative multiplier yields an empty list, so only one side is padded
    left_padding = [0] * (len(right_numbers) - len(left_numbers))
    right_padding = [0] * (len(left_numbers) - len(right_numbers))
    return cmp((left_numbers + left_padding, left[1]),
               (right_numbers + right_padding, right[1]))
def getTupleVersion(ver):
    """Convert a version string to ``[numbers, revision]``.

    ``numbers`` is a list padded with zeros to at least 10 items; numeric
    parts become ints, the suffixes alpha/beta/pre/rc/p become -4/-3/-2/-1/0
    and any other alphabetic part is kept as a string. ``revision`` is a
    one-item list holding the revision token ('r0' when absent).
    Example:
    2.6.30      [[2, 6, 30, 0, 0, 0, 0, 0, 0, 0], ['r0']]
    2.6.31-r1   [[2, 6, 31, 0, 0, 0, 0, 0, 0, 0], ['r1']]
    """
    suffix_value = {"pre": -2, "p": 0, "alpha": -4, "beta": -3,
                    "rc": -1}

    def toTuple(v):
        # raw string: "\d" in a plain literal is an invalid escape sequence
        tokens = re.findall(r"r\d+$|\d+|[a-zA-Z+]+",
                            v.replace('-SNAPSHOT', ''))
        converted = []
        for token in tokens:
            if token.isdigit():
                converted.append(int(token))
            else:
                converted.append(suffix_value.get(token, token))
        return converted

    # split an optional trailing "-rN" revision from the version itself
    vers, revision = re.search(r"(^.*?)(-r\d+)?$", ver, re.S).groups()
    vers = toTuple(vers)
    vers.extend([0] * (10 - len(vers)))
    revision = toTuple(revision or "r0")
    return [vers, revision]
def getInstalledVideo(prefix="/"):
    """Return names of the installed X.Org video drivers.

    A driver is every entry of <usrlib>/xorg/modules/drivers whose name ends
    with '_drv.so'; that suffix is cut off the returned names.
    """
    suffix = '_drv.so'
    driversDir = "%s/xorg/modules/drivers" % SystemPath(prefix).usrlib
    return [fn[:-len(suffix)] for fn in listDirectory(driversDir)
            if fn.endswith(suffix)]
def getDistfilesVideo(prefix="/"):
    """Return video drivers whose sources are found in distfiles.

    Every file name in <prefix>/usr/portage/distfiles is lower-cased and
    mapped to a driver by the first matching substring of ``driver_map``.
    """
    distFiles = path.join(prefix, "usr/portage/distfiles")
    driver_map = {'ati': 'fglrx',
                  'amd': 'fglrx',
                  'radeon': 'fglrx',
                  'nvidia-linux': 'nvidia',
                  'fglrx': 'fglrx'}

    def driver_by_fn(fn):
        lowered = fn.lower()
        for part, driver in driver_map.items():
            if part in lowered:
                return driver
        return None

    return list({driver for driver
                 in map(driver_by_fn, listDirectory(distFiles)) if driver})
def getAvailableVideo(prefix="/"):
    """Return the video drivers that are installed or present in distfiles,
    without duplicates"""
    installed = getInstalledVideo(prefix=prefix)
    fromDistfiles = getDistfilesVideo(prefix=prefix)
    return list(set(installed + fromDistfiles))
def getPasswdUsers(minId=1000, maxId=65000, prefix="/",
                   datafile="etc/passwd"):
    """
    Get users from passwd with uid from minId to maxId (inclusive)

    Parameters:
    minId, maxId    inclusive uid range
    prefix          root directory the data file is looked up in
    datafile        path of the passwd file relative to prefix

    Returns ["root"] followed by the sorted matching user names, or an
    empty list when the file is not readable. Lines that do not have
    exactly 7 colon-separated fields or a numeric uid are ignored.
    """
    fileName = path.join(prefix, datafile)
    if not os.access(fileName, os.R_OK):
        return []
    # raw string: "\d" in a plain literal is an invalid escape sequence
    reNumb = re.compile(r"^\d+$")
    lenData = 7
    with open(fileName) as f:
        records = [rec for rec in (line.rstrip().split(":") for line in f)
                   if len(rec) == lenData]
    sortUsers = sorted(rec[0] for rec in records
                       if reNumb.match(rec[2])
                       and minId <= int(rec[2]) <= maxId)
    return ["root"] + sortUsers
def getSupportArch():
    """Get supported architectures by processor.

    Returns ['i686', 'x86_64'] when a line of /proc/cpuinfo starting with
    'flags' lists the ``lm`` flag, otherwise ['i686'].
    """
    # compare whole tokens so that "lm" is also found at the end of the line
    for line in readLinesFile('/proc/cpuinfo'):
        if line.startswith('flags') and "lm" in line.split():
            return ['i686', 'x86_64']
    return ['i686']
class CmdlineParams():
    """
    Names of the kernel boot parameters used by this module, including the
    keys of the calculate= boot option
    """
    # kernel command line parameter names
    Calculate = "calculate"
    IsoscanFile = Dracut.IsoScanCmdParam
    IOScheduler = "elevator"

    # keys inside the calculate= parameter
    Locale = "lang"
    Keymap = "keymap"
    Timezone = "timezone"
    Resolution = "resolution"
    Video = "video"
    Composite = "composite"
    Domain = "domain"
    DomainPassword = "domain_pw"
    Audio = "audio"
    Clock = "clock"
def getValueFromCmdLine(option, num=None):
    """Get value of parameter from boot params (/proc/cmdline)

    Parameters:
    option     param name
    num        part of the value to return: a key name (new "key:value,..."
               format) or a number / digit string (position in the old
               comma-separated format); None returns the whole value

    Returns the stripped value; "" when the command line cannot be read or
    ``num`` is not a known name/index; None when the parameter (or a
    non-empty part of it in the old format) is not found.
    """
    cmdLine = "/proc/cmdline"
    names = (
        CmdlineParams.Locale,
        CmdlineParams.Keymap,
        CmdlineParams.Timezone,
        CmdlineParams.Resolution,
        CmdlineParams.Video,
        CmdlineParams.Composite,
        CmdlineParams.Domain,
        CmdlineParams.DomainPassword,
        CmdlineParams.Audio,
        CmdlineParams.Clock)
    try:
        # resolve both the key name and the numeric position of the part
        if num is None:
            name = None
        elif isinstance(num, str) and not num.isdigit():
            name = num
            num = names.index(name)
        else:
            # convert digit strings too: num is compared with an int below
            num = int(num)
            name = names[num]
        with open(cmdLine, "r") as f:
            for param in f.read().split(" "):
                parname, op, value = param.partition("=")
                if parname != option or op != "=":
                    continue
                if name is None:
                    return value.strip()
                if ":" in value:
                    # new format: key:value,key:value
                    params = dict(x.partition(':')[0::2]
                                  for x in value.split(',') if x)
                    return params.get(name, "").strip()
                # old format: positional comma-separated values
                values = value.split(",")
                if len(values) > num and values[num].strip():
                    return values[num].strip()
    except (IOError, ValueError, IndexError):
        return ""
    # parameter not found; kept as None for backward compatibility
    return None
def getValueFromConfig(config, name):
    """Get value of parameter from bash type file

    Parameters:
    config     config file name
    name       param name (matched case-insensitively; it is inserted into
               a regular expression without escaping)

    Returns the value with surrounding whitespace and one kind of enclosing
    quotes removed. When several lines match, the last one wins. Returns
    None when the file or the parameter is missing or the file cannot be
    read.
    """
    # raw string: "\s" in a plain literal is an invalid escape sequence
    reMatch = re.compile(r"^%s\s*=\s*(.*?)\s*$" % name, re.I)
    val = None
    try:
        if path.exists(config):
            # the context manager closes the file even if reading fails
            with open(config, "r") as f:
                for line in f:
                    match = reMatch.match(line)
                    if not match:
                        continue
                    val = match.group(1).strip()
                    for quotas in ('"', "'"):
                        if val.startswith(quotas) and val.endswith(quotas):
                            val = val.strip(quotas)
                            break
    except Exception:
        # deliberately best-effort: return what has been found so far
        pass
    return val
def getVideoFromXorgLog(prefix="/", available_drivers=()):
    """Get video driver from Xorg log

    Reads <prefix>/var/log/Xorg.<display number>.log, collects the drivers
    mentioned as *_drv.so (only lines matching one of available_drivers are
    considered), drops one occurrence per 'UnloadModule' record and returns
    the last remaining driver, or "" when nothing is found.
    """
    display = os.environ.get('DISPLAY', ':0')
    if not (display and available_drivers):
        return ""
    reDriver = re.compile('|'.join("%s_drv.so" % drv
                                   for drv in available_drivers))
    display_number = re.search(r':(\d+)(\..*)?', display)
    reDriverName = re.compile(r'([^/]+)_drv.so')
    if not display_number:
        return ""
    xorg_log_file = path.join(prefix, 'var/log/Xorg.%s.log' %
                              display_number.group(1))
    if not path.exists(xorg_log_file):
        return ""
    loaded = []
    try:
        for line in readLinesFile(xorg_log_file):
            if "drv" in line and reDriver.search(line):
                found = reDriverName.search(line)
                if found:
                    loaded.append(found.group(1))
    except UnicodeDecodeError:
        return ""
    if loaded:
        reUnload = re.compile('UnloadModule: "(%s)"' % '|'.join(loaded))
        unloaded = []
        for line in readLinesFile(xorg_log_file):
            found = reUnload.search(line)
            if found:
                unloaded.append(found.group(1))
        # every unload record cancels one earlier load of the same driver
        for drv in unloaded:
            if drv in loaded:
                loaded.remove(drv)
    if loaded:
        return loaded[-1]
    return ""
def getVideoFromXorgConf(prefix="/"):
    """Get video driver from xorg.conf

    Returns the Driver value of the first 'Section "Device"' found in
    <prefix>/etc/X11/xorg.conf, or "" when the file, the section or the
    Driver line is missing.
    """
    xorg_conf = path.join(prefix, 'etc/X11/xorg.conf')
    if not path.exists(xorg_conf):
        return ""
    with open(xorg_conf) as f:
        content = f.read()
    deviceSection = re.search(r'Section "Device".*?EndSection', content, re.S)
    if not deviceSection:
        return ""
    driverLine = re.search(r'\n\s*Driver\s*"([^"]+)"',
                           deviceSection.group(0), re.S)
    return driverLine.group(1) if driverLine else ""
def getVideoFromCmdLine():
    """Get video driver from cmdline

    The 'video' part of the calculate= boot parameter is returned, with the
    kernel module name 'i915' translated to the driver name 'intel'.
    """
    aliases = {'i915': 'intel'}
    videoVal = getValueFromCmdLine(CmdlineParams.Calculate,
                                   CmdlineParams.Video)
    return aliases.get(videoVal, videoVal)
def getVideoFromModules():
    """Get video driver from the kernel modules listed in /proc/modules

    Only modules whose third field is a positive number are considered.
    Returns the driver of the first known module that is in use, or "".
    """
    mapModules = {'nouveau': 'nouveau',
                  'radeon': 'radeon',
                  'i915': 'intel',
                  'fglrx': 'fglrx',
                  'nvidia': 'nvidia',
                  # 'amdgpu': 'amdgpu',
                  'via': 'via',
                  'vmware': 'vmware'}
    workedModules = []
    for line in readLinesFile('/proc/modules'):
        # keep the first and the third field of the line
        fields = line.split()[:3:2]
        if fields[1].isdigit() and int(fields[1]) > 0:
            workedModules.append(fields[0])
    for module in mapModules:
        if module in workedModules:
            return mapModules[module]
    return ""
def getVideoFromVendor(hr_video, available_drivers):
    """Choose the default video driver for a vendor.

    Parameters:
    hr_video            vendor name ('nvidia', 'ati', 'intel', 'via',
                        'vmware')
    available_drivers   container of available driver names

    Returns the default driver of the vendor when it is available,
    otherwise "other".
    """
    defaultDriver = {
        'nvidia': "nvidia" if "nvidia" in available_drivers else "nv",
        'ati': "fglrx" if "fglrx" in available_drivers else "radeon",
        'intel': 'intel',
        'via': 'via',
        'vmware': 'vmware'}
    candidate = defaultDriver.get(hr_video)
    if candidate is not None and candidate in available_drivers:
        return candidate
    return "other"
def getCompositeFromXorgconf(prefix="/"):
    """Get composite value from xorg.conf

    Looks for the last 'Option "Composite"' line of the first "Extensions"
    section of <prefix>/etc/X11/xorg.conf. Returns "on" or "off" for a
    recognized value, None otherwise (also when the section is not closed
    by EndSection).
    """
    xorgConfig = path.join(prefix, "etc/X11/xorg.conf")
    insideExtensions = False
    candidate = ""
    lineComposite = ""
    for rawLine in readLinesFile(xorgConfig):
        line = rawLine.strip()
        if not insideExtensions:
            if line.startswith('Section') and '"Extensions"' in line:
                insideExtensions = True
            continue
        if line.startswith('EndSection'):
            # the option counts only when the section is properly closed
            lineComposite = candidate
            break
        if line.startswith('Section'):
            break
        if line.startswith('Option') and '"Composite"' in line:
            candidate = line
    if not lineComposite:
        return None
    listOpt = [part for part in lineComposite.split('"') if part.strip()]
    if len(listOpt) != 3:
        return None
    ret = listOpt[2].lower()
    if ret in ("on", "true", "yes", "1"):
        return "on"
    if ret in ("off", "false", "no", "0"):
        return "off"
    return None
def getKernelUid(dev):
    """Get Kernel UID by UUID of device

    Returns the first 8 characters of the UUID printed by blkid for ``dev``;
    when blkid prints nothing, the device name without slashes is returned.
    """
    blkidProcess = process('/sbin/blkid', '-c', '/dev/null', '-s', 'UUID',
                           '-o', 'value', dev)
    uuid = blkidProcess.read().strip()
    return uuid[:8] if uuid else dev.replace("/", "")
def dict_by_columns(i, sep, key, value):
    """
    Build a dictionary from lines split into columns by ``sep``.

    Parameters:
    i        iterable of lines
    sep      column separator
    key      index of the column holding the keys
    value    index of the column holding the values

    Lines that do not have both columns are skipped.
    """
    max_val = max(key, value)
    # a line needs more than max_val columns for index max_val to exist
    return {columns[key]: columns[value]
            for columns in (line.split(sep) for line in i)
            if len(columns) > max_val}
from ..utils.files import readLinesFile
def getUserGroups(userName, prefix="/"):
    """
    Get names of the groups from <prefix>/etc/group that list userName
    among their members
    """
    groups = []
    for line in readLinesFile(path.join(prefix, 'etc/group')):
        # keep the first (name) and the fourth (members) field
        fields = line.split(':')[0::3]
        if len(fields) > 1 and userName in fields[1].split(','):
            groups.append(fields[0])
    return groups
def getUserPrimaryGroup(userName, prefix="/"):
    """
    Get the name of the user's primary group

    The gid is taken from the fourth field of <prefix>/etc/passwd and looked
    up in <prefix>/etc/group. Returns None when the user or the group is not
    found.
    """
    sep_symb = ":"
    userGidMap = dict_by_columns(
        readLinesFile(path.join(prefix, 'etc/passwd')), sep_symb, 0, 3)
    gidGroupMap = dict_by_columns(
        readLinesFile(path.join(prefix, 'etc/group')), sep_symb, 2, 0)
    if userName not in userGidMap:
        return None
    return gidGroupMap.get(userGidMap[userName], None)
def getGroups(prefix="/"):
    """
    Get the non-empty group names from <prefix>/etc/group
    """
    names = (line.split(':')[0]
             for line in readLinesFile(path.join(prefix, 'etc/group')))
    return [name for name in names if name]
def getPagesInterval(count, offset, length):
    """
    Get the 1-based number of the current page and the number of the last
    page, e.g. (2, 3) for count=5, offset=5, length=15

    Parameters:
    count     number of items on a page
    offset    number of items before the current page
    length    total number of items
    """
    itemsBefore = min(length, offset)
    itemsAfter = max(0, length - offset - count)
    pagesBefore = int(math.ceil(float(itemsBefore) / count))
    pagesAfter = int(math.ceil(float(itemsAfter) / count))
    current = pagesBefore + 1
    return current, pagesAfter + current
def mountEcryptfs(userName, userPwd, userDir):
    """
    Mount the ecryptfs-encrypted home directory of a user

    Args:
        userName: user login
        userPwd: user password
        userDir: user home directory

    Returns True when isMount(userDir) already reports ".Private",
    otherwise the success status of the mount.ecryptfs process.

    Raises CommonError when the passphrase cannot be unwrapped or added,
    or when Private.sig does not yield exactly two lines.
    """
    from ..utils.files import (process, readLinesFile, STDOUT)
    from ..utils.mount import isMount
    if ".Private" in isMount(userDir):
        return True
    # decrypt the passphrase
    ecryptUserName = path.join('/home/.ecryptfs', userName)
    wrappedPassphraseFile = path.join(ecryptUserName,
                                      ".ecryptfs/wrapped-passphrase")
    p = process('/usr/bin/ecryptfs-unwrap-passphrase', wrappedPassphraseFile)
    p.write(userPwd)
    try:
        if p.failed():
            raise Exception
        result = p.readlines()
        # the passphrase is expected on the line after the "Passphrase:" line
        if len(result) > 1 and "Passphrase:" in result[0] and result[1]:
            passphrase = result[1]
        else:
            raise Exception
    except Exception:
        raise CommonError(_("Failed to unwrap the passphrase"))
    # add the passphrase to the kernel keyring
    p = process('/usr/bin/ecryptfs-add-passphrase', '--fnek', '-',
                stderr=STDOUT)
    p.write(passphrase)
    if not p.success():
        raise CommonError(p.read() + "Failed to add passphrase")
    # get the signatures used to encrypt file contents and file names
    try:
        # unpacking raises ValueError unless exactly two lines are read
        ecryptfs_sig, ecryptfs_fnek_sig = \
            readLinesFile(path.join(ecryptUserName, ".ecryptfs/Private.sig"))
    except ValueError:
        raise CommonError(_("Failed to parse Private.sig"))
    # mount the encrypted directory
    mountProcess = process(
        '/sbin/mount.ecryptfs',
        path.join(ecryptUserName, '.Private'),
        userDir,
        # adjacent literals are joined first, then formatted as one string
        '-o', 'ecryptfs_passthrough=n,' 'ecryptfs_cipher=aes,'
              'ecryptfs_key_bytes=16,' 'no_sig_cache,'
              'ecryptfs_enable_filename_crypto=y,'
              'ecryptfs_sig={ecryptfs_sig},'
              'ecryptfs_fnek_sig={ecryptfs_fnek_sig},'
              'passphrase_passwd_fd=0'.format(
                  ecryptfs_sig=ecryptfs_sig,
                  ecryptfs_fnek_sig=ecryptfs_fnek_sig), stderr=STDOUT)
    # send the password through stdin
    mountProcess.write("passphrase_passwd=" + userPwd)
    return mountProcess.success()
def isBootstrapDataOnly(user_dir):
    """
    Check that the directory holds little more than the /etc/skel content
    (the original note: only the certificate created by cl-core and the
    configured symlinks to .face, skel)

    Entries starting with ".calculate" are ignored. At most
    len(skel entries) + 10 entries of user_dir are examined; the result is
    True when no more than 10 of them are absent from /etc/skel.
    """
    from ..utils.files import find
    max_diff = 10

    def withoutCalculate(names):
        return (name for name in names if not name.startswith(".calculate"))

    skel_files = set(withoutCalculate(
        find('/etc/skel', onefilesystem=True, fullpath=False)))
    user_data = find(user_dir, onefilesystem=True, fullpath=False)
    # looking at a limited number of entries is enough to exceed max_diff
    user_files = set(itertools.islice(withoutCalculate(user_data),
                                      len(skel_files) + max_diff))
    return len(user_files - skel_files) <= max_diff
def getPortageUidGid():
    """
    Get uid and gid of the portage user

    Returns (250, 250) when the system has no portage user.
    """
    try:
        data = pwd.getpwnam('portage')
    except KeyError:
        # getpwnam raises KeyError for an unknown user instead of returning
        # an empty value
        return 250, 250
    return data.pw_uid, data.pw_gid
def getXorgConfContent(prefix="/"):
    """Return the content of <prefix>/etc/X11/xorg.conf and of every regular
    file in <prefix>/etc/X11/xorg.conf.d, joined with newlines"""
    candidates = [path.join(prefix, "etc/X11/xorg.conf")]
    candidates.extend(glob.glob(path.join(prefix, "etc/X11/xorg.conf.d/*")))
    contents = []
    for fn in candidates:
        if path.isfile(fn):
            with open(fn, 'r') as f:
                contents.append(f.read())
    return "\n".join(contents)
def get_fastlogin_domain_path(dv):
    """
    Yield the paths of the resources used to decide whether the templates
    of a domain user profile have to be applied

    Yields, in order: the ".git" directory of every template path that ends
    with 'profiles/templates', the LayeredIni ini files, the calculate.env
    files and /var/log/emerge.log.
    """
    from ..cl_template import LayeredIni
    overlay_suffix_path = 'profiles/templates'
    # the suffix is cut together with the path separator before it
    cut_length = len(overlay_suffix_path) + 1
    for template_path in dv.Get('main.cl_template_path'):
        if template_path.endswith(overlay_suffix_path):
            yield os.path.join(template_path[:-cut_length], ".git")
    yield from (LayeredIni.IniPath.System,
                LayeredIni.IniPath.Etc,
                LayeredIni.IniPath.Local,
                LayeredIni.IniPath.Remote)
    yield from ("/etc/calculate/calculate.env",
                "/var/calculate/calculate.env",
                "/var/lib/calculate/calculate.env",
                "/var/calculate/remote/calculate.env")
    yield "/var/log/emerge.log"