# Source file: calculate-utils-3-update/update/update.py
#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import path
from calculate.lib.utils.tools import AddonError
from calculate.lib.utils.colortext.palette import TextState
from calculate.lib.utils.colortext import get_color_print
from calculate.update.emerge_parser import RevdepPercentBlock
import pexpect
from package_tools import Git, Layman,\
EmergeLogNamedTask, EmergeLog, GitError, \
PackageInformation, PackageList
Colors = TextState.Colors
from calculate.lib.utils.files import (getProgPath, STDOUT, removeDir,
PercentProgress, process)
from calculate.lib.cl_lang import (setLocalTranslate, getLazyLocalTranslate,
RegexpLocalization, _)
import emerge_parser
from emerge_parser import EmergeParser, EmergeCommand, EmergeError, EmergeCache
setLocalTranslate('cl_update3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class UpdateError(AddonError):
    """
    Error raised by the update actions defined in this module
    """
class Update:
    """Main object for performing actions related to updating the system
    """
def init(self):
    """
    Prepare the command log, colored output and the emerge cache.

    The command executor log is pointed at ``lastcommand.log`` inside the
    directory taken from the ``core.cl_log_path`` variable. The emerge cache
    check list is extended with one ``emerge_parser.GitCheckvalue`` per
    entry of ``update.cl_update_rep_path``.
    """
    commandLog = path.join(self.clVars.Get('core.cl_log_path'),
                           'lastcommand.log')
    emerge_parser.CommandExecutor.logfile = commandLog
    self.color_print = get_color_print()
    self.emerge_cache = EmergeCache()
    # A list comprehension is used instead of map(): on Python 3 map()
    # returns an iterator, and "list + iterator" raises TypeError.
    self.emerge_cache.check_list = (
        self.emerge_cache.check_list +
        [emerge_parser.GitCheckvalue(rep_path) for rep_path in
         self.clVars.Get('update.cl_update_rep_path')])
def _syncRepository(self, name, url, rpath, revision, branch,
                    cb_progress=None):
    """
    Synchronize a single git repository located at ``rpath``.

    A missing repository is cloned (at ``revision`` unless it is "last").
    An existing one is fetched, switched to ``branch`` when needed and reset
    either to origin ("last") or to the requested revision.

    Returns False when a concrete revision was requested and the checkout
    is already on that commit and branch, True in every other case.
    """
    variables = self.clVars
    git = Git()
    track_latest = revision == "last"
    if git.checkExistsRep(rpath):
        # a concrete revision that is already checked out on the right
        # branch needs no work at all
        if (not track_latest and
                revision == git.getCurrentCommit(rpath) and
                git.getBranch(rpath) == branch):
            return False
        # get the changes from the remote repository
        git.fetchRepository(rpath, cb_progress=cb_progress)
        # switch the branch when the current one is not the requested one
        status = git.getStatusInfo(rpath)
        metadata_outdated = status['branch'] != branch
        if metadata_outdated:
            git.checkoutBranch(rpath, branch)
        if track_latest:
            if git.resetRepository(rpath, to_origin=True):
                metadata_outdated = True
        else:
            git.resetRepository(rpath, to_rev=revision)
            metadata_outdated = True
    else:
        if track_latest:
            git.cloneRepository(url, rpath, branch,
                                cb_progress=cb_progress)
        else:
            git.cloneRevRepository(url, rpath, branch, revision,
                                   cb_progress=cb_progress)
        metadata_outdated = True
    if metadata_outdated:
        variables.Set('cl_update_outdate_set', 'on', force=True)
    layman = Layman(variables.Get('cl_update_layman_installed'),
                    variables.Get('cl_update_layman_make'))
    if name != "portage":
        layman.add(name, url, rpath)
    return True
def syncRepositories(self, repname, clean_on_error=True):
    """
    Synchronize the repository named ``repname``.

    With ``clean_on_error`` set, a GitError from the first attempt is
    reported as warnings, the repository directory is removed and the
    synchronization is run once more.

    Returns "skip" when the first attempt reports nothing to do, True
    otherwise. Raises UpdateError when the repository variables are empty
    or the directory cannot be removed.
    """
    variables = self.clVars
    columns = ["cl_update_rep_url", "cl_update_rep_path",
               "cl_update_rep_rev", "cl_update_branch_name"]
    url, rpath, revision, branch = variables.Select(
        columns, where="cl_update_rep_name", eq=repname, limit=1)
    if not (url and rpath):
        raise UpdateError(_("Repositories variables is not configured"))
    self.addProgress()
    if clean_on_error:
        try:
            changed = self._syncRepository(repname, url, rpath, revision,
                                           branch,
                                           cb_progress=self.setProgress)
            return True if changed else "skip"
        except GitError as e:
            if e.addon:
                self.printWARNING(str(e.addon))
            self.printWARNING(str(e))
            self.printWARNING(
                _("Re-fetch {name} repository").format(name=repname))
            try:
                removeDir(rpath)
            except OSError:
                raise UpdateError(
                    _("Permission denied to change "
                      "{repname} repository").format(repname=repname))
    # plain attempt: either clean_on_error is off or the directory has
    # just been removed after a failure
    self._syncRepository(repname, url, rpath, revision, branch)
    return True
def syncLaymanRepository(self, repname):
    """
    Update the repository ``repname`` by running ``layman -s``.

    For a git repository the base name of its path is passed to layman and
    progress is reported; otherwise ``repname`` itself is passed.

    Returns True. Raises UpdateError when layman is not found or the
    command fails.
    """
    layman_bin = getProgPath('/usr/bin/layman')
    if not layman_bin:
        raise UpdateError(_("Layman utility is not found"))
    rpath = self.clVars.Select('cl_update_other_rep_path',
                               where='cl_update_other_rep_name',
                               eq=repname, limit=1)
    overlay = path.basename(rpath)
    if not Git.is_git(rpath):
        proc = process(layman_bin, "-s", repname, stderr=STDOUT)
    else:
        self.addProgress()
        proc = PercentProgress(layman_bin, "-s", overlay, part=1,
                               atty=True)
        for percent in proc.progress():
            self.setProgress(percent)
    if proc.failed():
        raise UpdateError(
            _("Failed to update repository {rname}").format(rname=repname),
            addon=proc.read())
    return True
def regenCache(self, repname):
    """
    Update the metadata cache of the repository with ``egencache``.

    The number of jobs is taken from the ``hr_cpu_num`` variable.

    Returns True. Raises UpdateError when egencache is not found or fails.
    """
    egencache = getProgPath('/usr/bin/egencache')
    if not egencache:
        raise UpdateError(_("Portage utility is not found"))
    jobs = self.clVars.Get('hr_cpu_num')
    arguments = ["--repo=%s" % repname, "--update", "--jobs=%s" % jobs]
    proc = process(egencache, *arguments, stderr=STDOUT)
    if not proc.failed():
        return True
    raise UpdateError(_("Failed to update cache of {rname} "
                        "repository").format(rname=repname),
                      addon=proc.read())
def emergeMetadata(self):
    """
    Run ``emerge --metadata`` and report its progress.

    Returns True. Raises UpdateError when emerge is not found or fails.
    """
    emerge_bin = getProgPath("/usr/bin/emerge")
    if not emerge_bin:
        raise UpdateError(_("Emerge utility is not found"))
    self.addProgress()
    runner = PercentProgress(emerge_bin, "--metadata", part=1, atty=True)
    for percent in runner.progress():
        self.setProgress(percent)
    if not runner.failed():
        return True
    raise UpdateError(_("Failed to update metadata"), addon=runner.read())
def eixUpdate(self):
    """
    Run ``eix-update`` for the repositories.

    When ``cl_update_eixupdate_force`` is "force" nothing is excluded.
    Otherwise main repositories missing from ``cl_update_sync_rep`` are
    excluded with ``-x``, and the other repositories are excluded too
    unless ``cl_update_other_set`` is "on".

    Returns True. Raises UpdateError when eix-update is not found or fails.
    """
    eix_bin = getProgPath("/usr/bin/eix-update")
    if not eix_bin:
        raise UpdateError(_("Eix utility is not found"))
    self.addProgress()
    get = self.clVars.Get
    # name substitution: "portage" is passed to eix-update as "gentoo"
    eix_names = {'portage': 'gentoo'}
    excluded = []
    if get('cl_update_eixupdate_force') != 'force':
        for rep in get('cl_update_rep_name'):
            if rep not in get('cl_update_sync_rep'):
                excluded += ["-x", eix_names.get(rep, rep)]
        rep_count = len(get('cl_update_sync_rep'))
    else:
        rep_count = len(get('cl_update_rep_name'))
    if (get('cl_update_other_set') == 'on' or
            get('cl_update_eixupdate_force') == 'force'):
        rep_count += len(get('update.cl_update_other_rep_name'))
    else:
        for rep in get('update.cl_update_other_rep_name'):
            excluded += ['-x', rep]
    runner = PercentProgress(eix_bin, "-F", *excluded,
                             part=rep_count or 1, atty=True)
    for percent in runner.progress():
        self.setProgress(percent)
    if runner.failed():
        raise UpdateError(_("Failed to update eix cache"),
                          addon=runner.read())
    return True
def is_binary_pkg(self, pkg, binary=None):
    """
    Tell whether the package is a binary one.

    A true ``binary`` argument or a ``PN`` ending with "-bin" always gives
    True. Otherwise a ``binary`` argument that is not None is returned as
    is, and only when it is None the package's own "binary" entry decides.
    """
    if binary or ('PN' in pkg and pkg['PN'].endswith('-bin')):
        return True
    if binary is not None:
        return binary
    return bool("binary" in pkg and pkg['binary'])
def _printEmergePackage(self, pkg, binary=False, num=1, max_num=1):
    """
    Start the task message for a package being emerged.

    The current task is ended first. When ``max_num`` is above one a
    "(num of max_num)" counter is added; the package name is colored
    purple for binary packages and green otherwise.
    """
    self.endTask()
    painter = self.color_print
    counter = ""
    if max_num > 1:
        counter = " (%s of %s)" % tuple(
            painter.foreground(Colors.YELLOW).bold("{0}", value)
            for value in (num, max_num))
    color = (Colors.PURPLE if self.is_binary_pkg(pkg, binary)
             else Colors.GREEN)
    title = painter.foreground(color)(str(pkg))
    self.startTask(_("Emerging%s %s") % (counter, title))
def _printInstallPackage(self, pkg, binary=False):
    """
    Start the task message for a package being installed.

    The current task is ended first; the package name is colored purple
    for binary packages and green otherwise.
    """
    self.endTask()
    color = (Colors.PURPLE if self.is_binary_pkg(pkg, binary)
             else Colors.GREEN)
    title = self.color_print.foreground(color)(str(pkg))
    self.startTask(_("Installing %s") % title)
def _printUninstallPackage(self, pkg, num=1, max_num=1):
    """
    Start the task message for a package being unmerged.

    The current task is ended first. When ``max_num`` is above one a
    "(num of max_num)" counter is added; the package name is printed in
    bold red.
    """
    self.endTask()
    painter = self.color_print
    counter = ""
    if max_num > 1:
        counter = " (%s of %s)" % tuple(
            painter.foreground(Colors.YELLOW).bold("{0}", value)
            for value in (num, max_num))
    title = painter.foreground(Colors.RED).bold(str(pkg))
    self.startTask(_("Unmerging%s %s") % (counter, title))
def emergelike(self, cmd, *params):
    """
    Run a command whose output is handled like the output of emerge.

    Returns True. Raises UpdateError when ``cmd`` cannot be found.
    """
    executable = getProgPath(cmd)
    if not executable:
        raise UpdateError(_("Failed to find %s command") % cmd)
    executor = emerge_parser.CommandExecutor(executable, params)
    with EmergeParser(executor) as emerge:
        self._startEmerging(emerge)
    return True
def revdep_rebuild(self, cmd, *params):
    """
    Run revdep-rebuild and report its progress.

    Returns True. Raises UpdateError when ``cmd`` cannot be found.
    """
    executable = getProgPath(cmd)
    if not executable:
        raise UpdateError(_("Failed to find %s command") % cmd)
    executor = emerge_parser.CommandExecutor(executable, params)
    with EmergeParser(executor) as emerge:
        revdep = RevdepPercentBlock(emerge)
        self.addProgress()
        revdep.add_observer(self.setProgress)

        def on_revdep(x):
            # the current task is always ended; a new one is started only
            # when "Assign" is found in the revdep block
            ended = self.endTask()
            started = (
                self.startTask(_("Assigning files to packages"))
                if "Assign" in revdep else None)
            return ended, started

        revdep.action = on_revdep
        self._startEmerging(emerge)
    return True
def _display_pretty_package_list(self, pkglist, remove_list=False):
    """
    Print the package list in a human readable form.

    Every package gets extra information from
    ``PackageInformation.add_info`` and the list is sorted by
    "CATEGORY/PN". Each row shows the capitalized description, the colored
    "CATEGORY/PN-PVR" name and the size when it is set and is not "0 kB".

    ``remove_list`` selects the removal color for every row; otherwise
    binary packages and ebuilds get their own colors.
    """
    _print = self.color_print
    ebuild_color = TextState.Colors.GREEN
    binary_color = TextState.Colors.PURPLE
    remove_color = TextState.Colors.LIGHT_RED
    # the bullet is identical for every row, so it is built only once
    mult = _print.bold("*")
    packages = sorted((PackageInformation.add_info(x) for x in pkglist),
                      key=lambda y: y['CATEGORY/PN'])
    for pkg in packages:
        if remove_list:
            pkgcolor = _print.foreground(remove_color)
        elif self.is_binary_pkg(pkg):
            pkgcolor = _print.foreground(binary_color)
        else:
            pkgcolor = _print.foreground(ebuild_color)
        fullname = _(pkg.info['DESCRIPTION']).capitalize()
        shortname = pkgcolor("%s-%s" % (pkg["CATEGORY/PN"], pkg["PVR"]))
        if "SIZE" in pkg and pkg['SIZE'] and pkg["SIZE"] != "0 kB":
            size = " (%s)" % pkg["SIZE"]
        else:
            size = ""
        self.printDefault(" {mult} {fullname} {shortname}{size}".format(
            mult=mult, fullname=fullname, shortname=shortname, size=size))
def _display_install_package(self, emerge):
    """
    Print the packages that are going to be installed.

    With ``cl_verbose_set`` equal to "on" the package list is printed as
    is; otherwise the pretty lists of installed and removed packages are
    shown. The download size is reported when it is not "0 kB".
    """
    painter = self.color_print
    size_color = TextState.Colors.WHITE
    if self.clVars.Get('cl_verbose_set') == 'on':
        # detailed package list
        self.printPre(str(emerge.install_packages))
    else:
        to_install = emerge.install_packages.list
        self.printSUCCESS(painter(_("List packages for installation")))
        self._display_pretty_package_list(to_install)
        if emerge.install_packages.remove_list:
            self.printSUCCESS(painter(_("List removal packages")))
            self._display_pretty_package_list(
                emerge.install_packages.remove_list, remove_list=True)
    if str(emerge.download_size) != "0 kB":
        colored_size = painter.foreground(size_color)(
            str(emerge.download_size))
        self.printSUCCESS(
            _("{size} will be downloaded").format(size=colored_size))
def _display_remove_list(self, emerge):
    """
    Print the packages that are going to be removed.

    With ``cl_verbose_set`` equal to "on" the package list is printed as
    is; otherwise a bold header and the pretty removal list are shown.
    """
    if self.clVars.Get('cl_verbose_set') == 'on':
        # detailed package list
        self.printPre(str(emerge.uninstall_packages))
        return
    painter = self.color_print
    to_remove = emerge.uninstall_packages.list
    self.printSUCCESS(painter.bold(_("List removal packages")))
    self._display_pretty_package_list(to_remove, remove_list=True)
def getCacheOnWorld(self, params, packages, check=False):
    """
    Replace an "@world" request with the cached list of packages.

    Without "@world" in ``packages`` the arguments are returned untouched.
    With ``check`` set and a non-empty premerge log the cache is dropped
    and the arguments are returned untouched as well. When a cached list
    exists, "-1O" and the cached packages (as "=name" atoms, without the
    ones already listed in the premerge log) are returned.
    """
    if "@world" not in packages:
        return params, packages
    from calculate.update.utils.cl_update import ClUpdateAction
    premerge_log = EmergeLog(
        EmergeLogNamedTask(ClUpdateAction.log_names['premerge']))
    if check and (premerge_log.list or premerge_log.remove_list):
        self.emerge_cache.drop_cache()
        return params, packages
    already_installed = premerge_log.list
    cached = self.emerge_cache.get_cached_package_list()
    if cached is None:
        return params, packages
    return "-1O", ["=%s" % pkg for pkg in cached
                   if str(pkg) not in already_installed]
def updateCache(self, pkg_list):
    """
    Store ``pkg_list`` in the emerge cache and mark the end of the
    premerge task through ``EmergeLog``.
    """
    self.emerge_cache.set_cache(pkg_list)
    from calculate.update.utils.cl_update import ClUpdateAction
    elog = EmergeLog(
        EmergeLogNamedTask(ClUpdateAction.log_names['premerge']))
    # the original line ended with a stray comma, which only wrapped the
    # call result into a throwaway tuple
    elog.mark_end_task()
def premerge(self, param, *packages):
    """
    Show what is going to be updated and ask for confirmation.

    Runs emerge with ``param`` and "-pv" for ``packages`` (after
    ``getCacheOnWorld`` had a chance to substitute the cached list).

    Returns True when there is nothing to install or when
    ``cl_update_pretend_set`` is "on", and "yes" when the question was
    asked and not answered with "no".

    Raises KeyboardInterrupt when the answer is "no"; EmergeError is
    re-raised after the cache is dropped and the error is printed.
    """
    param, packages = self.getCacheOnWorld(param, packages, check=True)
    param = [param, "-pv"]
    if not packages:
        self.printSUCCESS(_("Installed packages are up to date"))
        return True
    with EmergeParser(EmergeCommand(list(packages),
                                    extra_params=param)) as emerge:
        try:
            emerge.run()
            if "@world" in packages:
                # a run that also removes packages is not cached
                if emerge.install_packages.remove_list:
                    self.emerge_cache.drop_cache()
                else:
                    self.updateCache(emerge.install_packages.list)
            if not emerge.install_packages.list:
                self.printSUCCESS(_("The system is up to date"))
                return True
            self._display_install_package(emerge)
        except EmergeError:
            self.emerge_cache.drop_cache()
            self.printPre(self._emerge_translate(emerge.prepare_error))
            raise
    if self.clVars.Get('cl_update_pretend_set') == 'on':
        return True
    answer = self.askConfirm(
        _("Would you like to merge these packages?"), "yes")
    if answer == "no":
        raise KeyboardInterrupt
    # the unreachable "return True" that followed this line was removed
    return "yes"
def _emerge_translate(self, s):
    """
    Translate emerge text with the 'cl_emerge' regexp localization
    """
    localizer = RegexpLocalization('cl_emerge')
    return localizer.translate(str(s))
def setUpToDateCache(self):
    """
    Set the cache to "no packages to update" (an empty package list)
    """
    nothing_to_update = PackageList([])
    self.updateCache(nothing_to_update)
    return True
def _startEmerging(self, emerge):
    """
    Configure the emerge parser and run it.

    "yes" is sent to the command, the emerging/installing/uninstalling
    observers are attached and the command is run. On EmergeError the cache
    is dropped, the emerging error log (or, without it, the prepare error)
    is printed and the exception is re-raised.
    """
    emerge.command.send("yes\n")
    handlers = ((emerge.emerging, self._printEmergePackage),
                (emerge.installing, self._printInstallPackage),
                (emerge.uninstalling, self._printUninstallPackage))
    for block, handler in handlers:
        block.add_observer(handler)
    try:
        emerge.run()
    except EmergeError:
        self.emerge_cache.drop_cache()
        failure = emerge.emerging_error
        report = failure.log if failure else emerge.prepare_error
        self.printPre(self._emerge_translate(report))
        raise
def emerge(self, param, *packages):
    """
    Build the packages.

    Without ``packages`` the ``param`` itself is used as the only package.
    Otherwise ``getCacheOnWorld`` may replace the parameters and the
    package list; an empty list ends the call at once.

    Returns True. EmergeError from the first run is re-raised after the
    cache is dropped and the prepare error is printed.
    """
    if packages:
        param, packages = self.getCacheOnWorld(param, packages)
        if not packages:
            return True
        extra_params = [param]
    else:
        packages = [param]
        extra_params = None

    def refuse(x):
        # answer for the emerge question during the first run
        return False

    command = EmergeCommand(list(packages), extra_params=extra_params)
    with EmergeParser(command) as emerge:
        try:
            emerge.question.action = refuse
            emerge.run()
            if not emerge.install_packages.list:
                return True
        except EmergeError:
            self.emerge_cache.drop_cache()
            self.printPre(self._emerge_translate(emerge.prepare_error))
            raise
        self._startEmerging(emerge)
    return True
def depclean(self):
    """
    Clean the system from unneeded packages with ``emerge --depclean``.

    Returns True when nothing has to be removed or the removal was run,
    False when the confirmation was answered with "no". EmergeError from
    the first run is re-raised after the prepare error is printed.
    """
    def refuse(x):
        # answer for the emerge question during the first run
        return False

    with EmergeParser(EmergeCommand(["--depclean"])) as emerge:
        try:
            emerge.question.action = refuse
            emerge.run()
            if not emerge.uninstall_packages.list:
                return True
            self._display_remove_list(emerge)
        except EmergeError:
            self.printPre(self._emerge_translate(emerge.prepare_error))
            raise
        answer = self.askConfirm(
            _("Would you like to unmerge these packages?"))
        if answer == 'no':
            return False
        self._startEmerging(emerge)
    return True
def update_task(self, task_name):
    """
    Decorator factory adding begin/end marks of a task to the emerge log.

    The returned decorator wraps a callable so that the begin mark for
    ``task_name`` is written before the call and the end mark only when the
    call returns a true value. The result of the call is passed through.
    """
    # local import keeps the change self-contained in this block
    from functools import wraps

    def decor(f):
        # wraps() keeps the name and docstring of the decorated callable
        @wraps(f)
        def wrapper(*args, **kwargs):
            logger = EmergeLog(EmergeLogNamedTask(task_name))
            logger.mark_begin_task()
            ret = f(*args, **kwargs)
            if ret:
                logger.mark_end_task()
            return ret
        return wrapper
    return decor