You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

604 lines
19 KiB

#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from os import path
import re
import sys
from calculate.lib.utils.colortext.palette import TextState
from import ignore
from package_tools import EmergePackage, PackageList, EmergeUpdateInfo, \
EmergeRemoveInfo, Git, GitError
Colors = TextState.Colors
import pexpect
from calculate.lib.utils.files import getProgPath, readLinesFile, listDirectory, \
writeFile, readFile
from calculate.lib.utils.colortext.output import XmlOutput
from calculate.lib.utils.colortext.converter import (ConsoleCodes256Converter,
from calculate.lib.cl_lang import setLocalTranslate, getLazyLocalTranslate, _
setLocalTranslate('cl_update3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class EmergeError(Exception):
Ошибка при сборке пакетов
class EmergeNeedRootError(EmergeError):
class CommandExecutor(object):
Запуск программы для объекта Emerge
logfile = '/var/log/calculate/lastcommand.log'
def __init__(self, cmd, params, env=None, cwd=None, logfile=None):
self.cwd = cwd
self.env = env
self.cmd = cmd
self.params = params
self.child = None
if logfile:
self.logfile = logfile
def execute(self):
if self.child is None:
self.child = pexpect.spawn(self.cmd,
logfile=open(self.logfile, 'w'),
env=self.env, cwd=self.cwd, timeout=None)
return self.child
def close(self):
if self.child is not None:
self.child = None
def success(self):
if self.child:
if self.child.isalive():
return self.child.exitstatus == 0
return False
def failed(self):
return not self.success()
def send(self, s):
if self.child:
class EmergeCommand(CommandExecutor):
Запуск emerge для последующего анализирования
# параметры по умолчанию
default_params = ["-av", "--color=y", "--nospinner"]
cmd = getProgPath("/usr/bin/emerge")
def __init__(self, packages, extra_params=None, env=None, cwd=None,
extra_params = extra_params or []
self.child = None
self.packages = packages
self.params = self.default_params + extra_params
default_env = {'CLEAN_DELAY': '0'}
self.env = env or default_env
self.cwd = cwd
if logfile:
self.logfile = logfile
def execute(self):
if self.child is None:
self.child = pexpect.spawn(self.cmd, self.params + self.packages,
logfile=open(self.logfile, 'w'),
env=self.env, cwd=self.cwd, timeout=None)
return self.child
class EmergeInformationBlock(object):
_color_block = "(?:\033\[[^m]+?m)?"
_new_line = "\r*\n"
token = None
end_token = ["\n"]
re_block = None
action = None
re_match_type = type(re.match("", ""))
re_type = type(re.compile(""))
def __init__(self, parent):
:type parent: EmergeParser
self.result = None
self.text_converter = parent.text_converter
self.parent = parent
self.children = []
def add_element(self, element):
def __str__(self):
if type(self.result) == self.re_match_type:
return self.result or ""
def __nonzero__(self):
return bool(self.result)
def __len__(self):
if self.result is None:
return 0
return len(self.result)
def __contains__(self, item):
if self.result is None:
return 0
return item in self.result
def _get_text(self, result):
Получить результат из регулярки и преобразовать его через self.converter
if result:
return self.text_converter.transform(result.rstrip())
return ""
def get_block(self, child):
token = child.match
if type(self.end_token) == self.re_type:
match =
match = child.match
token + child.before + match))
except pexpect.EOF:
child.buffer = "".join(
[x for x in (child.before, child.after, child.buffer)
if type(x) == str])
def get_data(self, match):
self.result = self._get_text(
class InstallPackagesBlock(EmergeInformationBlock):
Блок emerge содержащий список пакетов для установки
list = PackageList([])
remove_list = PackageList([])
_new_line = EmergeInformationBlock._new_line
token = "\n["
end_token = ["\r\n\r", "\n\n"]
re_block = re.compile(r"((?:^\[.*?{nl})+)".format(nl=_new_line),
def get_data(self, match):
super(InstallPackagesBlock, self).get_data(match)
list_block = XmlConverter().transform(self.result).split('\n')
self.list = PackageList(map(EmergeUpdateInfo, list_block))
self.remove_list = PackageList(map(EmergeRemoveInfo, list_block))
class UninstallPackagesBlock(EmergeInformationBlock):
Блок emerge содержащий список удаляемых пакетов
list = PackageList([])
_new_line = EmergeInformationBlock._new_line
_color_block = EmergeInformationBlock._color_block
token = ["These are the packages that would be unmerged",
"Calculating removal order"]
end_token = re.compile("All selected packages:.*\n")
re_block = re.compile(r"All selected packages: (.*?){nl}".
format(nl=_new_line, c=_color_block), re.DOTALL)
def get_data(self, match):
super(UninstallPackagesBlock, self).get_data(match)
list_block = XmlConverter().transform(self.result).split()
self.list = PackageList(map(EmergePackage, list_block))
class FinishEmergeGroup(EmergeInformationBlock):
Блок завершения команды
token = pexpect.EOF
# регуляреное выражение, определяющее содержит ли блок
# сообщения об ошибках
re_failed = re.compile(
r"Fetch instructions for \S+:|"
r"The following.*are necessary to proceed|"
r"!!! Multiple package .* slot have been pulled|"
r"no ebuilds to satisfy|"
r"Dependencies could not be completely resolved due to",
def get_block(self, child):
if child.isalive():
if child.exitstatus != 0 or
self.result = True
def children_get_block(self, child):
for block in self.children:
def children_action(self, child):
for block in (x for x in self.children if x.result and x.action):
if block.action(child) is False:
def action(self, child):
return False
class PrepareErrorBlock(EmergeInformationBlock):
Блок информации с ошибками при получении списка устанавливаемых пакетов
token = None
re_drop = re.compile("news items need reading|"
"Use eselect news|"
"Calculating dependencies|"
"to read news items|"
"Local copy of remote index is up-to-date|"
"These are the packages that would be merged|"
"Process finished with exit code")
re_multi_empty_line = re.compile("(?:<br/>){3,}", re.DOTALL)
re_strip_br = re.compile("^(?:<br/>)+|(?:<br/>)+$", re.DOTALL)
def remove_needless_data(self, data):
return "\n".join([x for x in data.split('\n')
if not])
def strip_br(self, data):
return self.re_strip_br.sub(
self.re_multi_empty_line.sub("<br/><br/>", data))
def get_block(self, child):
self.result = self.strip_br(
def action(self, child):
raise EmergeError(_("Emerge failed"))
class DownloadSizeBlock(EmergeInformationBlock):
Размер скачиваемых обновлений
token = "Size of downloads:"
re_block = re.compile(r"Size of downloads:\s(\S+\s\S+)")
def __str__(self):
if self.result:
return self.result
return "0 kB"
class QuestionBlock(EmergeInformationBlock):
Блок вопроса
default_answer = "no"
_color_block = EmergeInformationBlock._color_block
token = "Would you like"
end_token = ["]", "\n"]
re_block = re.compile(
"(Would you.*)\[{c}Yes{c}/{c}No{c}".format(c=_color_block))
def action(self, child):
if self.result:
child.send("%s\n" % self.default_answer)
class NeedRootBlock(EmergeInformationBlock):
Пользователь не явеляется root
token = "This action requires superuser access"
def get_data(self, child):
self.result = True
def action(self, child):
raise EmergeNeedRootError(_("This action requires superuser access"))
class NotifierInformationBlock(EmergeInformationBlock):
Информационный блок поддерживающий observing
def __init__(self, parent):
super(NotifierInformationBlock, self).__init__(parent)
self.observers = []
def get_data(self, match):
self.result = match
def add_observer(self, f):
def notify(self, observer, groups):
def action(self, child):
if self.result and self.observers:
groups = self.result.groups()
for observer in self.observers:
self.notify(observer, groups)
class EmergingPackage(NotifierInformationBlock):
Запуск устанавливаемого пакета
ObserverFunc: (package, num=number, max_num=number, binary=binary)
_color_block = EmergeInformationBlock._color_block
token = ">>> Emerging "
re_block = re.compile(
"Emerging (binary )?\({c}(\d+){c} "
"of {c}(\d+){c}\) {c}([^\s\033]+){c}".format(c=_color_block))
def notify(self, observer, groups):
observer(EmergePackage(groups[3]), num=groups[1], max_num=groups[2],
class UnemergingPackage(NotifierInformationBlock):
Запуск устанавливаемого пакета
ObserverFunc: (package, num=number, max_num=number)
_color_block = EmergeInformationBlock._color_block
token = ">>> Unmerging"
re_block = re.compile(
r"Unmerging (?:\({c}(\d+){c} "
r"of {c}(\d+){c}\) )?(\S+)\.\.\.".format(c=_color_block))
def notify(self, observer, groups):
observer(EmergePackage(groups[2]), num=groups[0], max_num=groups[1])
class InstallingPackage(NotifierInformationBlock):
Запуск устанавливаемого пакета
ObserverFunc: (package, binary=binary)
_color_block = EmergeInformationBlock._color_block
binary = None
token = ">>> Installing "
re_block = re.compile(
"Installing \({c}(\d+){c} "
"of {c}(\d+){c}\) {c}([^\s\033]+){c}".format(c=_color_block))
def notify(self, observer, groups):
binary = bool(self.binary and groups[2] in self.binary)
observer(EmergePackage(groups[2]), binary=binary)
def mark_binary(self, package):
if self.binary is None:
self.binary = []
class EmergeingErrorBlock(EmergeInformationBlock):
Блок содержит информацию об ошибке во время сборки пакета
token = ["* ERROR: ", " * \033[39;49;00mERROR: "]
end_token = "Working directory:"
re_block = re.compile("ERROR: (\S*) failed \([^)]+\).*?"
"The complete build log is located at '([^']+)",
package = ""
def get_data(self, match):
self.result = self._get_text(
self.package =
def log(self):
return self.text_converter.transform(readFile(self.result))
def action(self, child):
raise EmergeError(_("Emerge %s is failed") % self.package)
class EmergeParser(object):
Парсер вывода emerge
def __init__(self, command, run=False):
self.text_converter = ConsoleCodes256Converter(XmlOutput())
self.command = command
self.elements = {}
self.install_packages = InstallPackagesBlock(self)
self.uninstall_packages = UninstallPackagesBlock(self)
self.question = QuestionBlock(self)
self.finish_block = FinishEmergeGroup(self)
self.need_root = NeedRootBlock(self)
self.prepare_error = PrepareErrorBlock(self.finish_block)
self.download_size = DownloadSizeBlock(self)
self.emerging_error = EmergeingErrorBlock(self)
self.installing = InstallingPackage(self)
self.uninstalling = UnemergingPackage(self)
self.emerging = EmergingPackage(self)
if run:
def mark_binary(self, package, binary=False, **kw):
if binary:
def add_element(self, element):
if element.token:
if type(element.token) == list:
for token in element.token:
self.elements[token] = element
self.elements[element.token] = element
def run(self):
Запустить команду
child = self.command.execute()
while True:
index = child.expect_exact(self.elements.keys())
element = self.elements.values()[index]
if element.action:
if element.action(child) is False:
def close(self):
def __enter__(self):
return self
def __exit__(self, *exc_info):
class MtimeCheckvalue(object):
def __init__(self, *fname):
self.fname = fname
def value_func(self, fn):
return str(int(os.stat(fn).st_mtime))
def get_check_values(self, file_list):
for fn in file_list:
if path.exists(fn) and not path.isdir(fn):
yield fn, self.value_func(fn)
for k, v in self.get_check_values(
listDirectory(fn, fullPath=True)):
yield k, v
def checkvalues(self):
return self.get_check_values(self.fname)
class GitCheckvalue(object):
def __init__(self, rpath):
self.rpath = rpath
self.git = Git()
def checkvalues(self):
with ignore(GitError):
if self.git.is_git(self.rpath):
yield self.rpath, Git().getCurrentCommit(self.rpath)
class EmergeCache(object):
Кэш пакетов
cache_file = '/var/lib/calculate/calculate-update/world.cache'
# список файлов проверяемый по mtime на изменения
check_list = [MtimeCheckvalue('/etc/make.conf',
def __init__(self):
self.files_control_values = {}
self.pkg_list = PackageList([])
def set_cache(self, package_list):
Установить в кэш список пакетов
with writeFile(self.cache_file) as f:
for fn, val in self.get_control_values().items():
f.write("{fn}={val}\n".format(fn=fn, val=val))
for pkg in package_list:
f.write("%s\n"% str(pkg))
def drop_cache(self):
if path.exists(self.cache_file):
with ignore(OSError):
def get_cached_package_list(self):
if self.check_actuality():
return self.pkg_list
return None
def check_actuality(self):
Кэш считается актуальным если ни один из файлов не менялся
return self.get_control_values() == self.files_control_values
def read_cache(self):
self.files_control_values = {}
cache_file_lines = readLinesFile(self.cache_file)
for line in cache_file_lines:
if not "=" in line:
k, v = line.split('=')
self.files_control_values[k] = v.strip()
self.pkg_list = PackageList(cache_file_lines)
def get_control_values(self):
def generate():
for obj in self.check_list:
for checkvalue in obj.checkvalues():
yield checkvalue
return dict(generate())