# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
from os import path
from itertools import *
from calculate.lib.datavars import (TableVariable, Variable, ReadonlyVariable,
                                    VariableError, ReadonlyTableVariable,
                                    FieldValue, HumanReadable)
from calculate.lib.utils.files import (listDirectory, process, PIPE, pathJoin,
                                       readFileEx)
from calculate.lib.utils.portage import (isPkgInstalled, RepositoryPath,
                                         getInstalledAtom, getPortagePython)
from calculate.lib.utils.common import cmpVersion
from calculate.lib.datavars import DataVars
from itertools import dropwhile
import re
import contextlib
import multiprocessing
from collections import namedtuple

_ = lambda x: x
from calculate.lib.cl_lang import setLocalTranslate

setLocalTranslate('cl_lib3', sys.modules[__name__])

# (alias, path) pairs of the known calculate.env files
_envData = [('system', '/var/lib/calculate/calculate.env'),
            ('custom', '/etc/calculate/calculate.env'),
            ('local', '/var/calculate/calculate.env'),
            ('remote', '/var/calculate/remote/calculate.env')]


class VariableClEnvData(TableVariable):
    """
    Aliases and path to ini files
    """
    source = ["cl_env_location", "cl_env_path"]


class VariableClEnvLocation(ReadonlyVariable):
    """
    Aliases to ini files (from cl_env_data)
    """
    type = 'list'
    # first column of _envData
    value = list(list(zip(*_envData))[0])


class VariableClEnvPath(Variable):
    """
    Path to ini files (from cl_env_data)
    """
    type = 'list'
    # second column of _envData
    value = list(list(zip(*_envData))[1])


class VariableClTemplateData(TableVariable):
    """
    Aliases to templates path
    """
    source = ["cl_template_location", "cl_template_path"]


class VariableClTemplateLocation(Variable):
    """
    Name to templates
    """
    type = 'list'
    value = ["overlay", "local", "remote"]


class VariableClTemplatePath(Variable):
    """
    Path to information file on server
    """
    type = 'list'
    value = [RepositoryPath.CalculateTemplates,
             "/var/calculate/templates",
             "/var/calculate/remote/templates"]


class VariableClEnvServerPath(ReadonlyVariable):
    """
    Path to server environment
    """
    value = '/var/calculate/remote/server.env'


class VariableClTemplateCltPath(ReadonlyVariable):
    """
    Paths to clt-template files
    """
    type = 'list'

    def get(self):
        """
        Clt templates path is /etc and CONFIG_PROTECT

        Returns the list of existing directories (joined with
        cl_chroot_path) taken from the CONFIG_PROTECT environment
        variable, or from a built-in default list when it is not set.
        """
        chroot = self.Get('cl_chroot_path')
        if "CONFIG_PROTECT" in os.environ:
            # skip blank items produced by repeated spaces
            protect_paths = ["/etc"] + [
                item for item in os.environ["CONFIG_PROTECT"].split(" ")
                if item.strip()]
        else:
            protect_paths = ["/etc", "/usr/share/X11/xkb",
                             "/var/lib/hsqldb", "/usr/share/config"]
        # return a real list: the variable type is 'list' and a lazy
        # filter object can be consumed only once
        candidates = [pathJoin(chroot, item) for item in protect_paths]
        return [item for item in candidates if path.exists(item)]


class VariableClRootPath(Variable):
    """
    Path to directory relative which perform joining templates to
    system files (sandbox)
    """
    value = '/'


class VariableClRootPathNext(Variable):
    """
    Path to directory relative which perform joining templates to
    system files (sandbox). This set for configure packages
    specified through merge=
    """

    def get(self):
        return self.Get('cl_root_path')


class VariableClChrootPath(ReadonlyVariable):
    """
    Path to directory which contain other system
    """
    value = '/'


class VariableClPassStep(Variable):
    """
    Pass for templates join 1,2,3,4,5 and etc
    """


class VariableClTaskName(ReadonlyVariable):
    """
    Name of the task being executed by cl-core
    """


class VariableClPassFile(Variable):
    """
    Template file performed at now
    """


class VariableClPassLocation(Variable):
    """
    Location currently being processed
    """
    value = ""


class VariableClSetupSkipProfile(Variable):
    """
    List of packages and template locations skipped when configuring
    the profile
    """
    type = "table"
    value = [[]]


class VariableClSetupSkipPatch(Variable):
    """
    List of packages and template locations skipped when applying patches
    """
    type = "table"
    value = [[]]


class VariableClSetupSkipMerge(Variable):
    """
    List of packages and template locations skipped when configuring
    system packages
    """
    type = "table"
    value = [[]]


class VariableClMergeSet(ReadonlyVariable):
    """
    Force by package template applying
    """
    type = "bool"
    value = "off"


class VariableClMergePkg(ReadonlyVariable):
    """
    This variable work with template function belong(package_name)
    if the variable is defined then will use only template, which
    has package_name in belong equal value of this variable or
    hasn't belong function
    (package_name == value of cl_belong_pkg)
    """
    type = "list"

    def get(self):
        return self.Get('cl_belong_pkg')


class VariableClMergePkgNew(ReadonlyVariable):
    """
    New variable value for cl_merge_pkg
    """
    type = "list"


class VariableClMergePkgPass(ReadonlyVariable):
    """
    Performed packages
    """
    type = "list"


class VariableClBelongPkg(ReadonlyVariable):
    """
    Deprecated by cl_merge_pkg
    """
    type = "list"


class VariableClAction(ReadonlyVariable):
    """
    Program action
    Example: install, uninstall, merge, domain, undomain, system, desktop
    """


class VariableClPassState(ReadonlyVariable):
    """
    Program state
    specifies addition to cl_pass_action for needing
    """


class VariableClMerges(Variable):
    """
    Programs have templates setup
    """
    type = 'list'

    def get(self):
        return []

    def set(self, value):
        # any assigned value is discarded
        return []


class VariableClAutoupdateSet(Variable):
    """
    (on or off) autoupdate config from install program
    """
    type = 'bool'

    def get(self):
        """
        "on" when cl_dispatch_conf is 'usenew', otherwise "off"
        """
        return "on" if self.Get('cl_dispatch_conf') == 'usenew' else "off"


class VariableClHumanEditSet(Variable):
    """
    Option for marking a "manual edit"
    """
    type = "bool"
    opt = ["--human-edit"]
    value = "off"

    def init(self):
        self.help = _("mark as human modification")
        self.label = _("Mark as human modification")


class VariableClProtectUseSet(ReadonlyVariable):
    """
    Use portage integration with configuration files
    """
    type = "bool"

    def get(self):
        """
        "on" unless cl_human_edit_set is switched on
        """
        return "on" if self.Get('cl_human_edit_set') == 'off' else "off"


class VariableClDispatchConfDefault(Variable):
    """
    Used to bypass expert parameters
    """
    value = "dispatch"


class VariableClDispatchConf(Variable):
    """
    Method for updating config files (usenew, skip or dispatch)
    """
    type = "choice"
    opt = ["--conf"]
    syntax = "--{choice}-conf"
    metavalue = "METHOD"

    def init(self):
        self.help = "'usenew' - " + _("use the new config files") + \
                    ",\n'skip' - " + _("skip the update of config files") + \
                    ",\n'dispatch' - " + _("manually update config files")
        self.label = _("Method for updating config files")

    def get(self):
        return self.Get('cl_dispatch_conf_default')

    def choice(self):
        return [("usenew", _("Use the new config files")),
                ("skip", _("Skip the update of config files")),
                ("dispatch", _("Manually update config files"))]


class VariableClWsdl(Variable):
    """
    Packages with wsdl
    """
    type = "list-choice"

    def choice(self):
        return self.Get('cl_wsdl_available')


class VariableClWsdlAvailable(ReadonlyVariable):
    """
    Packages which has wsdl interfaces
    """
    type = "list"

    def get(self):
        """
        Look through the "calculate" directory of every /usr/lib*
        site-packages entry of sys.path and collect "calculate-<module>"
        for each module directory which contains wsdl_<module>.py
        """
        site_packages = [path.join(x, "calculate")
                         for x in sys.path
                         if (x.endswith('site-packages') and
                             x.startswith('/usr/lib'))]
        ret_list = []
        for site_dir in site_packages:
            for mod_dir in listDirectory(site_dir, True, True):
                module = path.basename(mod_dir)
                if not path.exists(path.join(mod_dir,
                                             "wsdl_%s.py" % module)):
                    continue
                pkg_name = "calculate-%s" % module
                # keep the first occurrence only
                if pkg_name not in ret_list:
                    ret_list.append(pkg_name)
        return ret_list


class VariableClConfigProtectMask(ReadonlyVariable):
    """
    Value of CONFIG_PROTECT_MASK taken from the emerge config or,
    when it is unavailable, from the environment after source /etc/profile
    """
    type = "list"

    def get(self):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "CONFIG_PROTECT_MASK" in emerge_config:
            return emerge_config['CONFIG_PROTECT_MASK'].encode('utf-8').split()
        # fallback: read the variable from the login shell environment
        display_env = process('/bin/bash', "-c", "source /etc/profile;env",
                              stdout=PIPE)
        for line in display_env:
            if line.startswith("CONFIG_PROTECT_MASK="):
                config_protect_mask = \
                    line.rstrip().partition('=')[2].split()
                break
        else:
            config_protect_mask = []
        return config_protect_mask


class VariableClConfigProtect(ReadonlyVariable):
    """
    Value of CONFIG_PROTECT after source /etc/profile, and /etc append
    """
    type = "list"

    def get(self):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "CONFIG_PROTECT" in emerge_config:
            return emerge_config['CONFIG_PROTECT'].encode('utf-8').split()
        # fallback: read the variable from the login shell environment
        display_env = process('/bin/bash', "-c", "source /etc/profile;env",
                              stdout=PIPE)
        for line in display_env:
            if line.startswith("CONFIG_PROTECT="):
                config_protect = line.rstrip().partition('=')[2].split()
                break
        else:
            config_protect = []
        # /etc is appended only on the fallback path
        config_protect.append('/etc')
        return config_protect


class VariableClVerboseSet(Variable):
    """
    Verbose output variable
    """
    type = "bool"
    opt = ["-v", "--verbose"]
    value = "off"

    def init(self):
        self.help = _("verbose output")
        self.label = _("Verbose output")


class VariableClEbuildPhase(ReadonlyVariable):
    """
    Current ebuild phase
    """

    def get(self):
        return os.environ.get("EBUILD_PHASE", "")


class EmergeInfoData:
    """
    Lines of the "emerge --info" output; the command is run once
    when the object is created
    """

    def __init__(self):
        self.data = process("/usr/bin/emerge", "--ask=n", "--info",
                            envdict=os.environ).read().split('\n')

    def __iter__(self):
        return iter(self.data)

    def __str__(self):
        return "emerge --info"


class VariableClEmergeInfo(ReadonlyVariable):
    """
    Emerge --info cache
    """
    type = "list"
    systemRoot = "/"

    def get(self):
        # emerge --info is collected for the running system only
        if self.systemRoot != '/':
            return []
        return EmergeInfoData()


class VariableClPkgdir(ReadonlyVariable):
    """
    PKGDIR
    """

    def get(self):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "PKGDIR" in emerge_config:
            return emerge_config['PKGDIR'].encode('utf-8')
        # fallback: first PKGDIR= line of emerge --info
        for line in self.Get('cl_emerge_info'):
            if line.startswith("PKGDIR="):
                return line.partition("=")[2].strip('\n"\'')
        return ""


class VariableClFeatures(ReadonlyVariable):
    """
    FEATURES
    """
    type = "list"

    def get(self):
        """
        List of FEATURES items from the emerge config or, as a fallback,
        from the first FEATURES= line of emerge --info; empty list when
        neither is available
        """
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "FEATURES" in emerge_config:
            # split() without arguments never produces empty items
            return emerge_config['FEATURES'].encode('utf-8').split()
        for line in self.Get('cl_emerge_info'):
            if line.startswith("FEATURES="):
                return line.partition("=")[2].strip('\n"\'').split()
        # list variable: the "nothing found" value is an empty list
        return []


class VariableClDistdir(ReadonlyVariable):
    """
    DISTDIR
    """

    def get(self):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "DISTDIR" in emerge_config:
            return emerge_config['DISTDIR'].encode('utf-8')
        return "/var/calculate/remote/distfiles"


class VariableClRepositoryData(ReadonlyTableVariable):
    """
    Table of repository names and their locations
    """
    source = ["cl_repository_name", "cl_repository_location"]

    def portage_ver_ge(self, ver):
        """
        True when the installed sys-apps/portage version is >= ver,
        False when it is older or portage is not found
        """
        portage_package = "sys-apps/portage"
        portage_pkg_info = isPkgInstalled(portage_package)
        if portage_pkg_info:
            return cmpVersion(portage_pkg_info[0]['PV'], ver) >= 0
        return False

    def parse_repository_block(self, i):
        """
        Extract (name, location) pairs from an iterable of text lines:
        a line consisting of a single word followed by an indented
        "location: ..." line
        """
        text = "\n".join(i)
        re_block = re.compile(r"^(\w+)$\n\s+location: (.*)$", re.M)
        return re_block.findall(text)

    def from_portdir_vars(self, info):
        """
        Yield [name, location] pairs built from the PORTDIR= and
        PORTDIR_OVERLAY= lines of info; PORTDIR is reported as "gentoo",
        overlays are named by the basename of their path
        """
        for line in info:
            if line.startswith("PORTDIR="):
                yield ["gentoo", line.partition("=")[2].strip('\n"\'')]
        for line in info:
            if line.startswith("PORTDIR_OVERLAY="):
                overlays = line.partition("=")[2].strip('\n"\'').split(' ')
                for overlay in overlays:
                    if overlay:
                        yield [path.basename(overlay), overlay]

    def get(self, hr=HumanReadable.No):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config and "PKGDIR" in emerge_config:
            return [[repo.name.encode('UTF-8'),
                     repo.location.encode('UTF-8')]
                    for repo in emerge_config.repositories]
        info = self.Get("cl_emerge_info")
        if self.portage_ver_ge("2.2.18"):
            # parse everything starting from the "Repositories:" line
            return self.parse_repository_block(
                dropwhile(lambda x: not x.startswith("Repositories:"),
                          info))
        return list(self.from_portdir_vars(info))


class VariableClRepositoryName(FieldValue, ReadonlyVariable):
    """
    Repository names (column 0 of cl_repository_data)
    """
    type = "list"
    source_variable = "cl_repository_data"
    column = 0


class VariableClRepositoryLocation(FieldValue, ReadonlyVariable):
    """
    Repository locations (column 1 of cl_repository_data)
    """
    type = "list"
    source_variable = "cl_repository_data"
    column = 1


class VariableClPortdir(ReadonlyVariable):
    """
    PORTDIR
    """

    def get(self):
        return self.Select('cl_repository_location',
                           where='cl_repository_name', eq="gentoo",
                           limit=1) or ""


class VariableClPortdirOverlay(ReadonlyVariable):
    """
    Overlays path
    """
    type = "list"

    def get(self):
        return self.Select('cl_repository_location',
                           where='cl_repository_name', ne="gentoo") or []


RepoConfigProxy = namedtuple("RepoConfigProxy", ["name", "location"])


@contextlib.contextmanager
def stderr_devnull():
    """
    Context manager: redirect sys.stderr to os.devnull inside the block
    and restore the previous stream afterwards
    """
    oldstderr = sys.stderr
    devnull = open(os.devnull, 'w')
    sys.stderr = devnull
    try:
        yield
    finally:
        sys.stderr = oldstderr
        # do not leak the descriptor opened for the redirection
        devnull.close()


class PortageConfigWrapper(object):
    """
    Dictionary-like holder of portage settings and the repository list
    received from a python process which has the portage module
    """

    def __init__(self):
        self._dict = {}
        self.repositories = []

    def __getitem__(self, item):
        return self._dict[item]

    def __setitem__(self, key, value):
        self._dict[key] = value

    def __contains__(self, item):
        return item in self._dict

    def __str__(self):
        # defined on the class: str() ignores __str__ set on an instance
        return "EmergeConfig"

    @classmethod
    def getconfig_chroot(cls, chroot):
        """
        Get the portage config by running python chrooted to chroot
        """
        python3 = getPortagePython(prefix=chroot)
        return cls.getconfig_python(process("/usr/bin/chroot",
                                            chroot, python3))

    @classmethod
    def getconfig(cls):
        """
        Get the portage config of the running system
        """
        python3 = getPortagePython()
        return cls.getconfig_python(process(python3))

    @classmethod
    def getconfig_python(cls, pythonprocess):
        """
        Feed pythonprocess with a script which prints the selected portage
        settings and repositories as JSON, and build the wrapper from
        its output

        Raises VariableError when the process fails or its output is not
        valid JSON
        """
        import json

        pythonprocess.write(
            "from portage.package.ebuild.config import config\n"
            "import json\n"
            "c = config()\n"
            "obj = {}\n"
            "for key in ('CONFIG_PROTECT_MASK', 'CONFIG_PROTECT',\n"
            " 'FEATURES', 'DISTDIR',\n"
            " 'PKGDIR', 'EMERGE_DEFAULT_OPTS', 'LINGUAS'):\n"
            " obj[key] = c[key]\n"
            "obj['repositories'] = {}\n"
            "for x in c.repositories:\n"
            " obj['repositories'][x.name] = x.location\n"
            "print(json.dumps(obj))\n")
        pythonprocess.stdin.close()
        if pythonprocess.success():
            data = pythonprocess.read()
            try:
                c = json.loads(data)
                obj = cls()
                for key in ('CONFIG_PROTECT_MASK', 'CONFIG_PROTECT',
                            'FEATURES', 'DISTDIR',
                            'PKGDIR', 'EMERGE_DEFAULT_OPTS', 'LINGUAS'):
                    obj[key] = c[key]
                obj.repositories = [
                    RepoConfigProxy(name, location)
                    for name, location in c['repositories'].items()]
                return obj
            except ValueError:
                # malformed output is reported by the VariableError below
                pass
        raise VariableError(_("Failed to receive portage config"))


class VariableClEmergeConfig(ReadonlyVariable):
    """
    Emerge config object
    """
    type = "object"
    systemRoot = "/"

    def get(self):
        """
        PortageConfigWrapper for systemRoot, or "" when the config
        could not be received
        """
        try:
            if self.systemRoot == '/':
                return PortageConfigWrapper.getconfig()
            return PortageConfigWrapper.getconfig_chroot(self.systemRoot)
        except Exception:
            # best effort: users of this variable fall back to
            # emerge --info when the value is empty
            pass
        return ""


class VariableClEmergeDefaultOpts(ReadonlyVariable):
    """
    EMERGE_DEFAULT_OPTS
    """

    def get(self):
        emerge_config = self.Get('cl_emerge_config')
        if emerge_config:
            return emerge_config['EMERGE_DEFAULT_OPTS'].encode('UTF-8')
        # fallback: first EMERGE_DEFAULT_OPTS= line of emerge --info
        for line in self.Get('cl_emerge_info'):
            if not line.startswith("EMERGE_DEFAULT_OPTS="):
                continue
            value = line.partition('=')[2]
            # drop the surrounding quotes
            for quote in ("'", '"'):
                if value.startswith(quote):
                    value = value[1:-1]
            return value.replace(r'\"', '"')
        return ""


class VariableClTemplatesLocate(Variable):
    """
    Selected types of template storage
    """
    type = "choice-list"
    # value = ['overlay','local','remote','clt']
    element = "selecttable"
    opt = ["-T", "--templates"]
    metavalue = "TEMPLATES"
    untrusted = True

    descriptionMap = {'overlay': _('Overlay templates'),
                      'local': _('Local templates'),
                      'distro': _('Distribution templates'),
                      'remote': _('Remote templates'),
                      'clt': _('clt templates')}

    def init(self):
        self.label = _("Templates location")
        self.help = _("select location for templates %s") \
                    % ",".join(self.get())

    def get(self):
        """
        Template locations (as many as there are template paths) plus
        'clt' unless cl_action is 'desktop'
        """
        vals = \
            self.Get('cl_template_location')[:len(self.Get('cl_template_path'))]
        if self.Get('cl_action') == 'desktop':
            return vals
        return vals + ['clt']

    def choice(self):
        # a real list of (value, description) pairs, not a lazy map object
        return [(x, self.descriptionMap.get(x, x)) for x in self.get()]


class VariableClTemplatePathUse(ReadonlyVariable):
    """
    Paths to the templates used for configuring the system
    or the user profile
    """
    type = 'list'

    def get(self):
        return self.Select('cl_template_path', where='cl_template_location',
                           _in=self.Get('cl_templates_locate'))


class VariableClTemplateCltSet(ReadonlyVariable):
    """
    Use clt templates for system configuration
    """

    def get(self):
        return "on" if "clt" in self.Get('cl_templates_locate') else "off"


class VariableClEnvDebugSet(Variable):
    """
    Variable for enabling debugging
    """
    type = "bool"
    value = "on"


class VariableClMakeProfile(Variable):
    """
    Path to the current make.profile
    """
    systemRoot = "/"

    def get_work_link(self, *links):
        """
        First of links which exists, None when there is no such path
        """
        for link in links:
            if path.exists(link):
                return link

    def get(self):
        """
        Existing make.profile path under systemRoot or ""

        Raises VariableError when nothing is found and the variable
        belongs to a DataVars object
        """
        files = ["etc/portage/make.profile", "etc/make.profile"]
        val = self.get_work_link(
            *[path.join(self.systemRoot, x) for x in files])
        if isinstance(self.parent, DataVars) and not val:
            raise VariableError(
                _("Failed to detect the system profile.") + " " +
                _("Select the profile with command {cmd}").format(
                    cmd="cl-update-profile"))
        return val or ""


class VariableClHelpSet(ReadonlyVariable):
    """
    Variables are being filled in order to display --help
    """
    type = "bool"
    value = "off"


class VariableClConsoleArgs(Variable):
    """
    List of arguments passed via the console
    """
    type = "list"
    value = []


class VariableClClientType(ReadonlyVariable):
    """
    Type of the client receiving information from core
    """
    value = ""


class VariableOsLibPath(ReadonlyVariable):
    """
    Use lib or lib64
    """

    def get(self):
        return "lib64" if self.Get('os_arch_machine') == 'x86_64' else "lib"


class VariableOsPython(ReadonlyVariable):
    """
    Path to synchronization modules (the value computed here is
    a python interpreter name such as "python<version>")
    """

    def get_root(self):
        return "/"

    def get_usrlib(self):
        return "/usr/%s" % self.Get('os_lib_path')

    def get(self):
        """
        Detect the python name, trying in order:
        the version printed by "emerge --version" (only for the "/" root),
        the entries of /etc/python-exec/python-exec.conf which exist both
        in the library directory and in usr/bin, and the installed
        dev-lang/python atoms taken in reverse sorted order
        """
        chroot_path = self.get_root()
        if chroot_path == "/":
            p = process("/usr/bin/emerge", "--version")
            # \d+ is required for versions with a two-digit part
            # (e.g. "python 3.10" must not be cut down to "3.1")
            m = re.search(r"python (\d+\.\d+)", p.read())
            if m:
                return "python%s" % m.group(1)
        pythons = readFileEx(
            pathJoin(chroot_path, "/etc/python-exec/python-exec.conf"),
            grab=True).split()
        if pythons:
            libpath = self.get_usrlib()
            for python in pythons:
                if (path.exists(pathJoin(chroot_path, libpath, python)) and
                        path.exists(pathJoin(chroot_path,
                                             "usr/bin", python))):
                    return python
        for atom in sorted(
                getInstalledAtom("dev-lang/python", prefix=chroot_path),
                reverse=True):
            return "python{SLOT}".format(**atom)
        # nothing detected
        return None


class VariableClTemplateWrongPatch(Variable):
    """
    Action taken when a patch does not apply in EBUILD_PHASE=configure
    """
    type = "choice"
    value = "warning"

    def choice(self):
        return [("warning", _("Show warning")),
                ("break", _("Break package building"))]