You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/portage.py

1827 lines
60 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
import os
from os import path
import time
import datetime
import functools
import bz2
from ..cl_xml import ET, str_to_xml_doc
import pexpect
from ..configparser import ConfigParser
from .colortext.palette import TextState
from .common import cmpVersion
from .files import (getProgPath, find, process, listDirectory, readFile,
readLinesFile, pathJoin, makeDirectory,
FilesError, rsync_files, RsyncOptions,
removeDir, removeFileWithEmptyDirectory, FindFileType)
from .tools import SaveableIterator, ignore
from .system import SystemPath
from collections.abc import Mapping
from collections import defaultdict
from .common import getTupleVersion
from contextlib import closing
from functools import total_ordering
# no ifilter, imap in python3
from itertools import chain, groupby
import hashlib
# Short alias for the colour constants exposed by TextState.
Colors = TextState.Colors
import glob
# Identity placeholder for the translation function `_`.
# NOTE(review): presumably rebound by setLocalTranslate() below -- confirm
# against cl_lang.
_ = lambda x: x
from ..cl_lang import setLocalTranslate
# Hand this module object and the name 'cl_lib3' to the localisation helper.
setLocalTranslate('cl_lib3', sys.modules[__name__])
# Location of the installed-package database; joined onto a system root
# prefix by the helpers in this module.
VDB_PATH = 'var/db/pkg'

# Splits "[<path>/var/db/pkg/][<category>/]<name>-<version>[-r<N>][?tbz2]".
# Groups: 1 category, 2 PN, 3 PVR, 4 PV, 5 PR, 6 "tbz2".
# Both fragments are raw strings so that "\d" is not an invalid escape.
# NOTE(review): the "." in front of "tbz2" is unescaped and therefore matches
# any single character; left untouched to keep the matching behaviour.
reVerSplit = re.compile(
    r"^(?:.*/%s/)?(?:(\w+-\w+)/)?(.*?)-(([^-]+?)"
    r"(?:-(r\d+))?)(?:.(tbz2))?$" % VDB_PATH, re.S)
class RepositoryPath():
    """Well-known filesystem locations of package repositories."""
    # Stand-alone locations.
    Layman = '/var/lib/layman'
    Gentoo = '/usr/portage'
    # NOTE(review): Repos has the same value as the VDB directory
    # (/var/db/pkg) -- confirm this is the intended repositories root.
    Repos = "/var/db/pkg"
    # Locations derived from the repositories root.
    Distros = path.join(Repos, "distros")
    Calculate = path.join(Repos, "calculate")
    CalculateProfiles = path.join(Calculate, "profiles")
    CalculateTemplates = path.join(CalculateProfiles, "templates")
def reVerSplitToPV(x):
    """Convert a package string (or a reVerSplit match) to a PV dictionary.

    :param x: a string to be parsed with ``reVerSplit``, or an already
        computed match object (anything falsy is treated as "no match")
    :return: dict with the keys CATEGORY, PN, PF, P, PV, PR and PVR; every
        value is an empty string when the input does not match
    """
    if isinstance(x, str):
        x = reVerSplit.search(x)
    if not x:
        # Same key set as a successful parse, so callers can index the
        # result uniformly (the CATEGORY key used to be missing here).
        return {'CATEGORY': "",
                'PN': "",
                'PF': "",
                'P': "",
                'PV': "",
                'PR': "",
                'PVR': ""}
    category, pn, pvr, pv, pr = x.groups()[:5]
    return {'CATEGORY': category or "",
            'PN': pn,
            'PF': "%s-%s" % (pn, pvr),
            'P': "%s-%s" % (pn, pv),
            'PV': pv,
            # a missing revision is reported as the implicit "r0"
            'PR': pr or "r0",
            'PVR': pvr}
def getPkgUses(fullpkg, version=None, prefix="/"):
    """Read the USE and IUSE flag lists stored in the VDB for a package.

    :param fullpkg: "category/name" of the package
    :param version: version suffix of the VDB directory; when None the last
        matching directory returned by listDirectory() is used
    :param prefix: system root the VDB path is joined onto
    :return: tuple (USE flags, IUSE flags) with a leading "+" removed from
        each flag, or None when no version was given and nothing matched
    """
    category, _sep, pkg = fullpkg.partition('/')
    pkgCategory = path.join(prefix, '{0}/{1}'.format(VDB_PATH, category))

    if version is not None:
        pkg_dn = path.join(pkgCategory, "%s-%s" % (pkg, version))
    else:
        candidates = []
        for entry in listDirectory(pkgCategory):
            matched = reVerSplit.search(entry)
            if not matched:
                continue
            info = reVerSplitToPV(matched)
            if info['PN'] == pkg:
                candidates.append(info)
        if not candidates:
            return None
        pkg_dn = path.join(pkgCategory, candidates[-1]['PF'])

    # IUSE is read before USE, exactly like the flag files are consumed below.
    iuse = readFile(path.join(pkg_dn, "IUSE")).strip().split()
    use = readFile(path.join(pkg_dn, "USE")).strip().split()

    def without_plus(flags):
        return [flag[1:] if flag.startswith("+") else flag
                for flag in flags if flag]

    return without_plus(use), without_plus(iuse)
def isPkgInstalled(pkg, prefix='/', sortByVersion=False):
    """Look a package up in the VDB.

    :param pkg: "category/name" or a bare package name
    :param prefix: system root the VDB path is joined onto
    :param sortByVersion: sort the result by version (only used for the
        "category/name" form)
    :return: for "category/name" a list of PV dictionaries (see
        reVerSplitToPV) with CATEGORY filled in; for a bare name the list of
        category directories that contain such a package
    """
    pkgDir = path.join(prefix, VDB_PATH)

    if "/" not in pkg:
        found_categories = []
        for category_dn in listDirectory(pkgDir, fullPath=True):
            names = [reVerSplitToPV(entry)['PN']
                     for entry in listDirectory(category_dn)]
            if pkg in names:
                found_categories.append(category_dn)
        return found_categories

    category, _sep, pkgname = pkg.partition('/')
    res = []
    for dn in listDirectory(path.join(pkgDir, category), fullPath=True):
        pkg_info = reVerSplitToPV(dn)
        if pkg_info['PN'] != pkgname:
            continue
        pkg_info.update({'CATEGORY': category})
        res.append(pkg_info)

    if sortByVersion and len(res) > 1:
        return sorted(res, key=lambda info: getTupleVersion(info['PVR']))
    return res
def getPkgSlot(pkg, prefix='/'):
    """Return the stripped SLOT file content of every installed match.

    :param pkg: package passed on to isPkgInstalled()
    :param prefix: system root the VDB path is joined onto
    :return: list of slot strings, one per entry found by isPkgInstalled()
    """
    installed = isPkgInstalled(pkg, prefix)
    pkgDir = path.join(prefix, VDB_PATH)
    slots = []
    for info in installed:
        slot_fn = path.join(pkgDir, info['CATEGORY'], info['PF'], "SLOT")
        slots.append(readFile(slot_fn).strip())
    return slots
def getPkgActiveUses(fullpkg, prefix="/"):
    """Return the flags present in both USE and IUSE of a package.

    :param fullpkg: "category/name" of the package
    :param prefix: system root passed on to getPkgUses()
    :return: list of flags (unordered), or None when getPkgUses() found
        nothing
    """
    uses = getPkgUses(fullpkg, prefix=prefix)
    if uses:
        active = set(uses[0]) & set(uses[1])
        return list(active)
    return None
def getInstalledAtom(str_atom, prefix='/'):
    """
    Yield the installed packages that match the given atom.

    :param str_atom: atom string; a "*" anywhere in it switches to glob
        matching against the VDB directory layout
    :param prefix: system root the VDB path is joined onto
    :return: generator of EmergePackage objects built from
        "<category>/<pf>:<slot>"
    """
    useglob = "*" in str_atom
    atom = EmergePackage(str_atom)
    db_path = path.join(prefix, VDB_PATH)

    if useglob:
        mask = "*/*" if str_atom == "*" else str_atom
        pattern = path.join(db_path, "%s/*.ebuild" % mask)
        # the package directory is the one that holds the ebuild file
        package_dirs = (path.dirname(fn) for fn in glob.glob(pattern))
    else:
        pattern = path.join(db_path, "%s-*" % atom["CATEGORY/PN"])
        package_dirs = iter(glob.glob(pattern))

    for pkg_dn in package_dirs:
        # drop the subslot
        slot = readFile(path.join(pkg_dn, "SLOT")).partition("/")[0]
        if not slot.strip():
            continue
        find_atom = EmergePackage(
            "%s:%s" % (pkg_dn[len(db_path) + 1:], slot))
        if useglob:
            yield find_atom
            continue
        if find_atom["CATEGORY/PN"] != atom["CATEGORY/PN"]:
            continue
        # an explicitly requested slot must match
        if atom["SLOT!"] and atom["SLOT"] != find_atom["SLOTONLY"]:
            continue
        # an explicitly requested version must match
        if atom["PN"] != atom["PF"] and find_atom["PF"] != atom["PF"]:
            continue
        yield find_atom
def getPortagePython(prefix='/'):
    """
    Return the path of a python interpreter portage can work with.

    The first active "python_targets_python3*" USE flag of sys-apps/portage
    is turned into "/usr/bin/python3.X"; "/usr/bin/python3" is returned when
    there is no such flag.

    :param prefix: system root passed on to getPkgActiveUses()
    :return: interpreter path as a string
    """
    target_prefix = "python_targets_"
    # getPkgActiveUses() returns None when portage is not found in the VDB;
    # fall back to the default interpreter instead of failing on iteration.
    for use in getPkgActiveUses("sys-apps/portage", prefix=prefix) or ():
        if use.startswith("python_targets_python3"):
            return "/usr/bin/%s" % use[len(target_prefix):].replace("_", ".")
    return "/usr/bin/python3"
def getSquashList():
    """Get the supported squashfs compression methods.

    :return: ["gzip"] when sys-fs/squashfs-tools has no active USE flags;
        otherwise the active flags that name a known compression method,
        with "gzip" added when the installed PV is at least 4.2
    """
    wantMethod = {"lzo", "lz4", "lzma", "xz", "gzip", "zstd"}
    squashfs_tools = "sys-fs/squashfs-tools"

    usesSquashFs = getPkgActiveUses(squashfs_tools)
    if not usesSquashFs:
        return ["gzip"]

    pkgInfo = isPkgInstalled(squashfs_tools)
    installed_pv = pkgInfo[0].get('PV', None) if pkgInfo else None
    if installed_pv:
        if getTupleVersion(installed_pv) >= getTupleVersion('4.2'):
            usesSquashFs.append('gzip')
    return list(set(usesSquashFs) & wantMethod)
class RepositorySubstituting():
    """
    Callable that expands a leading "repository:" into the repository path.

    Collecting the paths of all repositories takes time, so the data is read
    from the variable only when needed: the first time a string that starts
    with "<name>:" is processed.
    """
    # cheap pre-check: the string starts with "<word>:"
    token = re.compile(r"^\w+:")

    def __init__(self, dv, system_root=''):
        """
        :param dv: object whose Get('cl_emerge_config') returns the emerge
            configuration (with a ``repositories`` iterable of objects that
            have ``name`` and ``location``)
        :param system_root: path put in front of every repository location
        """
        self.dv = dv
        self._substitution = None
        self.system_root = system_root.rstrip("/")

    @classmethod
    def has_repos_name(cls, s):
        """Return True when *s* starts with something like "name:"."""
        return bool(cls.token.search(s))

    def _prepare_substitution(self):
        """Build the substitution callable from the emerge configuration."""
        # TODO: check for escaping the chroot
        emerge_config = self.dv.Get('cl_emerge_config')
        if emerge_config and emerge_config.repositories:
            repos = {x.name: x.location
                     for x in emerge_config.repositories}
            # names are escaped so that a repository name containing regex
            # metacharacters cannot break (or change) the pattern
            r = re.compile("|".join("^%s:" % re.escape(x) for x in repos))

            def expand(match):
                name = match.group(0)[:-1]
                return "%s%s/profiles/" % (self.system_root,
                                           repos.get(name, ""))

            self._substitution = functools.partial(r.sub, expand)
        else:
            self._substitution = lambda x: x

    def __call__(self, s):
        """Return *s* with a known leading "repository:" expanded."""
        if self.has_repos_name(s):
            if not self._substitution:
                self._prepare_substitution()
            return self._substitution(s)
        return s
def searchProfile(dirpath, configname, repository_sub=None):
    """Collect *configname* files along the profile inheritance chain.

    Every line of a "parent" file names another profile directory (relative
    to the directory that holds the file); parents are visited before the
    directory itself.

    :param dirpath: profile directory to start from
    :param configname: file name looked up in every visited directory
    :param repository_sub: optional callable applied to each raw "parent"
        line before it is interpreted as a path
    :return: list of existing config file paths, parents first
    """
    def search(dirpath):
        parentpath = path.join(dirpath, "parent")
        if path.exists(parentpath):
            # read everything up front so the file is closed before recursing
            with open(parentpath, 'r') as parent_file:
                parent_lines = parent_file.readlines()
            for line in parent_lines:
                if repository_sub:
                    line = repository_sub(line)
                line = line.strip()
                # a blank line would resolve to dirpath itself and recurse
                # forever; comment lines are not paths either
                if not line or line.startswith("#"):
                    continue
                search_dn = path.realpath(path.join(dirpath, line))
                for dn in search(search_dn):
                    yield dn
        fullconfig = path.join(dirpath, configname)
        if path.exists(fullconfig):
            yield fullconfig

    return list(search(dirpath))
class ReposConf():
    """Reader/editor of repos.conf style repository configuration files."""

    def __init__(self, conffile, confdir, prefix="/"):
        """
        :param conffile: configuration file edited by add()/remove()
        :param confdir: directory scanned by get_other_repositories()
        :param prefix: system root both paths are joined onto
        """
        self.conffile = conffile
        self.prefix = prefix
        self.confdir = confdir

    def _read_config(self, conffile):
        """Parse *conffile* (UTF-8) and return the ConfigParser object."""
        config = ConfigParser(strict=False)
        config.read(conffile, encoding="utf-8")
        return config

    def _write_config(self, config, conffile):
        """Write *config* back to *conffile* after ensuring main-repo."""
        self.add_default(config)
        with open(conffile, 'w', encoding="utf-8") as f:
            config.write(f)

    def add_default(self, config):
        """Set DEFAULT/main-repo to "gentoo" when it is not set yet."""
        if config["DEFAULT"].get("main-repo") is None:
            config["DEFAULT"]["main-repo"] = "gentoo"

    def add(self, rname, rurl, rpath):
        """Create or update the section *rname* in the configuration file.

        :param rname: repository (section) name
        :param rurl: value stored as sync-uri
        :param rpath: value stored as location
        """
        if not self.conffile:
            return
        conffile = pathJoin(self.prefix, self.conffile)
        config = self._read_config(conffile)
        if not config.has_section(rname):
            config.add_section(rname)
        for k, v in {'auto-sync': 'Yes',
                     'priority': '50',
                     'sync-uri': rurl,
                     'sync-type': 'calculate',
                     'location': rpath}.items():
            config.set(rname, k, v)
        self._write_config(config, conffile)

    def remove(self, rname):
        """Remove the section *rname* from the configuration file."""
        if not self.conffile:
            return
        conffile = pathJoin(self.prefix, self.conffile)
        config = self._read_config(conffile)
        if config.has_section(rname):
            config.remove_section(rname)
        self._write_config(config, conffile)

    def get_calculate_repositories(self):
        """
        Yield the Calculate (non-Portage) repositories as (name, path).

        Every section of the configuration file except "gentoo" that has a
        non-empty location is reported.
        """
        conffile = pathJoin(self.prefix, self.conffile)
        config = self._read_config(conffile)
        for rep in config.sections():
            if rep != "gentoo":
                # .get(): a section without "location" is skipped instead of
                # raising KeyError (same as get_other_repositories)
                location = config[rep].get("location")
                if location:
                    yield rep, location

    def get_other_repositories(self):
        """
        Yield the non-Calculate, non-Portage repositories as (name, path).

        All "*.conf" files of the configuration directory except the main
        configuration file are scanned; a section is reported when it has
        both a location and a sync-type.
        """
        confdir = pathJoin(self.prefix, self.confdir)
        conffn = pathJoin(self.prefix, self.conffile)
        skip_reps = {x[0] for x in self.get_calculate_repositories()}
        skip_reps.add("gentoo")
        for conffile in glob.glob("%s/*.conf" % confdir):
            if conffile == conffn:
                continue
            config = self._read_config(conffile)
            for rep in config.sections():
                if rep not in skip_reps:
                    location = config[rep].get("location")
                    synctype = config[rep].get("sync-type")
                    if location and synctype:
                        yield rep, location
class Layman():
    """
    Manager of the Layman repository settings.

    Args:
        installed: path to installed.xml
        makeconf: path to the layman make.conf
        laymanconf: path to layman.conf
        prefix: system root used when looking up installed packages
    """
    layman_package = "app-portage/layman"
    portage_package = "sys-apps/portage"
    portdir_param = "PORTDIR_OVERLAY"

    def __init__(self, installed, makeconf, laymanconf=None, prefix="/"):
        self.installed = installed
        self.makeconf = makeconf
        self.laymanconf = laymanconf
        self.prefix = prefix

    def _write_laymanconf(self, config):
        """Write *config* to layman.conf."""
        # ConfigParser.write() emits str, so the file has to be opened in
        # text mode (it used to be opened as 'wb')
        with open(self.laymanconf, 'w', encoding="utf-8") as f:
            config.write(f)

    def _add_to_laymanconf(self, rname, rurl, rpath):
        """Create or update the section *rname* in layman.conf."""
        if not self.laymanconf:
            return
        config = ConfigParser(strict=False)
        config.read(self.laymanconf, encoding="utf-8")
        if not config.has_section(rname):
            config.add_section(rname)
        for k, v in {'auto-sync': 'Yes',
                     'layman-type': 'git',
                     'priority': '50',
                     'sync-uri': rurl,
                     'sync-type': 'laymansync',
                     'location': rpath}.items():
            config.set(rname, k, v)
        self._write_laymanconf(config)

    def _remove_from_laymanconf(self, rname):
        """Remove the section *rname* from layman.conf."""
        if not self.laymanconf:
            return
        config = ConfigParser(strict=False)
        config.read(self.laymanconf, encoding="utf-8")
        if config.has_section(rname):
            config.remove_section(rname)
        self._write_laymanconf(config)

    def _installed_has_content(self):
        """Return True when installed.xml exists and is not blank."""
        return bool(path.exists(self.installed) and
                    readFile(self.installed).strip())

    def _write_installed(self, tree, root):
        """Re-indent (when layman is importable) and save installed.xml."""
        try:
            from layman.utils import indent
        except ImportError:
            indent = None
        if indent:
            indent(root)
        # tree.write() with an explicit byte encoding produces bytes, so the
        # file is opened in binary mode and the declaration written as bytes
        with open(self.installed, 'wb') as f:
            f.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
            tree.write(f, encoding="utf-8")

    def _add_to_installed(self, rname, rurl):
        """
        Add the repository to installed.xml
        """
        if self._installed_has_content():
            tree = ET.parse(self.installed)
            # ET.parse() returns the tree object; getroot() is its accessor
            # for the document element
            root = tree.getroot()
            # the repository is already present in installed.xml
            if root.find("repo[name='%s']" % rname) is not None:
                return
        else:
            root = ET.Element("repositories", version="1.0")
            tree = ET.ElementTree(root)

        newrepo = ET.SubElement(root, "repo", priority="50",
                                quality="experimental",
                                status="unofficial")
        name = ET.SubElement(newrepo, "name")
        name.text = rname
        source = ET.SubElement(newrepo, "source", type="git")
        source.text = rurl
        self._write_installed(tree, root)

    def _makeconf_pattern(self):
        """Regex text matching the PORTDIR_OVERLAY assignment at file start."""
        return r'\A%s="([^"]+)"' % self.portdir_param

    def _add_to_makeconf(self, rpath):
        """
        Add the repository to layman/make.conf
        """
        def fixContent(match):
            repos = match.group(1).strip().split('\n')
            if rpath not in repos:
                repos.insert(0, rpath)
            return '%s="\n%s"' % (self.portdir_param, "\n".join(repos))

        if path.exists(self.makeconf):
            content = readFile(self.makeconf)
            if self.portdir_param in content:
                # flags must be passed by keyword: the fourth positional
                # argument of re.sub() is "count"
                new_content = re.sub(self._makeconf_pattern(), fixContent,
                                     content, flags=re.DOTALL)
                if new_content == content:
                    return
                content = new_content
            else:
                # put the new assignment in front of the existing content
                # (the content used to end up inside the quoted value)
                content = '%s="\n%s"\n%s' % (self.portdir_param, rpath,
                                             content)
        else:
            content = '%s="\n%s"\n' % (self.portdir_param, rpath)
        with open(self.makeconf, 'w') as f:
            f.write(content)

    def _remove_from_makeconf(self, rpath):
        """
        Remove the path from make.conf
        """
        def fixContent(match):
            repos = match.group(1).strip().split('\n')
            if rpath in repos:
                repos.remove(rpath)
            return '%s="\n%s"' % (self.portdir_param, "\n".join(repos))

        if not path.exists(self.makeconf):
            return
        content = readFile(self.makeconf)
        if self.portdir_param not in content:
            return
        new_content = re.sub(self._makeconf_pattern(), fixContent,
                             content, flags=re.DOTALL)
        if new_content == content:
            return
        with open(self.makeconf, 'w') as f:
            f.write(new_content)

    def is_new_layman(self):
        """Return True for layman >= 2.3 together with portage >= 2.2.18."""
        layman_pkg_info = isPkgInstalled(self.layman_package,
                                         prefix=self.prefix)
        portage_pkg_info = isPkgInstalled(self.portage_package,
                                          prefix=self.prefix)
        if layman_pkg_info and portage_pkg_info:
            layman_ver = layman_pkg_info[0].get('PV')
            portage_ver = portage_pkg_info[0].get('PV')
            if (cmpVersion(layman_ver, "2.3") >= 0 and
                    cmpVersion(portage_ver, "2.2.18") >= 0):
                return True
        return False

    def add(self, rname, rurl, rpath):
        """
        Add the repository to installed.xml and layman/make.conf
        (and to layman.conf for a new layman)
        """
        self._add_to_installed(rname, rurl)
        if self.is_new_layman():
            self._add_to_laymanconf(rname, rurl, rpath)
        self._add_to_makeconf(rpath)
        return True

    def _remove_from_installed(self, rname):
        """
        Remove the repository from installed.xml
        """
        if self._installed_has_content():
            tree = ET.parse(self.installed)
            root = tree.getroot()
            el = root.find("repo[name='%s']" % rname)
            if el is not None:
                root.remove(el)
            self._write_installed(tree, root)

    def remove(self, rname, rpath):
        """
        Remove the repository from the layman settings
        """
        self._remove_from_installed(rname)
        if self.is_new_layman():
            self._remove_from_laymanconf(rname)
        self._remove_from_makeconf(rpath)
        return True

    def get_installed(self):
        """
        Get the list of installed repository names
        """
        if self._installed_has_content():
            tree = ET.parse(self.installed)
            return [x.text for x in tree.findall("repo/name")]
        return []
@total_ordering
class EmergePackage(Mapping):
    """
    Package data parsed lazily from an atom-like string.

    Item keys: CATEGORY, PN, PF, SLOT, REPO, P, PV, PR, PVR, CATEGORY/PN,
    SLOT!, SLOTONLY, CATEGORY/PF

    The object can be compared with another object of the same kind, or with
    a string that contains a version. Ordering compares category/name first
    and then the version.
    """
    default_repo = 'gentoo'
    # regex fragments, combined into reParse below
    prefix = r"(?:.*/%s/|=)?" % VDB_PATH
    category = r"(?:(\w+(?:-\w+)?)/)?"
    pn = "([^/:]*?)"
    pv = r"(?:-(\d[^:-]*?))?"
    pr = r"(?:-(r\d+))?"
    tbz = r"(?:.(tbz2))?"
    slot = r'(?::([^:/]+(?:/[^:/]+)?))?'
    repo = r'(?:::([\w-]+))?'
    reParse = re.compile(
        r'^{prefix}{category}(({pn}{pv}){pr}){slot}{repo}{tbz}$'.format(
            prefix=prefix, category=category, pn=pn, pv=pv, pr=pr, tbz=tbz,
            slot=slot, repo=repo))
    attrs = ('CATEGORY', 'PN', 'PF', 'SLOT', 'REPO', 'P', 'PV', 'PR', 'PVR',
             'CATEGORY/PN', 'SLOT!', 'SLOTONLY', 'CATEGORY/PF')

    def _parsePackageString(self, s):
        """
        Split the string into the parts of a package name.

        :return: dict with every key of ``attrs``; all values are empty
            strings when the string does not match
        """
        match = self.reParse.search(s)
        if not match:
            return {k: '' for k in self.attrs}
        # group order of reParse
        category, pf, p, pn, pv, pr, slot, repo, _tbz = match.groups()
        d = {'CATEGORY': category or "",
             'PN': pn,
             'PV': pv or '0',
             'PF': pf,
             'P': p,
             'SLOT': (slot or '0').strip(),
             'SLOTONLY': (slot or '0').partition('/')[0].strip(),
             # empty when the string carried no explicit slot
             'SLOT!': (slot or '').strip(),
             'REPO': repo or self.default_repo,
             # NOTE(review): without a category this is "None/<PN>" because
             # the unmatched group is formatted as is, while CATEGORY is "".
             'CATEGORY/PN': "%s/%s" % (category, pn),
             'PR': pr or 'r0'}
        if pr:
            d['PVR'] = "%s-%s" % (d['PV'], d['PR'])
        else:
            d['PVR'] = d['PV']
        # an explicit "-r0" is not part of the canonical PF
        if d['PF'].endswith('-r0'):
            d['PF'] = d['PF'][:-3]
        d['CATEGORY/PF'] = "%s/%s" % (category, d['PF'])
        return d

    def __iter__(self):
        return iter(self.attrs)

    def __len__(self):
        # an unparsable package is "empty" (and therefore falsy)
        if not self['PN']:
            return 0
        else:
            return len(self.attrs)

    def __lt__(self, version):
        """
        Compare by category/name and then by version.

        *version* is either a package mapping or a version string.
        """
        if "CATEGORY/PN" in version and "PVR" in version:
            if self['CATEGORY/PN'] < version['CATEGORY/PN']:
                return True
            elif self['CATEGORY/PN'] > version['CATEGORY/PN']:
                return False
            version = "%s-%s" % (version['PV'], version['PR'])
        currentVersion = "%s-%s" % (self['PV'], self['PR'])
        return cmpVersion(currentVersion, version) == -1

    def __eq__(self, version):
        """
        Package mappings are equal when CATEGORY and PF match; a string is
        compared with this package's "<PV>-<PR>" as a version.
        """
        # None and other unrelated objects are simply "not equal" instead of
        # failing inside the membership test below
        if not isinstance(version, (str, Mapping)):
            return NotImplemented
        if "CATEGORY" in version and "PF" in version:
            return ("%s/%s" % (self['CATEGORY'], self['PF']) ==
                    "%s/%s" % (version['CATEGORY'], version['PF']))
        else:
            currentVersion = "%s-%s" % (self['PV'], self['PR'])
            return cmpVersion(currentVersion, version) == 0

    def __init__(self, package):
        """
        :param package: package string, or another EmergePackage whose
            source string and parse result are shared
        """
        if isinstance(package, EmergePackage):
            self.__result = package.__result
            self._package = package._package
        else:
            self._package = package
            self.__result = None

    def __getitem__(self, item):
        # parse on first access
        if not self.__result:
            self.__result = self._parsePackageString(self._package)
        return self.__result[item]

    def __repr__(self):
        return "EmergePackage(%s/%s)" % (self['CATEGORY'], self['PF'])

    def __str__(self):
        return self['CATEGORY/PF']

    def __hash__(self):
        return hash(str(self))
class PackageInformation:
    """
    The object provides package information obtained from eix.

    Packages are collected in a class-wide queue and queried with a single
    eix call the first time an item of an uncached package is requested.
    """
    eix_cmd = getProgPath("/usr/bin/eix")
    # shared by all instances: packages waiting for the next eix query
    query_packages = []
    # shared by all instances: "category/name" -> {field: value}
    information_cache = defaultdict(dict)
    fields = ["DESCRIPTION"]

    def __init__(self, pkg):
        """
        :param pkg: package mapping (CATEGORY, PN and CATEGORY/PN are read)
        """
        self._pkg = pkg
        if pkg not in self.query_packages:
            self.query_packages.append(pkg)

    def __getitem__(self, item):
        """Return the cached field *item* of the package, or ""."""
        if (self._pkg['CATEGORY/PN'] not in self.information_cache and
                self.query_packages):
            self.query_information()
        try:
            return self.information_cache[self._pkg['CATEGORY/PN']][item]
        except KeyError:
            return ""

    def query_information(self):
        """Query eix for all queued packages and fill the cache."""
        pkg_list = "|".join(
            x['CATEGORY/PN'].replace("+", r"\+") for x in self.query_packages)
        try:
            output = pexpect.spawn(self.eix_cmd, ["--xml", pkg_list],
                                   timeout=60).read()
        except pexpect.TIMEOUT:
            output = b""
        # drop everything in front of the XML declaration
        re_cut = re.compile(rb"^.*?(?=<\?xml version)", re.S)
        with ignore(ET.ParseError):
            xml = ET.fromstring(re_cut.sub(b'', output))
            # kept inside the guarded block: after a parse failure there is
            # no document to look anything up in
            for pkg in self.query_packages:
                cat_pn = pkg['CATEGORY/PN']
                if cat_pn not in self.information_cache:
                    descr_node = xml.find(
                        'category[@name="%s"]/package[@name="%s"]/description'
                        % (pkg['CATEGORY'], pkg['PN']))
                    if descr_node is not None:
                        self.information_cache[cat_pn]['DESCRIPTION'] = \
                            descr_node.text
        # empty the shared queue in place
        del self.query_packages[:]

    @classmethod
    def add_info(cls, pkg):
        """Attach a PackageInformation object to *pkg* as ``pkg.info``."""
        pkg.info = cls(pkg)
        return pkg
class UnmergePackage(EmergePackage):
    """
    Package parsed from a block of text that holds a package name line
    followed by a "selected: <version>" line.
    """
    # raw string: "\s" and "\S" are regex classes, not string escapes
    re_pkg_info = re.compile(r"^\s(\S+)\n\s+selected:\s(\S+)", re.MULTILINE)

    def __init__(self, package):
        """
        :param package: text block to convert, or an EmergePackage to copy
        """
        super().__init__(package)
        if not isinstance(package, EmergePackage):
            self._package = self.convert_package_info(package)

    def convert_package_info(self, package):
        """Return "<name>-<selected version>" or "" when nothing matches."""
        match = self.re_pkg_info.search(package)
        if match:
            return "%s-%s" % match.groups()
        return ""
def recalculate_update_info(cls):
    """
    Class decorator: build ``cls.update_info`` from the regex fragments
    ``install_info``, ``atom_info``, ``prev_version``, ``use_info`` and
    ``pkg_size`` of the class.

    :param cls: class that provides the fragments
    :return: the same class, with ``update_info`` set
    """
    template = (r"^({install_info})\s+({atom_info})\s*(?:{prev_version})?"
                r"\s*({use_info})?.*?({pkg_size})?$")
    fragments = {
        'install_info': cls.install_info,
        'atom_info': cls.atom_info,
        'prev_version': cls.prev_version,
        'use_info': cls.use_info,
        'pkg_size': cls.pkg_size,
    }
    cls.update_info = re.compile(template.format(**fragments), re.MULTILINE)
    return cls
@recalculate_update_info
@total_ordering
class EmergeUpdateInfo(Mapping):
    """
    Information about the update of a single package, parsed lazily from
    one "[ebuild ...] atom ..." entry.
    """
    # regex fragments combined by recalculate_update_info (raw strings, so
    # the backslashes are regex escapes and not string escapes)
    install_info = r"\[(binary|ebuild)([^\]]+)\]"
    atom_info = r"\S+"
    use_info = 'USE="[^"]+"'
    prev_version = r"\[(?:[^,\]]+, )*([^\]]+)\]"
    pkg_size = r"[\d,]+ \w+"
    attrs = ['binary', 'REPLACING_VERSIONS', 'SIZE', 'new', 'newslot',
             'updating', 'downgrading', 'reinstall']

    def __init__(self, data):
        """
        :param data: text of the entry to parse
        """
        self._data = data
        self._package = None
        self._info = {}

    def _parseData(self):
        """Fill ``_info`` and ``_package`` from the entry text."""
        r = self.update_info.search(self._data)
        if r:
            self._info['binary'] = r.group(2) == 'binary'
            install_flag = r.group(3)
            self._info['newslot'] = "S" in install_flag
            self._info['new'] = ("N" in install_flag and
                                 "S" not in install_flag)
            self._info['updating'] = ("U" in install_flag and
                                      "D" not in install_flag)
            self._info['downgrading'] = "D" in install_flag
            self._info['reinstall'] = "R" in install_flag
            self._package = EmergePackage(r.group(4))
            self._info['REPLACING_VERSIONS'] = r.group(5) or ""
            self._info['SIZE'] = r.group(7) or ""

    def __iter__(self):
        return chain(EmergePackage.attrs, self.attrs)

    def __len__(self):
        # an entry that could not be parsed is "empty"
        if not self['PN']:
            return 0
        else:
            return len(EmergePackage.attrs) + len(self.attrs)

    def __getitem__(self, item):
        """Return an update attribute or a package item; None if unknown."""
        if not self._info:
            self._parseData()
        if item in self._info:
            return self._info[item]
        if self._package:
            return self._package[item]
        return None

    def __lt__(self, version):
        if not self._info:
            self._parseData()
        return self._package < version

    def __eq__(self, version):
        if not self._info:
            self._parseData()
        return self._package == version

    def __contains__(self, item):
        if not self._info:
            self._parseData()
        if item in self.attrs:
            return True
        # _package stays None when the entry text did not match
        return self._package is not None and item in self._package

    def __repr__(self):
        return "EmergeUpdateInfo(%s/%s,%s)" % (
            self['CATEGORY'], self['PF'],
            "binary" if self['binary'] else "ebuild")

    def __str__(self):
        return "%s/%s" % (self['CATEGORY'], self['PF'])
@recalculate_update_info
class EmergeRemoveInfo(EmergeUpdateInfo):
    """
    Information about the removal of a single package (in the list of
    packages being updated)
    """
    # raw string: the backslashes are regex escapes
    install_info = r"\[(uninstall)([^\]]+)\]"
class Eix():
    """
    eix invocation

    package : a package or a list of packages
    *options : eix parameters
    all_versions : report all versions of a package instead of the best one
    no_default : do not append the default options
    """
    cmd = getProgPath("/usr/bin/eix")

    class Option():
        """Command line options understood by eix."""
        Installed = '--installed'
        Xml = '--xml'
        Upgrade = '--upgrade'
        TestObsolete = '--test-obsolete'
        Exact = "--exact"

    default_options = [Option.Xml]

    def __init__(self, package, *options, **kwargs):
        self.package = (list(package) if type(package) in (tuple, list)
                        else [package])
        self._options = options
        self.no_default = kwargs.get('no_default', False)
        if kwargs.get('all_versions', False):
            self.parser = EixFullnameParser()
        else:
            self.parser = EixFullnameParserBestVersion()

    @property
    def options(self):
        """Full argument list: options, packages, then default options."""
        arguments = list(self._options) + self.package
        if self.no_default:
            return arguments
        return arguments + self.default_options

    def _process(self):
        """Start eix with LANG=C and return the process object."""
        return process(self.cmd, *self.options, env={"LANG": "C"})

    def get_output(self):
        """
        Get the eix output
        """
        with closing(self._process()) as eix_process:
            return eix_process.read()

    def get_packages(self):
        """
        Get the list of packages
        """
        output = self.get_output()
        return list(self.parser.parseXml(output))
class EixParser():
    """
    Parser of the XML output of eix
    """
    def parseXml(self, buffer):
        """Parse *buffer* and return an iterator over the parsed items.

        An empty iterator is returned when the buffer is not valid XML.
        """
        try:
            eix_xml = str_to_xml_doc(buffer)
            return self.get_categories(eix_xml)
        except ET.ParseError:
            return iter(())

    def get_categories(self, et):
        """Iterate over the items of document *et*; subclasses override it.

        The base implementation has nothing to report and returns an empty
        iterator (a bare StopIteration raised from a plain method would
        reach the caller as an error).
        """
        return iter(())
class EixFullnameParser(EixParser):
    """
    Get all versions of a package
    """
    def get_versions(self, et):
        """Yield the "id" attribute of every <version> child of *et*."""
        return (ver.attrib['id'] for ver in et.iterfind('version'))

    def get_packages(self, et):
        """Yield "<name>-<version>" (or just the name for an empty version)
        for every package element of the category element *et*."""
        for pkg in et:
            name = pkg.attrib['name']
            for version in self.get_versions(pkg):
                yield "%s-%s" % (name, version) if version else name

    def get_categories(self, et):
        """Yield an EmergePackage for every package of every category."""
        for category in et:
            for pkg in self.get_packages(category):
                yield EmergePackage("%s/%s" % (category.attrib['name'], pkg))
class EixFullnameParserBestVersion(EixFullnameParser):
    """
    Get only one, the maximal, version of a package
    """
    def get_versions(self, et):
        """Yield the id of the last <version> without a <mask> child.

        None is yielded when there is no such version.
        """
        best = None
        for ver in et.iterfind('version'):
            if ver.find('mask') is None:
                best = ver.attrib['id']
        yield best
class PackageVersionInfo():
    """
    Information about a package version
    """
    class Mask():
        """Kinds of masks a version can carry."""
        Keyword = 1
        MissingKeyword = 2
        Hardmask = 3

    def __init__(self, pn, version):
        """
        :param pn: package name
        :param version: package version
        """
        self.pn = pn
        self.version = version
        self.slot = ""
        self.installed = False
        self.mask = []

    def clone(self):
        """Return a copy with its own mask list."""
        duplicate = PackageVersionInfo(self.pn, self.version)
        duplicate.slot = self.slot
        duplicate.installed = self.installed
        duplicate.mask = list(self.mask)
        return duplicate

    def _has_mask(self, kind):
        return kind in self.mask

    @property
    def stable(self):
        """True when the version carries no mask at all."""
        return not self.mask

    @property
    def hardmask(self):
        return self._has_mask(PackageVersionInfo.Mask.Hardmask)

    @property
    def unstable(self):
        """True for a keyword mask or a missing keyword."""
        keyword_masks = (PackageVersionInfo.Mask.Keyword,
                         PackageVersionInfo.Mask.MissingKeyword)
        return any(kind in keyword_masks for kind in self.mask)

    @property
    def unstable_keyword(self):
        return self._has_mask(PackageVersionInfo.Mask.Keyword)

    @property
    def missing_keyword(self):
        return self._has_mask(PackageVersionInfo.Mask.MissingKeyword)

    def __repr__(self):
        flag_names = ("stable", "hardmask",
                      "unstable", "unstable_keyword",
                      "installed",
                      "missing_keyword")
        active = [name for name in flag_names if getattr(self, name)]
        return "<{pn}-{pv}:{slot} {attrs}>".format(
            pn=self.pn, pv=self.version, slot=self.slot,
            attrs=" ".join(active))
class EixVersionParser(EixParser):
    """
    Returns information about versions
    """
    # eix mask "type" attribute -> PackageVersionInfo.Mask value
    mask_map = {
        'hard': PackageVersionInfo.Mask.Hardmask,
        'keyword': PackageVersionInfo.Mask.Keyword,
        'missing_keyword': PackageVersionInfo.Mask.MissingKeyword,
    }

    def get_categories(self, et):
        """Yield a PackageVersionInfo for every version in the document."""
        for category in et:
            for version_info in self.get_packages(category):
                yield version_info

    def get_packages(self, et):
        """Yield the version infos of every package of category *et*."""
        category_name = et.attrib['name']
        for pkg in et:
            template = PackageVersionInfo(
                "%s/%s" % (category_name, pkg.attrib['name']), None)
            for version_info in self.get_versions(pkg, template):
                yield version_info

    def get_versions(self, et, pvi):
        """Yield a filled-in clone of *pvi* for every <version> of *et*."""
        for ver in et.iterfind('version'):
            info = pvi.clone()
            info.version = ver.attrib['id']
            info.slot = ver.attrib['slot'] if "slot" in ver.attrib else "0"
            if "installed" in ver.attrib:
                info.installed = True
            for mask_node in ver.iterfind('mask'):
                mask_type = mask_node.get('type')
                if mask_type in self.mask_map:
                    info.mask.append(self.mask_map[mask_type])
            yield info
class ChrootEix(Eix):
    """
    Eix executed inside a chroot
    """
    def __init__(self, chroot_path, package, *options, **kwargs):
        """
        :param chroot_path: root directory to chroot into
        :param package: package or list of packages (see Eix)
        """
        self.chroot_cmd = getProgPath("/usr/bin/chroot")
        self.chroot_path = chroot_path
        self.cmd = getProgPath("/usr/bin/eix", prefix=chroot_path)
        super().__init__(package, *options, **kwargs)

    def _process(self):
        """Start "chroot <path> <eix> <options...>"."""
        arguments = [self.chroot_path, self.cmd]
        arguments = arguments + self.options
        return process(self.chroot_cmd, *arguments)
class EmergeLogTask():
    """Base task for emerge.log processing: no markers at all."""

    def has_marker(self, line):
        """
        Tell whether the line contains the task marker (never, here)
        """
        return False

    def get_begin_marker(self):
        """
        Get the marker of the task start (empty, here)
        """
        return ""

    def get_end_marker(self):
        """
        Get the marker of the task completion (empty, here)
        """
        return ""
class EmergeLogNamedTask(EmergeLogTask):
    """Task identified in emerge.log by its name."""
    date_format = "%b %d, %Y %T"

    def __init__(self, taskname):
        """
        :param taskname: name written into the markers
        """
        self.taskname = taskname

    def has_marker(self, line):
        """
        Tell whether the line contains the completion marker of the task
        """
        end_marker = self.get_end_marker()
        return end_marker in line

    def get_begin_marker(self):
        """
        Get the marker of the task start (with the current date)
        """
        started = datetime.datetime.now().strftime(self.date_format)
        return "Calculate: Started {taskname} on: {date}".format(
            taskname=self.taskname, date=started)

    def get_end_marker(self):
        """
        Get the marker of the task completion
        """
        return " *** Calculate: Finished %s" % self.taskname
class EmergeLogPackageTask(EmergeLogTask):
    """Task whose marker is the merge line of a package."""

    def has_marker(self, line):
        """
        Tell whether the line contains the ") Merging " package marker
        """
        marker = ") Merging "
        return marker in line
class EmergeLog():
    """
    Reader of emerge.log: reports the packages merged and unmerged after
    the last marker of the given task, and writes task markers.
    """
    _emerge_log = "var/log/emerge.log"
    re_complete_emerge = re.compile(r":::\scompleted (emerge) \(.*?\) (\S+)",
                                    re.M)
    re_complete_unmerge = re.compile(r">>>\s(unmerge) success: (\S+)", re.M)

    def __init__(self, emerge_task=EmergeLogTask(), prefix='/'):
        """
        @type emerge_task: EmergeLogTask
        @param prefix: system root the log path is joined onto
        """
        self.emerge_task = emerge_task
        self.emerge_log = path.join(prefix, self._emerge_log)
        # both lists are filled on demand by get_packages()
        self._list = None
        self._remove_list = None

    def _get_last_changes(self):
        """
        Get the log lines that follow the last marker record
        """
        log_data = SaveableIterator(iter(readLinesFile(self.emerge_log)))
        for log_line in log_data.save():
            if self.emerge_task.has_marker(log_line):
                # remember the position of the newest marker seen so far
                log_data.save()
        return log_data.restore()

    def get_last_time(self):
        """
        Get the last log line that contains the marker ("" if there is none)
        """
        marked_line = ""
        for log_line in readLinesFile(self.emerge_log):
            if self.emerge_task.has_marker(log_line):
                marked_line = log_line
        return marked_line

    @property
    def list(self):
        """Packages whose emerge completed after the last marker."""
        if self._list is None:
            self.get_packages()
        return self._list

    @property
    def remove_list(self):
        """Packages successfully unmerged after the last marker."""
        if self._remove_list is None:
            self.get_packages()
        return self._remove_list

    def get_packages(self):
        """
        Fill the lists of merged and unmerged packages
        """
        self._list = []
        self._remove_list = []
        for log_line in self._get_last_changes():
            merged = self.re_complete_emerge.search(log_line)
            if merged:
                self._list.append(merged.group(2))
            unmerged = self.re_complete_unmerge.search(log_line)
            if unmerged:
                self._remove_list.append(unmerged.group(2))

    def did_update_world_happen(self):
        """Tell whether a world emerge finished successfully after the
        last marker."""
        latest_log = '\n'.join(self._get_last_changes())
        # text between a "...world" command line and "*** terminating."
        reg_emerge = r"(?s)(?<=[\s|@]world\n).*?(?=\*\*\* terminating\.)"
        reg_success = r"\w*(?=\: \*\*\* exiting successfully\.)"
        for session in re.finditer(reg_emerge, latest_log):
            if re.search(reg_success, session.group()):
                return True
        return False

    def _set_marker(self, text_marker):
        """Append "<unix time>: <marker>" to the log."""
        record = "{0:.0f}: {1}\n".format(time.time(), text_marker)
        with open(self.emerge_log, 'a') as f:
            f.write(record)

    def mark_begin_task(self):
        """
        Record the start of the task in emerge.log
        """
        marker = self.emerge_task.get_begin_marker()
        if marker:
            self._set_marker(marker)

    def mark_end_task(self):
        """
        Record the completion of the task in emerge.log
        """
        marker = self.emerge_task.get_end_marker()
        if marker:
            self._set_marker(marker)
class EmergeLogFiltered(EmergeLog):
    """
    Returns only the packages that were installed in the selected way
    (from source or from a binary package)
    """
    class EmergeType():
        """How a package was merged."""
        Source = "source"
        Binary = "binary"

    re_merge_type = re.compile(r"Merging (Binary )?\((.*?)::", re.M)
    # merge type reported by this class; override to select binary merges
    emerge_type = EmergeType.Source

    def get_packages(self):
        """
        Fill the lists of merged (filtered by type) and unmerged packages
        """
        merge_types = {}
        self._list = []
        self._remove_list = []
        for line in self._get_last_changes():
            # every emerge session starts with a clean type table
            if "Started emerge" in line:
                merge_types = {}
            m_type = self.re_merge_type.search(line)
            if m_type:
                if m_type.group(1) is None:
                    merge_types[m_type.group(2)] = self.EmergeType.Source
                else:
                    merge_types[m_type.group(2)] = self.EmergeType.Binary
            m = self.re_complete_emerge.search(line)
            # compare by value: identity of equal strings is not guaranteed
            if m and merge_types.get(m.group(2), '') == self.emerge_type:
                self._list.append(m.group(2))
            m = self.re_complete_unmerge.search(line)
            if m:
                self._remove_list.append(m.group(2))
class PackageList():
    """
    List of packages that can be narrowed by a pattern and compared with
    a version ("any package satisfies the comparison")
    """
    def __init__(self, packages):
        """
        :param packages: iterable of package mappings and/or package strings
        """
        self._raw_list = packages
        self.result = None

    def _packages(self):
        """Return the parsed packages that have a name (cached)."""
        if self.result is None:
            parsed = []
            for raw in self._raw_list:
                pkg = raw if isinstance(raw, Mapping) else EmergePackage(raw)
                parsed.append(pkg)
            self.result = [pkg for pkg in parsed if pkg['PN']]
        return self.result

    def __getitem__(self, item):
        """Return the packages whose CATEGORY/PN matches the regex *item*."""
        re_item = re.compile(item)
        selected = []
        for pkg in self._packages():
            if re_item.search(pkg['CATEGORY/PN']):
                selected.append(pkg)
        return PackageList(selected)

    def __iter__(self):
        return iter(self._packages())

    def __len__(self):
        return len(list(self._packages()))

    def __lt__(self, other):
        for pkg in self._packages():
            if pkg < other:
                return True
        return False

    def __le__(self, other):
        for pkg in self._packages():
            if pkg <= other:
                return True
        return False

    def __gt__(self, other):
        for pkg in self._packages():
            if pkg > other:
                return True
        return False

    def __ge__(self, other):
        for pkg in self._packages():
            if pkg >= other:
                return True
        return False

    def __eq__(self, other):
        for pkg in self._packages():
            if pkg == other:
                return True
        return False

    def __ne__(self, other):
        for pkg in self._packages():
            if pkg != other:
                return True
        return False
class Manifest:
    """
    Reader for the data stored in a portage Manifest file.
    """
    # "DIST <file name> ..." line; group(1) is the file name.
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    re_dist = re.compile(r"^DIST\s*(\S+)\s*")

    def __init__(self, manifest):
        # path of the Manifest file to read
        self._manifest = manifest

    def get_dist_files(self):
        """
        Return the list of file names from the DIST lines of the Manifest.
        """
        matches = map(self.re_dist.search, readLinesFile(self._manifest))
        return [m.group(1) for m in matches if m]
class Ebuild:
    """
    Helper that derives package data from the path of an ebuild file.
    """

    def __init__(self, ebuild):
        self._ebuild = ebuild
        # <category>/<package>/<name>.ebuild -> the category is two
        # directory levels above the file
        package_dn = path.dirname(ebuild)
        self._category = path.basename(path.dirname(package_dn))
        # file name without its trailing 7 characters (".ebuild")
        self._pkgname = path.basename(ebuild)[:-len(".ebuild")]

    def get_tbz2(self):
        """
        Return the binary package file name prefixed with the category
        directory.
        """
        return "%s/%s.tbz2" % (self._category, self._pkgname)
def get_packages_files_directory(*directories):
    """
    Yield binary package names for the ebuild files found in the given
    directories (e.g. sys-apps/portage-2.2.0.tbz2).

    Ebuilds are looked up exactly two levels below each directory
    (``<directory>/*/*/*.ebuild``).
    """
    for directory in map(path.normpath, directories):
        ebuild_files = glob.glob("%s/*/*/*.ebuild" % directory)
        for ebuild_fn in ebuild_files:
            yield Ebuild(ebuild_fn).get_tbz2()
def get_manifest_files_directory(*directories):
    """
    Yield the DIST file names (e.g. portage-2.2.0.tar.bz2) from every
    Manifest found two levels below the given directories
    (``<directory>/*/*/Manifest``).
    """
    for directory in map(path.normpath, directories):
        for manifest_fn in glob.glob("%s/*/*/Manifest" % directory):
            yield from Manifest(manifest_fn).get_dist_files()
def get_remove_list(directory, filelist, depth=1):
    """
    Yield every regular file of the directory that is not in filelist.

    Entries are matched by their path relative to the directory, so
    filelist may hold plain file names as well as names with a
    sub-directory.
    """
    directory = path.normpath(directory)
    # length of "<directory>/", cut off to get the relative name
    skip = len(directory) + 1
    for candidate in find(directory, onefilesystem=True, fullpath=True,
                          depth=depth):
        if candidate[skip:] in filelist:
            continue
        if path.isfile(candidate):
            yield candidate
class SimpleRepositoryMapper(Mapping):
    """
    Resolves a repository name to the path of that repository.

    Known names come from ``map_repository``; any other name is looked up
    below the layman directory. Iteration and ``len()`` cover only the
    known names.
    """
    map_repository = {'gentoo': RepositoryPath.Gentoo,
                      'calculate': RepositoryPath.Calculate,
                      'distros': RepositoryPath.Distros}
    layman_path = RepositoryPath.Layman

    def __init__(self, prefix='/'):
        # root directory the returned paths are joined to
        self.prefix = prefix

    def __getitem__(self, item):
        if item not in self.map_repository:
            # unknown repository: assume it lives under layman
            return pathJoin(self.prefix, self.layman_path, item)
        return pathJoin(self.prefix, self.map_repository[item])

    def __iter__(self):
        return iter(self.map_repository)

    def __len__(self):
        return len(self.map_repository)
class EbuildInfoError(Exception):
    """Raised by EbuildInfo when the metadata file of a package is missing."""
class EbuildInfo():
    """
    Ebuild information (DEPEND and friends) read from the repository
    metadata cache.

    Only the keys listed in ``support_keys`` are available; a supported
    key missing from the file reads as an empty string.
    """
    meta_suffix_path = 'metadata/md5-cache'
    support_keys = ('SLOT', 'IUSE', 'DEPEND', 'RDEPEND', 'PDEPEND')

    def __init__(self, atom, rpath):
        """
        :param atom: package path inside the metadata cache
        :param rpath: repository directory
        :raises EbuildInfoError: the metadata file does not exist
        """
        self._meta_info_path = path.join(rpath, self.meta_suffix_path, atom)
        if not path.exists(self._meta_info_path):
            raise EbuildInfoError("Package is not found")
        self._info = self.prepare_use_flags(self._get_info())

    @staticmethod
    def prepare_use_flags(d):
        """
        Replace d["IUSE"] with a sorted tuple of its space separated
        flags (modifies and returns the same dict).
        """
        flags = d.get("IUSE", "").split(' ')
        d["IUSE"] = tuple(sorted(flag for flag in flags if flag))
        return d

    def _get_info(self):
        """Read the supported ``KEY=value`` lines of the metadata file."""
        info = {}
        with open(self._meta_info_path, 'r') as f:
            for line in f:
                key, _sep, value = line.partition('=')
                if key in self.support_keys:
                    info[key.strip()] = value.strip()
        return info

    def __getitem__(self, item):
        if item not in self.support_keys:
            raise KeyError(item)
        return self._info.get(item, '')

    def __eq__(self, other):
        for key in self.support_keys:
            if other[key] != self[key]:
                return False
        return True

    def __ne__(self, other):
        for key in self.support_keys:
            if other[key] != self[key]:
                return True
        return False
class InstalledPackageInfoError(Exception):
    """Raised by InstalledPackageInfo when the package directory is missing."""
class InstalledPackageInfo():
    """
    Information about an installed package (DEPEND and friends) read from
    the installed-packages database (/var/db/pkg).

    Provides the keys of ``EbuildInfo.support_keys`` plus ``'repository'``.
    """
    # `declare [-x] KEY="value"` line of the saved build environment
    depend_pattern = 'declare (?:-x )?({0})="([^"]+)"'
    re_depend = re.compile(depend_pattern.format(
        "|".join(EbuildInfo.support_keys)).encode('UTF-8'), re.DOTALL)
    # raw bytes literal: "\s" in a plain literal is an invalid escape
    re_multispace = re.compile(br"\s+", re.DOTALL)

    def __init__(self, atom, pkg_dir):
        """
        :param atom: "category/package-version" directory below pkg_dir
        :param pkg_dir: installed-packages database directory
        :raises InstalledPackageInfoError: the package directory is missing
        """
        self.atom = atom
        self._pkg_path = path.join(pkg_dir, atom)
        if not path.exists(self._pkg_path):
            raise InstalledPackageInfoError("Package is not found")
        self._info = self._get_info()

    def _get_info(self):
        """
        Collect the supported variables from ``environment.bz2`` (when it
        exists) and the repository name from the ``repository`` file.
        """
        info = {k: "" for k in EbuildInfo.support_keys}
        env_path = path.join(self._pkg_path, 'environment.bz2')
        if path.exists(env_path):
            with bz2.BZ2File(env_path, 'r') as f:
                for r in self.re_depend.finditer(f.read()):
                    key, value = r.groups()
                    # values may span several lines: fold every run of
                    # whitespace into one space
                    value = self.re_multispace.sub(b" ", value)
                    info[key.decode('UTF-8')] = value.strip().decode('UTF-8')
        rep_path = path.join(self._pkg_path, 'repository')
        info['repository'] = readFile(rep_path).strip()
        return EbuildInfo.prepare_use_flags(info)

    def __getitem__(self, item):
        return self._info[item]

    @classmethod
    def get_install_packages(cls, pkg_dir='/%s' % VDB_PATH):
        """
        Yield an InstalledPackageInfo for every package directory of
        pkg_dir, skipping entries whose name contains "MERGING" or
        "lockfile".
        """
        for category in listDirectory(pkg_dir):
            for pkg in listDirectory(path.join(pkg_dir, category)):
                if "MERGING" not in pkg and "lockfile" not in pkg:
                    yield InstalledPackageInfo("%s/%s" % (category, pkg),
                                               pkg_dir=pkg_dir)

    def __str__(self):
        return self.atom

    def __repr__(self):
        return "InstalledPackageInfo(%s)" % self.atom
def makeCfgName(origfilename):
    """
    Generate a "._cfgNNNN_<name>" file name next to the given file.

    Returns the first such name (numbers 0000..9998) that does not exist
    yet, or the original file name when all of them are taken.
    """
    directory, filename = path.split(origfilename)
    candidates = (path.join(directory, "._cfg%04d_%s" % (num, filename))
                  for num in range(0, 9999))
    return next((fn for fn in candidates if not path.exists(fn)),
                origfilename)
def getlibpaths(prefix="/"):
    """
    Return the sorted list of library directories.

    The list is made of the entries of ``<prefix>/etc/ld.so.conf``
    (``include`` directives are followed) plus the ``lib``, ``usrlib``,
    ``clib`` and ``cusrlib`` attributes of ``SystemPath(prefix)``; paths
    are normalized, empty values dropped and duplicates removed.
    """
    ld_so_include_re = re.compile(r'^include\s+(\S.*)')

    def read_ld_so_conf(fn):
        # yields the entries of one configuration file, descending into
        # the files matched by its "include <glob>" lines
        conf_dn = path.dirname(fn)
        for entry in readLinesFile(fn, grab=True):
            include_match = ld_so_include_re.match(entry)
            if include_match is None:
                yield entry
                continue
            # include patterns are resolved relative to the including file
            subpath = path.join(conf_dn, include_match.group(1))
            for included_fn in glob.glob(subpath):
                yield from read_ld_so_conf(included_fn)

    rval = list(read_ld_so_conf(path.join(prefix, "etc", "ld.so.conf")))
    systempath = SystemPath(prefix)
    rval.extend([systempath.lib, systempath.usrlib,
                 systempath.clib, systempath.cusrlib])
    return sorted({path.normpath(x) for x in rval if x})
# One "<arch>: <library> <library> ..." group of a PROVIDES/REQUIRES file;
# a group ends before the next "<arch>:" or at the end of the text.
# Raw string: "\w"/"\s" in a plain literal are invalid escape sequences.
_re_req_arches = re.compile(r"(\w+:\s.*?)(?=\s\w+:|$)")
class LibraryProviders(Mapping):
    """
    Packages that own shared libraries.

    Maps an architecture name to a ``defaultdict`` of
    ``library name -> [EmergePackage, ...]``, built from the PROVIDES,
    SLOT and CONTENTS files of the installed-packages database.
    TODO: sync to portage.py
    """

    def __init__(self, vdb_path=VDB_PATH, prefix="/"):
        self.data = {}
        vdb = path.join(prefix, vdb_path)
        # length of "<vdb>/", cut off to get "category/package-version"
        lvdb = len(vdb) + 1
        libpath = getlibpaths(prefix)
        pattern_libpath = "|".join(re.escape(x) for x in libpath)
        # every PROVIDES file is processed; the former `[1:]` slice dropped
        # one arbitrary package because glob() order is unspecified
        for provides in glob.glob(path.join(vdb, "*/*/PROVIDES")):
            pkg_dn = path.dirname(provides)
            # sibling files are addressed through the directory so that a
            # "PROVIDES" substring elsewhere in the path is left untouched
            slot = readFile(
                path.join(pkg_dn, "SLOT")).strip().partition('/')[0]
            package = EmergePackage("%s:%s" % (pkg_dn[lvdb:], slot))
            content = readFile(path.join(pkg_dn, "CONTENTS"))
            for entry in _re_req_arches.findall(readFile(provides)):
                arch, _sep, libraries = entry.partition(": ")
                if arch not in self.data:
                    self.data[arch] = defaultdict(list)
                pattern_libs = "|".join(
                    re.escape(x) for x in libraries.split())
                # only libraries actually installed (obj or sym entries of
                # CONTENTS) inside one of the library directories count
                found = re.findall(
                    r"^(?:sym|obj)\s+(?:%s)/(%s)\s+" % (
                        pattern_libpath, pattern_libs),
                    content, flags=re.M)
                for library in set(found):
                    self.data[arch][library].append(package)

    def __getitem__(self, item):
        return self.data[item]

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)
def getRequires(package, vdb_path=VDB_PATH, prefix='/'):
    """
    Yield the libraries a package needs to run.

    The data is read from the REQUIRES file of the package in the
    installed-packages database.

    :param package: package (anything accepted by EmergePackage)
    :param vdb_path: database path relative to prefix
    :param prefix: root directory
    :return: generator of (arch, library) tuples
    """
    package = EmergePackage(package)
    vdb = path.join(prefix, vdb_path)
    requires = "%s/%s/REQUIRES"%(vdb, package["CATEGORY/PF"])
    for entry in _re_req_arches.findall(readFile(requires)):
        arch, _sep, libraries = entry.partition(": ")
        for require in libraries.split():
            yield arch, require
class PackageError(Exception):
    """Failure of a package operation (hiding, unhiding, xpak handling)."""
def hide_packages(*packages, **kw):
    """
    Hide packages (used to exclude automatic dependencies while some
    packages are being built).

    The ``obj`` and ``sym`` entries of every package are copied with
    rsync_files into
    ``<prefix>/var/lib/calculate/calculate-builder/automagic-requires``
    and then removed from their original location.

    :param packages: packages to hide
    :param prefix: (keyword) root directory, "/" by default
    :param force_remove: (keyword) remove an already existing hide
        directory instead of failing, False by default
    :raises PackageError: packages are already hidden, or the files could
        not be moved
    :return: None
    """
    prefix = kw.get('prefix', '/')
    force_remove = kw.get('force_remove', False)
    from .content import PkgContents

    base_dn = path.join(prefix, 'var/lib/calculate/calculate-builder')
    hide_dn = path.join(base_dn, "automagic-requires")
    if path.exists(hide_dn):
        if force_remove:
            removeDir(hide_dn)
        else:
            raise PackageError(
                _("Failed to hide {packages}: {error}").format(
                    packages=",".join(str(x) for x in packages),
                    error=_("package is already hidden")))
    makeDirectory(hide_dn)

    def arch_list():
        # file names (obj and sym entries) owned by the packages
        for package in packages:
            content = PkgContents(package, prefix=prefix)
            for fn, obj_data in content.content.items():
                if obj_data['type'] in ("obj", "sym"):
                    yield fn

    arch_files = list(arch_list())
    if not arch_files:
        return
    try:
        # skip files that are listed but absent from the file system
        arch_files = [x for x in arch_files
                      if path.lexists(pathJoin(prefix, x))]
        rsync_files(prefix, hide_dn, *arch_files,
                    opts=RsyncOptions.HidePackageOpts)
        for fn in (pathJoin(prefix, x) for x in arch_files):
            os.unlink(fn)
    except FilesError as e:
        # translate the template first, then fill it in: formatting inside
        # _() turned the final text into the (never matching) msgid
        raise PackageError(
            _("Failed to hide {package}: {error}").format(
                package=",".join(str(x) for x in packages),
                error=str(e)))
def unhide_packages(prefix='/', force=False):
    """
    Restore the hidden package files.

    Copies the content of
    ``<prefix>/var/lib/calculate/calculate-builder/automagic-requires``
    back into prefix with rsync_files and removes the hide directory.

    :param prefix: root directory
    :param force: when True a missing hide directory is silently ignored
    :raises PackageError: the hide directory is missing (and force is
        False) or the files could not be restored
    :return: None
    """
    base_dn = path.join(prefix, 'var/lib/calculate/calculate-builder')
    hide_dn = path.join(base_dn, "automagic-requires")
    if not path.exists(hide_dn):
        if force:
            return
        raise PackageError(
            _("Failed to unhide packages: {error}").format(
                error=_("directory not found")))
    try:
        rsync_files(hide_dn, prefix, opts=RsyncOptions.HidePackageOpts)
        removeDir(hide_dn)
        removeFileWithEmptyDirectory(hide_dn, stopDirectory=base_dn)
    except FilesError as e:
        raise PackageError(
            _("Failed to unhide: {error}").format(
                error=str(e)))
# Default working directory of BinaryPackage: the xpak data of a binary
# package is decomposed into (and composed back from) this directory.
XPAK_TMP = "/dev/shm/xpak"
class tbz2():
    """
    Simple wrapper around Python3 ``portage.xpak.tbz2``.

    The work is done by a short script piped to the interpreter returned
    by ``getPortagePython()``.
    """

    def __init__(self, tbzfile):
        self.python3 = getPortagePython()
        self.tbzfile = tbzfile

    def _escape(self, s):
        """
        Escape *s* for use inside a single-quoted Python string literal.

        Backslashes are handled first so the escapes added afterwards are
        not doubled; line breaks are escaped because a raw one would end
        the literal.
        """
        return (s.replace("\\", "\\\\")
                .replace("'", "\\'")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))

    def _run(self, method, workdir):
        """
        Call ``tbz2(<tbzfile>).<method>(<workdir>)`` in the portage
        interpreter and tell whether the process succeeded.
        """
        p = process(self.python3)
        p.write("from portage.xpak import tbz2\n"
                "tbz2('%s').%s('%s')" % (
                    self._escape(self.tbzfile),
                    method,
                    self._escape(workdir)))
        p.stdin.close()
        return p.success()

    def decompose(self, workdir):
        """
        Unpack the xpak data of the package into workdir.

        :raises PackageError: the helper process failed
        """
        if not self._run("decompose", workdir):
            raise PackageError(_("Failed to decompose %s") % self.tbzfile)

    def compose(self, workdir):
        """
        Pack the content of workdir back into the package as xpak data.

        :raises PackageError: the helper process failed
        """
        if not self._run("compose", workdir):
            raise PackageError(_("Failed to compose %s") % self.tbzfile)
class BinaryPackage():
    """
    Editor for the var/db files stored inside a binary package.

    The package data is decomposed into a working directory on creation;
    items are read and written as files of that directory, ``save()``
    composes them back into the package and ``clear()`` removes the
    working directory.
    """

    def __init__(self, xpak_fn, work_dn=XPAK_TMP):
        self.work_dn = work_dn
        self.tbz2 = tbz2(xpak_fn)
        self.tbz2.decompose(self.work_dn)

    def __setitem__(self, item, value):
        # stored with trailing whitespace replaced by a single newline
        text = "%s\n" % value.rstrip()
        with open(path.join(self.work_dn, item), 'w') as f:
            f.write(text)

    def __getitem__(self, item):
        # an unreadable or missing file reads as an empty string
        try:
            with open(path.join(self.work_dn, item), 'r') as f:
                content = f.read()
        except (IOError, OSError):
            return ""
        return content.strip()

    def save(self):
        """Write the working directory back into the binary package."""
        self.tbz2.compose(self.work_dn)

    def clear(self):
        """Remove the working directory."""
        removeDir(self.work_dn)
class WorldPackage():
    """
    Entry of a world file.

    Two entries are equal (and hash alike) when both their "CATEGORY/PN"
    and their "SLOT!" values match.
    """

    def __init__(self, package):
        self.pkg = EmergePackage(package)

    def _identity(self):
        # the pair of values that identifies a world entry
        return self.pkg["CATEGORY/PN"], self.pkg["SLOT!"]

    def __hash__(self):
        return hash("%s:%s" % self._identity())

    def __eq__(self, obj):
        if self.pkg["CATEGORY/PN"] != obj["CATEGORY/PN"]:
            return False
        return self.pkg["SLOT!"] == obj["SLOT!"]

    def __getitem__(self, item):
        return self.pkg[item]

    def __repr__(self):
        # "category/name:slot" when "SLOT!" is set, plain name otherwise
        if self.pkg["SLOT!"]:
            text = "%s:%s"%(self.pkg["CATEGORY/PN"], self.pkg["SLOT"])
        else:
            text = self.pkg["CATEGORY/PN"]
        return str(text)
class WorldFile():
    """
    Parsed content of a world file: a set of WorldPackage entries that can
    be compared with the set of another world file.
    """

    class DiffType():
        # kind of difference reported for a package by category_diff()
        Added = 0
        Removed = 1
        Omitted = 2

    def __init__(self, data):
        """
        :param data: text of the world file, one package per line; blank
            lines and lines starting with "#" are ignored
        """
        self.data = data
        # strip before filtering so that an indented "# comment" is not
        # mistaken for a package
        lines = (line.strip() for line in self.data.split('\n'))
        self.packages = {WorldPackage(line) for line in lines
                         if line and not line.startswith("#")}

    def diff_new(self, worldfile):
        """Packages present in worldfile but not in this file."""
        return worldfile.packages - self.packages

    def diff_removed(self, worldfile):
        """Packages present in this file but not in worldfile."""
        return self.packages - worldfile.packages

    def diff_omitted(self, worldfile):
        """Packages present in both files."""
        return worldfile.packages & self.packages

    def category_diff(self, worldfile):
        """
        Yield ``(category/name, added, removed, omitted)`` for every
        "CATEGORY/PN" that has at least one added or removed package.

        Groups come in "CATEGORY/PN" order; each of the three lists is
        sorted by the string form of its packages.
        """
        kinds = WorldFile.DiffType

        def by_name(item):
            return item[1]["CATEGORY/PN"]

        tagged = chain(
            ((kinds.Added, x) for x in self.diff_new(worldfile)),
            ((kinds.Removed, x) for x in self.diff_removed(worldfile)),
            ((kinds.Omitted, x) for x in self.diff_omitted(worldfile)))
        # groupby() needs its input sorted by the grouping key
        for group, data in groupby(sorted(tagged, key=by_name), by_name):
            data = list(data)
            added, removed, omitted = (
                sorted((pkg for _type, pkg in data if _type == kind),
                       key=str)
                for kind in (kinds.Added, kinds.Removed, kinds.Omitted))
            if added or removed:
                yield (group, added, removed, omitted)
def get_binary_file(pkg, pkgdir):
    """
    Return the file name of the binary package.

    :param pkg: package; its string form is used as the file name stem
    :param pkgdir: directory holding the binary packages
    :return: pkgdir joined with "<pkg>.tbz2"
    """
    tbz2_name = "%s.tbz2" % str(pkg)
    return pathJoin(pkgdir, tbz2_name)
def clear_binhost_garbage(dn):
    """
    Remove temporary files from a binhost directory.

    Deletes every regular file found in dn whose name ends with
    ".portage.lockfile" or ".partial".

    :param dn: binhost directory
    :return: None
    """
    garbage_suffixes = (".portage.lockfile", ".partial")
    regular_files = find(dn, filetype=FindFileType.RegularFile,
                         fullpath=True)
    for fn in regular_files:
        if fn.endswith(garbage_suffixes):
            os.unlink(fn)
class PortageState():
    """
    Checksum of the portage configuration, used to detect changes.

    The state is an MD5 hex digest over the names and contents of
    everything found below the entries of ``paths``.
    """
    paths = ["/etc/portage", "/var/log/emerge.log"]

    def __init__(self, prefix='/'):
        # root directory the entries of `paths` are joined to
        self.prefix = prefix

    def readFileContent(self, fn):
        """
        Read a file; for emerge.log the last 200 characters are cut off.

        NOTE(review): getpathdata() calls readFile() directly, so this
        method is not used inside this class - confirm whether that is
        intended.
        """
        content = readFile(fn)
        if fn.endswith("/var/log/emerge.log"):
            return content[:-200]
        return content

    def getpathdata(self, fn):
        """
        Yield the strings describing a path: always its name, then the
        link target for a symlink, the data of every entry for a
        directory, or the content for any other existing file.
        """
        yield "%s"%fn
        if not path.lexists(fn):
            return
        if path.islink(fn):
            yield os.readlink(fn)
        elif path.isdir(fn):
            for entry in os.listdir(fn):
                yield from self.getpathdata(pathJoin(fn, entry))
        else:
            # files that do not decode as text contribute their name only
            try:
                yield readFile(fn)
            except UnicodeDecodeError:
                pass

    def get_state(self):
        """Return the MD5 hex digest of the data of all ``paths``."""
        digest = hashlib.md5()
        for dn in self.paths:
            for data in self.getpathdata(pathJoin(self.prefix, dn)):
                digest.update(data.encode("UTF-8"))
        return digest.hexdigest()