You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-update/update/update.py

517 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import path
from calculate.lib.utils.tools import AddonError
from calculate.lib.utils.colortext.palette import TextState
from calculate.lib.utils.colortext import get_color_print
import pexpect
from package_tools import Git, Layman,\
EmergeLogNamedTask, EmergeLog, GitError, \
PackageInformation
Colors = TextState.Colors
from calculate.lib.utils.files import (getProgPath, STDOUT, removeDir,
PercentProgress, process)
from calculate.lib.cl_lang import (setLocalTranslate, getLazyLocalTranslate,
RegexpLocalization, _)
import emerge_parser
from emerge_parser import EmergeParser, EmergeCommand, EmergeError, EmergeCache
setLocalTranslate('cl_update3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class UpdateError(AddonError):
"""Update Error"""
class Update:
"""Основной объект для выполнения действий связанных с обновлением системы
"""
def init(self):
commandLog = path.join(self.clVars.Get('core.cl_log_path'),
'lastcommand.log')
emerge_parser.CommandExecutor.logfile = commandLog
self.color_print = get_color_print()
self.emerge_cache = EmergeCache()
self.emerge_cache.check_list = (
self.emerge_cache.check_list +
map(emerge_parser.GitCheckvalue,
self.clVars.Get('update.cl_update_rep_path')))
def _syncRepository(self, name, url, rpath, revision, branch,
cb_progress=None):
"""
Синхронизировать репозитори
"""
dv = self.clVars
git = Git()
needMeta = False
if not git.checkExistsRep(rpath):
if revision == "last":
git.cloneRepository(url, rpath, branch,
cb_progress=cb_progress)
else:
git.cloneRevRepository(url, rpath, branch, revision,
cb_progress=cb_progress)
needMeta = True
else:
# если нужно обновиться до конкретной ревизии
if revision != "last":
if revision == git.getCurrentCommit(rpath):
if git.getBranch(rpath) == branch:
return False
# получить изменения из удаленного репозитория
git.fetchRepository(rpath, cb_progress=cb_progress)
# если текущая ветка не соответствует нужной
repInfo = git.getStatusInfo(rpath)
if repInfo['branch'] != branch:
# меняем ветку
needMeta = True
git.checkoutBranch(rpath, branch)
if revision == "last":
if git.resetRepository(rpath, to_origin=True):
needMeta = True
else:
git.resetRepository(rpath, to_rev=revision)
needMeta = True
if needMeta:
dv.Set('cl_update_outdate_set', 'on', force=True)
layman = Layman(dv.Get('cl_update_layman_installed'),
dv.Get('cl_update_layman_make'))
if name != "portage":
layman.add(name, url, rpath)
return True
def syncRepositories(self, repname, clean_on_error=True):
"""
Синхронизировать репозитории
"""
dv = self.clVars
url, rpath, revision, branch = (
dv.Select(["cl_update_rep_url", "cl_update_rep_path",
"cl_update_rep_rev", "cl_update_branch_name"],
where="cl_update_rep_name", eq=repname, limit=1))
if not url or not rpath:
raise UpdateError(_("Repositories variables is not configured"))
self.addProgress()
if clean_on_error:
try:
if not self._syncRepository(repname, url, rpath, revision, branch,
cb_progress=self.setProgress):
return "skip"
return True
except GitError as e:
if e.addon:
self.printWARNING(str(e.addon))
self.printWARNING(str(e))
self.printWARNING(_("Re-fetch {name} repository"
).format(name=repname))
try:
removeDir(rpath)
except OSError as e:
raise UpdateError(_("Permission denied to change "
"{repname} repository").format(
repname=repname))
self._syncRepository(repname, url, rpath, revision, branch)
return True
def syncLaymanRepository(self, repname):
"""
Обновить репозиторий через layman
"""
layman = getProgPath('/usr/bin/layman')
if not layman:
raise UpdateError(_("Layman utility is not found"))
rpath = self.clVars.Select('cl_update_other_rep_path',
where='cl_update_other_rep_name', eq=repname,
limit=1)
laymanname = path.basename(rpath)
if Git.is_git(rpath):
self.addProgress()
p = PercentProgress(layman, "-s", laymanname, part=1, atty=True)
for perc in p.progress():
self.setProgress(perc)
else:
p = process(layman, "-s", repname, stderr=STDOUT)
if p.failed():
raise UpdateError(
_("Failed to update repository {rname}").format(rname=repname),
addon=p.read())
return True
def regenCache(self, repname):
"""
Обновить кэш метаданных репозитория
"""
egenCache = getProgPath('/usr/bin/egencache')
if not egenCache:
raise UpdateError(_("Portage utility is not found"))
cpu_num = self.clVars.Get('hr_cpu_num')
p = process(egenCache, "--repo=%s" % repname, "--update",
"--jobs=%s" % cpu_num, stderr=STDOUT)
if p.failed():
raise UpdateError(_("Failed to update cache of {rname} "
"repository").format(rname=repname),
addon=p.read())
return True
def emergeMetadata(self):
"""
Выполнить egencache и emerge --metadata
"""
emerge = getProgPath("/usr/bin/emerge")
if not emerge:
raise UpdateError(_("Emerge utility is not found"))
self.addProgress()
p = PercentProgress(emerge, "--metadata", part=1, atty=True)
for perc in p.progress():
self.setProgress(perc)
if p.failed():
raise UpdateError(_("Failed to update metadata"), addon=p.read())
return True
def eixUpdate(self):
"""
Выполенине eix-update для репозиторием
eix-update выполнятется только для тех репозиториев, которые
обновлялись, если cl_update_eixsync_force==auto, либо
все, если cl_update_eixupdate_force==force
"""
eixupdate = getProgPath("/usr/bin/eix-update")
if not eixupdate:
raise UpdateError(_("Eix utility is not found"))
self.addProgress()
excludeList = []
if self.clVars.Get('cl_update_eixupdate_force') == 'force':
countRep = len(self.clVars.Get('cl_update_rep_name'))
else:
for rep in self.clVars.Get('cl_update_rep_name'):
# подстановка имен
mapNames = {'portage': 'gentoo'}
if not rep in self.clVars.Get('cl_update_sync_rep'):
excludeList.extend(["-x", mapNames.get(rep, rep)])
countRep = len(self.clVars.Get('cl_update_sync_rep'))
if (self.clVars.Get('cl_update_other_set') == 'on' or
self.clVars.Get('cl_update_eixupdate_force') == 'force'):
countRep += len(self.clVars.Get('update.cl_update_other_rep_name'))
else:
for rep in self.clVars.Get('update.cl_update_other_rep_name'):
excludeList.extend(['-x', rep])
p = PercentProgress(eixupdate, "-F", *excludeList, part=countRep or 1,
atty=True)
for perc in p.progress():
self.setProgress(perc)
if p.failed():
raise UpdateError(_("Failed to update eix cache"), addon=p.read())
return True
def _printEmergePackage(self, pkg, binary=False, num=1, max_num=1):
self.endTask()
_print = self.color_print
if max_num > 1:
one = _print.foreground(Colors.YELLOW).bold("{0}", num)
two = _print.foreground(Colors.YELLOW).bold("{0}", max_num)
part = " (%s of %s)" % (one, two)
else:
part = ""
if binary:
_print = _print.foreground(Colors.PURPLE)
else:
_print = _print.foreground(Colors.GREEN)
self.startTask(_("Emerging%s %s") % (part, _print(str(pkg))))
def _printInstallPackage(self, pkg, binary=False):
self.endTask()
_print = self.color_print
if binary:
_print = _print.foreground(Colors.PURPLE)
else:
_print = _print.foreground(Colors.GREEN)
self.startTask(_("Installing %s") %
_print(str(pkg)))
def _printUninstallPackage(self, pkg, num=1, max_num=1):
self.endTask()
_print = self.color_print
if max_num > 1:
one = _print.foreground(Colors.YELLOW).bold("{0}", num)
two = _print.foreground(Colors.YELLOW).bold("{0}", max_num)
part = " (%s of %s)" % (one, two)
else:
part = ""
_print = _print.foreground(Colors.RED)
self.startTask(_("Unmerging%s %s") % (part, _print.bold(str(pkg))))
def emergelike(self, cmd, *params):
cmd_path = getProgPath(cmd)
if not cmd_path:
raise UpdateError(_("Failed to find %s command") % cmd)
with EmergeParser(
emerge_parser.CommandExecutor(cmd_path, params)) as emerge:
emerge.emerging.add_observer(self._printEmergePackage)
emerge.installing.add_observer(self._printInstallPackage)
emerge.uninstalling.add_observer(self._printUninstallPackage)
try:
emerge.run()
except EmergeError:
self.printPre(self._emerge_translate(emerge.emerging_error.log))
raise
return True
def _display_pretty_package_list(self, pkglist, remove_list=False):
"""
Отобразить список пакетов в "удобночитаемом" виде
"""
_print = self.color_print
ebuild_color = TextState.Colors.GREEN
binary_color = TextState.Colors.PURPLE
remove_color = TextState.Colors.LIGHT_RED
for pkg in sorted([PackageInformation.add_info(x) for x in
pkglist],
key=lambda y: y['CATEGORY/PN']):
if remove_list:
pkgcolor = _print.foreground(remove_color)
else:
if pkg['binary']:
pkgcolor = _print.foreground(binary_color)
else:
pkgcolor = _print.foreground(ebuild_color)
fullname = _(pkg.info['DESCRIPTION']).capitalize()
shortname = pkgcolor("%s-%s" % (pkg["CATEGORY/PN"], pkg["PVR"]))
if "SIZE" in pkg and pkg['SIZE'] and pkg["SIZE"] != "0 kB":
size = " (%s)" % pkg["SIZE"]
else:
size = ""
self.printDefault(" - {fullname} {shortname}{size}".format(
fullname=fullname, shortname=shortname, size=size))
def _display_install_package(self, emerge):
"""
Отобразить список устанавливаемых пакетов
"""
# подробный список пакетов
if self.clVars.Get('cl_verbose_set') == 'on':
self.printPre(str(emerge.install_packages))
else:
_print = self.color_print
pkglist = emerge.install_packages.list
self.printSUCCESS(_print.bold(
_("List packages for installation")))
self._display_pretty_package_list(pkglist)
# TODO: список удаляемых пакетов во время установки
if str(emerge.download_size) != "0 kB":
self.printSUCCESS(_print.bold(
_("{size} will be downloaded").format(
size=emerge.download_size)))
def _display_remove_list(self, emerge):
"""
Отобразить список удаляемых пакетов
"""
# подробный список пакетов
if self.clVars.Get('cl_verbose_set') == 'on':
self.printPre(str(emerge.uninstall_packages))
else:
_print = self.color_print
pkglist = emerge.uninstall_packages.list
self.printSUCCESS(_print.bold(
_("List removal packages")))
self._display_pretty_package_list(pkglist, remove_list=True)
def getCacheOnWorld(self, params, packages, check=False):
if "@world" in packages:
from calculate.update.utils.cl_update import ClUpdateAction
elog = EmergeLog(
EmergeLogNamedTask(ClUpdateAction.log_names['premerge']))
if check and (elog.list or elog.remove_list):
self.emerge_cache.drop_cache()
return params, packages
installed_pkgs = elog.list
new_packages = self.emerge_cache.get_cached_package_list()
if new_packages is not None:
return "-1O", ["=%s" % x for x in new_packages
if not str(x) in installed_pkgs]
return params, packages
def updateCache(self, pkg_list):
self.emerge_cache.set_cache(pkg_list)
from calculate.update.utils.cl_update import ClUpdateAction
elog = EmergeLog(
EmergeLogNamedTask(ClUpdateAction.log_names['premerge']))
elog.mark_end_task()
def premerge(self, param, *packages):
"""
Вывести информацию об обновлении
"""
class MockEmergeCommand(EmergeCommand):
"""
Заглушка, для выполнения команд
"""
def __init__(self, packages, *args, **kwargs):
EmergeCommand.__init__(self, packages, *args, **kwargs)
def execute(self):
if self.child is None:
filename = '/tmp/mylog.log'
self.child = pexpect.spawn("/bin/cat",
[filename], maxread=20000, searchwindowsize=10000)
if not path.exists(filename):
raise EmergeError(_("File %s not found" % filename))
return self.child
param, packages = self.getCacheOnWorld(param, packages, check=True)
param = [param, "-pv"]
#print "PREMERGE",packages,param
if not packages:
self.printSUCCESS(_("The system is up to date"))
return True
with EmergeParser(EmergeCommand(list(packages),
extra_params=param)) as emerge:
try:
emerge.run()
if "@world" in packages:
if emerge.install_packages.remove_list:
self.emerge_cache.drop_cache()
else:
self.updateCache(emerge.install_packages.list)
if not emerge.install_packages.list:
self.printSUCCESS(_("The system is up to date"))
return True
self._display_install_package(emerge)
except EmergeError:
self.emerge_cache.drop_cache()
self.printPre(self._emerge_translate(emerge.prepare_error))
raise
return self.askConfirm(
_("Would you like to merge these packages?"), "yes")
return True
def _emerge_translate(self, s):
return RegexpLocalization('cl_emerge').translate(str(s))
def emerge(self, param, *packages):
"""
Выполнить сборку пакета
"""
#TODO: проверить ошибку при depclean
class MockEmergeCommand(EmergeCommand):
"""
Заглушка, для выполнения команд
"""
def __init__(self, packages, *args, **kwargs):
EmergeCommand.__init__(self, packages, *args, **kwargs)
def execute(self):
filename = '/tmp/ppp.log'
if self.child is None:
self.child = pexpect.spawn("/bin/cat",
#['/tmp/emerge.noupdate'])
[filename])
if not path.exists(filename):
raise EmergeError(_("File %s not found" % filename))
return self.child
param, packages = self.getCacheOnWorld(param, packages)
#print "EMERGE",packages,param
ask_emerge = self.clVars.Get('cl_update_precheck_set') == 'off'
with EmergeParser(EmergeCommand(list(packages),
extra_params=[param])) as emerge:
try:
emerge.question.action = lambda x: False
emerge.run()
if not emerge.install_packages.list:
#self.printSUCCESS(_("Nothing to merge"))
return True
#if ask_emerge:
# self.printPre(str(emerge.install_packages))
except EmergeError:
self.printPre(self._emerge_translate(emerge.prepare_error))
raise
#if (ask_emerge and self.askConfirm(
# _("Would you like to merge these packages?")) == 'no'):
# raise KeyboardInterrupt
emerge.command.send("yes\n")
emerge.emerging.add_observer(self._printEmergePackage)
emerge.installing.add_observer(self._printInstallPackage)
emerge.uninstalling.add_observer(self._printUninstallPackage)
try:
emerge.run()
except EmergeError as e:
self.printPre(self._emerge_translate(emerge.emerging_error.log))
raise
return True
def depclean(self):
"""
Выполнить очистку системы от лишних пакетов
"""
with EmergeParser(EmergeCommand(["--depclean"])) as emerge:
try:
emerge.question.action = lambda x: False
emerge.run()
if not emerge.uninstall_packages.list:
return True
self._display_remove_list(emerge)
except EmergeError:
self.printPre(self._emerge_translate(emerge.prepare_error))
raise
if (self.askConfirm(
_("Would you like to unmerge these packages?")) == 'no'):
return False
emerge.command.send("yes\n")
emerge.uninstalling.add_observer(self._printUninstallPackage)
try:
emerge.run()
except EmergeError:
self.printPre(self._emerge_translate(emerge.emerging_error.log))
raise
return True
def update_task(self, task_name):
"""
Декоратор для добавления меток запуска и останова задачи
"""
def decor(f):
def wrapper(self, *args, **kwargs):
logger = EmergeLog(EmergeLogNamedTask(task_name))
logger.mark_begin_task()
ret = f(self, *args, **kwargs)
if ret:
logger.mark_end_task()
return ret
return wrapper
return decor