You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-update/update/package_tools.py

1140 lines
43 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Mapping
import re
import sys
from calculate.lib.utils.colortext.palette import TextState
from calculate.lib.utils.tools import AddonError, SavableIterator
import time
import datetime
Colors = TextState.Colors
import pexpect
from os import path
from calculate.lib.utils.files import (getProgPath, STDOUT,
PercentProgress, process, readFile,
readLinesFile)
from calculate.lib.utils.colortext.output import XmlOutput
from calculate.lib.utils.colortext.converter import (ConsoleCodes256Converter,
XmlConverter)
from calculate.lib.utils.common import cmpVersion
from contextlib import closing
import xml.etree.ElementTree as ET
from calculate.lib.cl_lang import setLocalTranslate, getLazyLocalTranslate
from functools import total_ordering
from itertools import ifilter, imap, chain
setLocalTranslate('cl_update3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class GitError(AddonError):
"""Git Error"""
class Layman:
"""
Объект для управления репозиториями Layman
Args:
installed: путь до installed.xml
makeconf: путь до makeconf
"""
def __init__(self, installed, makeconf):
self.installed = installed
self.makeconf = makeconf
def _add_to_installed(self, rname, rurl):
"""
Добавить репозиторий в installed.xml
"""
if path.exists(self.installed) and readFile(self.installed).strip():
tree = ET.parse(self.installed)
root = tree.getroot()
# если репозиторий уже присутсвует в installed.xml
if root.find("repo[name='%s']" % rname) is not None:
return
else:
root = ET.Element("repositories", version="1.0")
tree = ET.ElementTree(root)
newrepo = ET.SubElement(root, "repo", priority="50",
quality="experimental",
status="unofficial")
name = ET.SubElement(newrepo, "name")
name.text = rname
source = ET.SubElement(newrepo, "source", type="git")
source.text = rurl
try:
from layman.utils import indent
indent(root)
except ImportError as e:
pass
with open(self.installed, 'w') as f:
f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
tree.write(f, encoding="utf-8")
def _add_to_makeconf(self, rpath):
"""
Добавить репозиторий в layman/make.conf
"""
def fixContent(match):
repos = match.group(1).strip().split('\n')
if not rpath in repos:
repos.insert(0, rpath)
return 'PORTDIR_OVERLAY="\n%s"' % "\n".join(repos);
if path.exists(self.makeconf):
content = readFile(self.makeconf)
if "PORTDIR_OVERLAY" in content:
new_content = re.sub("\APORTDIR_OVERLAY=\"([^\"]+)\"",
fixContent, content, re.DOTALL)
if new_content == content:
return
else:
content = new_content
else:
content = 'PORTDIR_OVERLAY="\n%s"\n' % rpath + content
else:
content = 'PORTDIR_OVERLAY="\n%s"\n' % rpath
with open(self.makeconf, 'w') as f:
f.write(content)
def add(self, rname, rurl, rpath):
"""
Добавить репозиторий в installed.xml и layman/make.conf
"""
self._add_to_installed(rname, rurl)
self._add_to_makeconf(rpath)
return True
class Git:
"""
Объект для управление git репозиторием
"""
def __init__(self):
self._git = self.__getGit()
def checkExistsRep(self, rpath):
"""
Проверить путь на наличие репозитория
"""
if path.exists(rpath):
if not path.isdir(rpath):
raise GitError(
_("Repository {path} is not directory").format(
path=rpath))
if not path.isdir(self._gitDir(rpath)):
raise GitError(
_("Repository {path} is not git").format(
path=rpath))
return True
return False
def __getGit(self):
"""
Получить утилиту git
"""
git = getProgPath("/usr/bin/git")
if not git:
raise GitError(_("Git utility is not found"))
return git
@staticmethod
def _gitDir(rpath):
return path.join(rpath, ".git")
@staticmethod
def is_git(gitpath):
return path.isdir(Git._gitDir(gitpath))
def cloneRepository(self, url, rpath, branch, cb_progress=None):
"""
Сделать локальную копию репозитория
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
"""
if cb_progress:
gitClone = PercentProgress(self._git, "clone", "-q",
"--no-single-branch", "--progress",
"--verbose",
"--depth=1", "-b", branch, url, rpath,
part=4, stderr=STDOUT)
for perc in gitClone.progress():
cb_progress(perc)
else:
gitClone = process(self._git, "clone", "-q", "--no-single-branch",
"--depth=1", "-b", branch, url, rpath,
stderr=STDOUT)
if gitClone.failed():
error = gitClone.read()
if "Remote branch %s not found" % branch in error:
raise GitError(
_("Branch {branch} not found in {url} repository").format(
branch=branch, url=url))
raise GitError(_("Failed to clone {url} repository").format(
url=url), error)
return True
def cloneRevRepository(self, url, rpath, branch, revision,
cb_progress=None):
"""
Сделать локальную копию репозитория с указанной ревизией
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
revision: если указана - сделать ревизию текущей
Return:
Возвращает True если клонирование произведено с установкой на
указанную ревизию. False если клонирование произведено с
установкой на последнюю ревизию.
Raises:
GitError: Выполнение ключевых команд выполнено с ошибками (не
удалось скачать и получить данные из удаленного репозитория)
"""
git_dir = self._gitDir(rpath)
error = []
def command(cmd, startpart=0):
"""
Выполнить одну из команд необходимой для клонирования репозитория
"""
commands = { # инициализация пустого репозитория
'init': ["init", rpath],
# подключить указанный удаленный как origin
'add_remote': ["remote", "add", "origin", url],
# скачать последние коммиты веток
'fetchshallow': ["fetch", "--depth=1"],
# проверить есть указанный коммит
'has_revision': ["log", "-n1", revision],
# проверить есть указанный коммит
'has_branch': ["log", "-n1",
"remotes/origin/%s" % branch],
# получить ревизию из revs тэгов
'get_rev_tag': ["fetch", "--depth=1", "origin",
"+refs/revs/%s:refs/remotes/origin/%s" %
(revision, branch)],
# переключиться на указанную ревизию указанной веткой
'checkout_revision': ["checkout", "-b", branch,
revision],
# переключить на указанную ветку
'checkout': ["checkout", branch],
# установить upstream для локальной ветки
'set_track': ["branch", branch, '-u',
"origin/%s" % branch]
}
if cmd == "init":
wholeCommand = [self._git] + commands[cmd]
else:
wholeCommand = [self._git, "--git-dir", git_dir,
"--work-tree", rpath] + commands[cmd]
if cb_progress and commands[cmd][0] in ("fetch", "checkout"):
progressParam = {'fetch': {'part': 4, 'end': False},
'checkout': {'part': 4, 'startpart': 3}}
gitClone = PercentProgress(
*wholeCommand + ["--progress", "--verbose"],
stderr=STDOUT, **progressParam)
for perc in gitClone.progress():
cb_progress(perc)
else:
gitCmd = process(*wholeCommand, stderr=STDOUT)
if gitCmd.failed():
error.append(gitCmd.read())
return False
return True
# получить последние коммиты из удаленного репозитория
if command("init") and command("add_remote"):
if command("get_rev_tag") or command("fetchshallow"):
if not command("has_branch"):
raise GitError(
_("Branch {branch} not found in {url} repository"
).format(branch=branch, url=url))
# если среди коммитов есть указанный коммит
if command("has_revision"):
# переключаемся на нужный коммита, устанавливаем связь
if command("checkout_revision") and command("set_track"):
return True
elif command("checkout"):
return False
raise GitError(_("Failed to clone {url} repository").format(
url=url), error[-1])
def pullRepository(self, rpath, quiet_error=False, cb_progress=None):
"""
Обновить репозиторий до последней версии
"""
gitPull = process(self._git, "--git-dir", self._gitDir(rpath),
"pull", "--ff-only", stderr=STDOUT)
if gitPull.failed():
if not quiet_error:
error = gitPull.read()
raise GitError(
_("Failed to update repository in {rpath}").format(
rpath=rpath), error)
return False
return True
def fetchRepository(self, rpath, cb_progress=None):
"""
Получить изменения из удаленно репозитория
"""
if cb_progress:
gitFetch = PercentProgress(self._git, "--git-dir",
self._gitDir(rpath),
"fetch", "--progress", "--verbose",
part=3, stderr=STDOUT)
for perc in gitFetch.progress():
cb_progress(perc)
else:
gitFetch = process(self._git, "--git-dir", self._gitDir(rpath),
"fetch", stderr=STDOUT)
if gitFetch.failed():
error = gitFetch.read()
raise GitError(
_("Failed to update repository in {rpath}").format(
rpath=rpath), error)
return True
def checkChanges(self, rpath):
"""
Проверить наличие изменений пользователем файлов в репозитории
"""
git_dir = self._gitDir(rpath)
git_status = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"status", "--porcelain", stderr=STDOUT)
if git_status.success():
return False
return not any(x.strip() for x in git_status)
else:
raise GitError(
_("Wrong repository in {rpath} directory").format(
rpath=rpath))
def parseStatusInfo(self, data):
"""
Разобрать информацию полученную через git status -b --porcelain
Returns:
Словарь
# есть ли измененные файлы пользователем
{'files':True/False,
# есть коммиты после текущего
'ahead':True/False,
# есть коммиты перед текущим (означает, что обновление
# с основной веткой не осуществляется через fast-forward
'behind':True/False,
# текущая ветка
'branch':'',
# оригинальная ветка
'origin':'origin/master'}
"""
reStatus = re.compile("^## (\w+)(?:\.\.\.(\S+)\s+\[(ahead \d+)?"
"(?:, )?(behind \d+)?\])?\n?(.*|$)", re.S)
match = reStatus.search(data)
if not match:
return {}
return {'files': True if match.group(5) else False,
'ahead': True if match.group(3) else False,
'behind': True if match.group(4) else False,
'origin': match.group(2) or "",
'branch': match.group(1)}
def getCurrentCommit(self, rpath):
"""
Получить текущий коммит в репозитории
"""
git_dir = self._gitDir(rpath)
git_show = process(self._git, "--git-dir", git_dir, "show",
"--format=format:%H",
"--quiet", stderr=STDOUT)
if git_show.success():
return git_show.read().strip()
else:
raise GitError(
_("Failed to get status of repository in "
"{rpath} directory").format(
rpath=rpath))
def getStatusInfo(self, rpath):
"""
Получить информацию об изменениях в репозитории
Returns:
Словарь выдаваемый функцией _parseStatusInfo
"""
git_dir = self._gitDir(rpath)
git_status = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"status", "-b", "--porcelain", stderr=STDOUT)
if git_status.success():
retDict = self.parseStatusInfo(git_status.read())
if not retDict:
raise GitError(
_("Failed to get status of repository in "
"{rpath} directory").format(
rpath=rpath))
return retDict
else:
raise GitError(
_("Wrong repository in {rpath} directory").format(
rpath=rpath))
def resetRepository(self, rpath, to_origin=False, to_rev=None, info=None):
"""
Удалить неиндексированные изменения в репозитории
Args:
to_origin: откатить все изменения до удаленного репозитория
to_rev: откатить все изменения до определенной ревизии
info: использовать уже полученную информация об изменения в репозитории
Return:
True - успешное выполнение
False - нет необходимости выполнять reset
Raises:
GitError: выполнение комманд reset и clean прошло с ошибкой
"""
git_dir = self._gitDir(rpath)
if not info:
info = self.getStatusInfo(rpath)
if (all(not info[x] for x in ("files", "ahead", "behind") if x in info)
and (not info["origin"] or
"origin/%s" % info["branch"] == info["origin"])):
return False
commit = (info['origin'] if to_origin else to_rev) or "HEAD"
git_reset = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"reset", "--hard", commit, stderr=STDOUT)
git_clean = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"clean", "-fd", stderr=STDOUT)
if git_reset.failed() or git_clean.failed():
raise GitError(_("Failed to clean {rpath} repository").format(
rpath=rpath))
return True
def getBranch(self, rpath):
"""
Получить текущую ветку
"""
return self.getStatusInfo(rpath)['branch']
def checkoutBranch(self, rpath, branch):
"""
Сменить ветку
"""
git_dir = self._gitDir(rpath)
git_checkout = process(self._git, "--git-dir", git_dir,
"--work-tree", rpath,
"checkout", "-f", branch, stderr=STDOUT)
if git_checkout.failed():
error = git_checkout.read()
if "pathspec '%s' did not match" % branch in error:
raise GitError(
_("Branch {branch} not found in {rpath} repository").format(
branch=branch, rpath=rpath))
raise GitError(
_("Failed to change branch to {branch} in "
"{rpath} repository").format(branch=branch,
rpath=rpath), error)
return True
@total_ordering
class EmergePackage(Mapping):
"""
Данные о пакете
Item keys: CATEGORY, P, PN, PV, P, PF, PR, PVR
Поддерживает сравнение объекта с другим таким же объектом по версии, либо
со строкой, содержащей версию. Сравнение выполняется по категория/имя, затем
по версии
"""
default_repo = 'gentoo'
prefix = r"(?:.*/var/db/pkg/|=)?"
category = r"(?:(\w+(?:-\w+)?)/)?"
pn = "([^/]*?)"
pv = r"(?:-(\d[^-]*?))?"
pr = r"(?:-(r\d+))?"
tbz = r"(?:.(tbz2))?"
slot = r'(?::(\d+(?:\.\d+)*(?:/\d+(?:\.\d+))?))?'
repo = r'(?:::(\w+))?'
reParse = re.compile(
r'^{prefix}{category}(({pn}{pv}){pr}){slot}{repo}{tbz}$'.format(
prefix=prefix, category=category, pn=pn, pv=pv, pr=pr, tbz=tbz,
slot=slot, repo=repo))
attrs = ('CATEGORY', 'PN', 'PF', 'SLOT', 'REPO', 'P', 'PV', 'PR', 'PVR',
'CATEGORY/PN')
def _parsePackageString(self, s):
"""
Преобразовать строка в части названия пакета
"""
x = self.reParse.search(s)
if x:
CATEGORY, PF, P, PN, PV, PR, SLOT, REPO, TBZ = range(0, 9)
x = x.groups()
d = {'CATEGORY': x[CATEGORY] or "",
'PN': x[PN],
'PV': x[PV] or '0',
'PF': x[PF],
'P': x[P],
'SLOT': x[SLOT] or '0',
'REPO': x[REPO] or self.default_repo,
'CATEGORY/PN': "%s/%s" % (x[CATEGORY], x[PN]),
'PR': x[PR] or 'r0'}
d['PVR'] = "%s-%s" % (d['PV'], d['PR'])
if d['PF'].endswith('-r0'):
d['PF'] = d['PF'][:-3]
return d.copy()
else:
return {k: '' for k in self.attrs}
def __iter__(self):
return iter(self.attrs)
def __len__(self):
if not self['PN']:
return 0
else:
return len(self.attrs)
def __lt__(self, version):
"""
В объектах сравнивается совпадение категории и PF
"""
if "CATEGORY/PN" in version and "PVR" in version:
if self['CATEGORY/PN'] < version['CATEGORY/PN']:
return True
version = version['PVR']
return cmpVersion(self['PVR'], version) == -1
def __eq__(self, version):
if "CATEGORY" in version and "PF" in version:
return ("%s/%s" % (self['CATEGORY'], self['PF']) ==
"%s/%s" % (version['CATEGORY'], version['PF']))
else:
return cmpVersion(self['PVR'], version) == 0
def __init__(self, package):
if isinstance(package, EmergePackage):
self.__result = package.__result
self.__package = package.__package
else:
self.__package = package
self.__result = None
def __getitem__(self, item):
if not self.__result:
self.__result = self._parsePackageString(self.__package)
return self.__result[item]
def __repr__(self):
return "EmergePackage(%s/%s)" % (self['CATEGORY'], self['PF'])
def __str__(self):
return "%s/%s" % (self['CATEGORY'], self['PF'])
@total_ordering
class EmergeUpdateInfo(Mapping):
"""
Информация об обновлении одного пакета
"""
install_info = "\[(binary|ebuild)[^\]]+\]"
atom_info = r"\S+"
use_info = 'USE="[^"]+"'
prev_version = "\[([^\]]+)\]"
update_info = re.compile(
r"^({install_info})\s+({atom_info})\s+(?:{prev_version})?"
r"\s*({use_info})?".format(install_info=install_info,
atom_info=atom_info,
prev_version=prev_version,
use_info=use_info))
attrs = ['binary', 'REPLACING_VERSIONS']
def __init__(self, data):
self._data = data
self._package = None
self._info = {}
def _parseData(self):
r = self.update_info.search(self._data)
if r:
self._info['binary'] = r.group(2) == 'binary'
self._package = EmergePackage(r.group(3))
self._info['REPLACING_VERSIONS'] = r.group(4) or ""
def __iter__(self):
return chain(EmergePackage.attrs, self.attrs)
def __len__(self):
if not self['PN']:
return 0
else:
return len(EmergePackage.attrs) + len(self.attrs)
def __getitem__(self, item):
if not self._info:
self._parseData()
if item in self._info:
return self._info[item]
if self._package:
return self._package[item]
return None
def __lt__(self, version):
if not self._info:
self._parseData()
return self._package < version
def __eq__(self, version):
if not self._info:
self._parseData()
return self._package == version
def __contains__(self, item):
return item in self._package
def __repr__(self):
return "EmergeUpdateInfo(%s/%s,%s)" % (
self['CATEGORY'], self['PF'],
"binary" if self.binary else "ebuild")
def __str__(self):
return "%s/%s" % (self['CATEGORY'], self['PF'])
class EmergeError(Exception):
"""
Ошибка при сборке пакетов
"""
(NEED_ROOT, NO_EBUILD, CONFLICT, CUSTOM) = range(0, 4)
def __init__(self, msg, errno=CUSTOM):
Exception.__init__(self, msg)
self.errno = errno
class Eix:
"""
Вызов eix
package : пакет или список пакетов
*options : параметры eix
all_versions : отобразить все версии пакета или наилучшую
"""
cmd = getProgPath("/usr/bin/eix")
class Option:
Installed = '--installed'
Xml = '--xml'
Upgrade = '--upgrade'
default_options = [Option.Xml]
def __init__(self, package, *options, **kwargs):
if type(package) in (tuple, list):
self.package = list(package)
else:
self.package = [package]
self.options = list(options) + self.package + self.default_options
if not kwargs.get('all_versions', False):
self.__get_versions = self._get_versions
self._get_versions = self._get_best_version
def _get_best_version(self, et):
ret = None
for ver in ifilter(lambda x: x.find('mask') is None,
et.iterfind('version')):
ret = ver.attrib['id']
yield ret
def get_output(self):
"""
Получить вывод eix
"""
with closing(process(self.cmd, *self.options)) as p:
return p.read()
def get_packages(self):
"""
Получить список пакетов
"""
return list(self._parseXml(self.get_output()))
def _get_versions(self, et):
for ver in et.iterfind('version'):
yield ver.attrib['id']
def _get_packages(self, et):
for pkg in et:
for version in self._get_versions(pkg):
yield "%s-%s" % (pkg.attrib['name'], version)
def _get_categories(self, et):
for category in et:
for pkg in self._get_packages(category):
yield "%s/%s" % (category.attrib['name'], pkg)
def _parseXml(self, buffer):
try:
eix_xml = ET.fromstring(buffer)
return self._get_categories(eix_xml)
except ET.ParseError:
return iter(())
class Emerge:
"""
Объект для выполнения сборки пакетов
"""
# параметры по умолчанию
default_params = ["-av", "--color=y", "--nospinner"]
cmd = getProgPath("/usr/bin/emerge")
# шаблон выборки списка пакетов
install_pkgs_pattern = \
"(\\[[^\\]]+\\]\\s*(\\S+).*(?:{nl}|$))".format(nl="\r*\n")
re_install_pkgs = re.compile("^%s+" % install_pkgs_pattern,
re.MULTILINE)
# конфликтный пакет
_conflict_pkg = r"[\w-]+/[\w-]+:\d+"
# конкретная версия конфликтного пакета
_conflict_candidate = r" \(.*"
# пакеты в которым необходим конфликтный пакет
_conflict_req = r" \S.*"
# шаблон конфликтного блока
conflict_block = \
"({nl}{nl}({0}{nl}({1}{nl})+{nl})+)".format(_conflict_candidate,
_conflict_req,
nl="\r*\n")
re_conflict_block = re.compile(
"^{0}{conflict_block}([\w\W]+{conflict_block})*".format(
_conflict_pkg, conflict_block=conflict_block),
re.MULTILINE)
# шаблон устновки консольного цвета
_color_block = "(?:\033\[[^m]+?m)?"
re_custom_error = re.compile(
r"^Calculating dependencies.*{nl}".format(nl="\r*\n") +
"%s*" % install_pkgs_pattern +
r"(?P<result>[\w\W]+)", re.MULTILINE)
# regexp для удаления неиспользуемой информации
re_cut_blocks = \
re.compile(r"(It may be possible to solve.*?"
"solve this conflict automatically\.|"
"For more information, see.*?Gentoo Handbook\.)\s*", re.S)
expect_need_root = "This action requires superuser access"
expect_conflict = "Multiple package[ \w]+pulled"
expect_to_merge = "Would you like to merge"
expect_nothing_merge = ("Nothing to merge|No outdated packages|"
"\s0 packages")
expect_no_ebuild = "no ebuilds to satisfy \""
expect_custom_question = "{c}Yes{c}/{c}No{c}".format(c=_color_block)
expect_emerge_package = ("Emerging (binary )?\({c}(\d+){c} "
"of {c}(\d+){c}\) {c}([^\s\033]+){c}".format(
c=_color_block))
expect_install_package = ("Installing (binary )?\({c}(\d+){c} "
"of {c}(\d+){c}\) {c}([^\s\033]+){c}".format(
c=_color_block))
expect_error_on_install = "ERROR: \S* failed \([^)]+\)"
def __init__(self, packages, extra_params=[], env=None, cwd=None,
command_runner=None,
converter=ConsoleCodes256Converter(XmlOutput())):
self.text_converter = converter
self.packages = packages
self.params = self.default_params + extra_params
self.child = None
self.update_block = None
self.conflict_block = None
self.update_packages = None
self.custom_error = ""
self.env = env
self.cwd = cwd
# событие начало сборки
self.event_emerging = []
# событие начало установки
self.event_installing = []
self.error_log = None
def handle_emerging(self, f):
"""
Добавить обработку начала сборки пакета
"""
if not f in self.event_emerging:
self.event_emerging.append(f)
def handle_installing(self, f):
"""
Добавить обработку начала установки пакета
"""
if not f in self.event_installing:
self.event_installing.append(f)
def _get_text(self, re_result, already_text=False):
"""
Получить результат из регулярки и преобразовать его через self.converter
"""
if re_result:
if not already_text:
re_result = re_result.group()
result = self.re_cut_blocks.sub("", re_result)
return self.text_converter.transform(result.strip())
return ""
def _get_custom_error_output(self, buf):
"""
Получить вывод об прочих ошибках в ходе вычисления зависимостей
"""
r = self.re_custom_error.search(buf)
if r:
return self._get_text(r.groupdict()["result"], already_text=True)
return ""
def _get_package_list_output(self, buf):
"""
Получить из вывода emerge часть текста со списком пакетов
"""
return self._get_text(self.re_install_pkgs.search(buf))
def _get_conflict_list_output(self, buf):
"""
Разобрать вывод о конфликтах среди пакетов
"""
return self._get_text(self.re_conflict_block.search(buf))
def get_update_block(self):
"""
Получить список устанавливаемых пакетов
"""
if self.update_block is None:
self.execute()
events = {v: k for k, v in enumerate(sorted([
self.expect_need_root,
self.expect_conflict,
self.expect_to_merge,
self.expect_nothing_merge,
self.expect_no_ebuild,
self.expect_custom_question,
pexpect.EOF]))}
res = self.child.expect(sorted(events.keys()))
if res == events[self.expect_need_root]:
self.process_need_root()
elif res == events[self.expect_to_merge]:
self.update_block = (
self._get_package_list_output(self.child.before))
elif res == events[self.expect_nothing_merge]:
self.update_block = ""
elif res == events[self.expect_no_ebuild]:
raise EmergeError(_("No ebuilds to satisfy %s") %
self.child.buffer.split('"', 1)[0],
errno=EmergeError.NO_EBUILD)
elif res == events[self.expect_conflict]:
self.update_block = \
self._get_package_list_output(self.child.before)
self.conflict_block = \
self._get_conflict_list_output(self.child.read())
raise EmergeError(_("Multiple package instances within "
"a single package slot"),
errno=EmergeError.CONFLICT)
elif res == events[self.expect_custom_question]:
self.custom_error = self._get_custom_error_output(
self.child.before)
self.child.sendline("no")
raise EmergeError(_("Unexpected question"))
elif res == events[pexpect.EOF]:
if re.search("Total:\s+\d+ package", self.child.before):
self.update_block = (
self._get_package_list_output(self.child.before))
else:
self.custom_error = self._get_custom_error_output(
self.child.before)
raise EmergeError(_("Failed to emerge"))
return self.update_block
def notifyEmergingPackage(self, package, num=1, max_num=2, binary=False):
[f(EmergePackage(package), num=num, max_num=max_num, binary=binary)
for f in self.event_emerging]
def notifyInstallingPackage(self, package):
[f(EmergePackage(package)) for f in self.event_installing]
def install(self):
"""
Install packages
"""
self.execute()
self.child.sendline("yes")
pkg_name = ""
events = {v: k for k, v in enumerate(sorted([
self.expect_need_root,
self.expect_emerge_package,
self.expect_install_package,
self.expect_error_on_install,
pexpect.EOF]))}
while True:
res = self.child.expect(sorted(events.keys()))
if res == events[self.expect_need_root]:
self.process_need_root()
elif res in (events[self.expect_install_package],
events[self.expect_emerge_package]):
pkg_name = self.child.match.group(4).strip()
if res == events[self.expect_emerge_package]:
binary_pkg = bool(self.child.match.group(1))
current_pkg_num = int(self.child.match.group(2))
max_pkg_num = int(self.child.match.group(3))
self.notifyEmergingPackage(pkg_name, num=current_pkg_num,
max_num=max_pkg_num,
binary=binary_pkg)
else:
self.notifyInstallingPackage(pkg_name)
elif res == events[self.expect_error_on_install]:
if self.child.expect(
["The complete build log is located at '[^']+",
pexpect.EOF]) == 0:
self.error_log = self.child.after.rpartition("'")[-1]
raise EmergeError(_("Emerge %s is failed") % pkg_name)
else:
if self.child.isalive():
self.child.wait()
return self.child.exitstatus == 0
def execute(self):
# TODO: убрать лог в /tmp
if self.child is None:
self.child = pexpect.spawn(self.cmd, self.params + self.packages,
logfile=open('/tmp/emerge.log', 'w'),
env=self.env, cwd=self.cwd, timeout=None)
return self.child
def close(self):
if self.child is not None:
self.child.close()
self.child = None
def get_error_log(self):
"""
Получить путь до журнала сборки
"""
return self.error_log
def process_need_root(self):
raise EmergeError(
_("This action requires superuser access"),
errno=EmergeError.NEED_ROOT)
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
def get_update_list(self):
"""
Проверить есть ли в обновлениях пакет совпадающий с pkg_pattern
полученный пакет можно проверить указанной check функцией
"""
if self.update_packages is None:
list_block = XmlConverter().transform(self.get_update_block())
self.update_packages = self._parse_update_list(list_block)
return PackageList(self.update_packages)
def _parse_update_list(self, list_block):
return filter(lambda x: x['PN'] is not None,
map(EmergeUpdateInfo,
list_block.split('\n')))
class RevdepRebuild(Emerge):
def __init__(self):
pass
def execute(self):
pass
class EmergeLogTask(object):
def has_marker(self, line):
"""
Определить есть ли в строке маркер задачи
"""
return False
def get_begin_marker(self):
"""
Получить маркер начала задачи
"""
return ""
def get_end_marker(self):
"""
Получить маркер завершения задачи
"""
return ""
class EmergeLogNamedTask(EmergeLogTask):
date_format = "%b %d, %Y %T"
def __init__(self, taskname):
self.taskname = taskname
def has_marker(self, line):
"""
Определить есть ли в строке маркер задачи
"""
return self.get_end_marker() in line
def get_begin_marker(self):
"""
Получить маркер начала задачи
"""
return "Started {taskname} on: {date}".format(
taskname=self.taskname,
date=datetime.datetime.now().strftime(self.date_format))
def get_end_marker(self):
"""
Получить маркер завершения задачи
"""
return "*** Finished %s" % self.taskname
class EmergeLog:
"""
EmergeLog(after).get_update(package_pattern)
"""
emerge_log = "/var/log/emerge.log"
re_complete_emerge = re.compile(r":::\scompleted emerge \(.*?\) (\S+)",
re.M)
def __init__(self, emerge_task=EmergeLogTask()):
"""
@type emerge_task: EmergeLogTask
"""
self.emerge_task = emerge_task
def _get_last_changes(self):
"""
Получить список измений по логу, от последней записи маркера
"""
log_data = SavableIterator(readLinesFile(self.emerge_log))
for line in log_data.save():
if self.emerge_task.has_marker(line):
log_data.save()
return log_data.restore()
def get_packages(self):
"""
Получить список пакетов
"""
return list(self._parse_log(self._get_last_changes()))
def _parse_log(self, data):
for re_match in ifilter(None,
imap(self.re_complete_emerge.search, data)):
yield re_match.group(1)
def _set_marker(self, text_marker):
with open(self.emerge_log, 'a') as f:
f.write("{0:.0f}: {1}\n".format(time.time(), text_marker))
def mark_begin_task(self):
"""
Отметить в emerge.log начало выполнения задачи
"""
marker = self.emerge_task.get_begin_marker()
if marker:
self._set_marker(marker)
def mark_end_task(self):
"""
Отметить в emerge.log завершение выполнения задачи
"""
marker = self.emerge_task.get_end_marker()
if marker:
self._set_marker(marker)
class PackageList(object):
"""
Список пакетов с возможностью среза и сравнением с версией
"""
def __init__(self, packages):
self._raw_list = packages
def _packages(self):
return ifilter(lambda x: x['PN'] is not None,
imap(lambda x: x if isinstance(x, Mapping) else EmergePackage(x),
self._raw_list))
def __getitem__(self, item):
re_item = re.compile(item)
return PackageList([pkg for pkg in self._packages() if
re_item.search(pkg['CATEGORY/PN'])])
def __iter__(self):
return iter(self._packages())
def __len__(self):
return len(self._raw_list)
def __lt__(self, other):
return any(pkg < other for pkg in self._packages())
def __le__(self, other):
return any(pkg <= other for pkg in self._packages())
def __gt__(self, other):
return any(pkg > other for pkg in self._packages())
def __ge__(self, other):
return any(pkg >= other for pkg in self._packages())
def __eq__(self, other):
for pkg in self._packages():
if pkg == other:
return True
else:
return False
return any(pkg == other for pkg in self._packages())
def __ne__(self, other):
return any(pkg != other for pkg in self._packages())