You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-update/update/update.py

331 lines
14 KiB

#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import sys
import time
from os import path
from calculate.lib.utils.files import process,getProgPath,STDOUT,removeDir
class UpdateError(Exception):
"""Update Error"""
class CloneRepError(Exception):
"""Clone repository error"""
from calculate.lib.cl_lang import setLocalTranslate,getLazyLocalTranslate
setLocalTranslate('cl_update3',sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class Update:
"""Основной объект для выполнения действий связанных с обновлением системы
"""
def _checkExistsRep(self,rpath):
"""
Проверить путь на наличие репозитория
"""
if path.exists(rpath):
if not path.isdir(rpath):
raise UpdateError(
_("Repository {path} is not directory").format(
path=rpath))
if not path.isdir(self.__gitDir(rpath)):
raise UpdateError(
_("Repository {path} is not git").format(
path=rpath))
return True
return False
def __getGit(self):
"""
Получить утилиту git
"""
git = getProgPath("/usr/bin/git")
if not git:
raise UpdateError(_("Git utility is not found"))
return git
def __gitDir(self,rpath):
return path.join(rpath,".git")
def _cloneRepository(self, url, rpath, branch):
"""
Сделать локальную копию репозитория
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
"""
git = self.__getGit()
gitClone = process(git,"clone","-q","--no-single-branch",
"--depth=1","-b",branch,url,rpath,stderr=STDOUT)
if gitClone.failed():
error = gitClone.read()
if "Remote branch %s not found"%branch in error:
raise UpdateError(
_("Branch {branch} not found in {url} repository").format(
branch=branch,url=url))
self.printERROR(error)
raise UpdateError(_("Failed to clone {url} repository").format(
url=url))
return True
def _cloneRevRepository(self, url, rpath, branch,revision):
"""
Сделать локальную копию репозитория с указанной ревизией
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
revision: если указана - сделать ревизию текущей
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
for cmd in (["init",rpath],
["remote","add","origin",url],
["fetch","--depth=1"],
["checkout","-b",branch,revision],
["branch",branch,"-u","origin/%s"%branch]):
if cmd[0] == "init":
gitCmd = process(*[git]+cmd,stderr=STDOUT)
else:
gitCmd = process(*[git,"--git-dir",git_dir,
"--work-tree",rpath]+cmd,stderr=STDOUT)
if gitCmd.failed():
error = gitCmd.read()
if "reference is not a tree" in error:
raise UpdateError(
_("Commit {revision} not found in {url} repository"
).format(revision=revision,url=url))
elif "upstream branch '%s' does not exist"%("origin/%s"%branch):
raise UpdateError(
_("Branch {branch} not found in {url} repository"
).format(branch=branch,url=url))
else:
self.printERROR(error)
raise UpdateError(_("Failed to clone {url} repository").format(
url=url))
return True
def _pullRepository(self,rpath,quiet_error=False):
"""
Обновить репозиторий до последней версии
"""
git = self.__getGit()
gitPull = process(git,"--git-dir",self.__gitDir(rpath),
"pull","--ff-only",stderr=STDOUT)
if gitPull.failed():
if not quiet_error:
self.printERROR(gitPull.read())
raise UpdateError(
_("Failed to update repository in {rpath}").format(
rpath=rpath))
return False
return True
def _fetchRepository(self,rpath):
"""
Получить изменения из удаленно репозитория
"""
git = self.__getGit()
gitFetch = process(git,"--git-dir",self.__gitDir(rpath),
"fetch",stderr=STDOUT)
if gitFetch.failed():
self.printERROR(gitFetch.read())
raise UpdateError(
_("Failed to update repository in {rpath}").format(
rpath=rpath))
return True
def _checkChanges(self,rpath):
"""
Проверить наличие изменений пользователем файлов в репозитории
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
git_status = process(git,"--git-dir",git_dir,"--work-tree",rpath,
"status","--porcelain",stderr=STDOUT)
if git_status.success():
for i in git_status:
if i.strip():
return False
else:
return True
else:
raise UpdateError(
_("Wrong repository in {rpath} directory").format(
rpath=rpath))
def _parseStatusInfo(self,data):
"""
Разобрать информацию полученную через git status -b --porcelain
Returns:
Словарь
# есть ли измененные файлы пользователем
{'files':True/False,
# есть коммиты после текущего
'ahead':True/False,
# есть коммиты перед текущим (означает, что обновление
# с основной веткой не осуществляется через fast-forward
'behind':True/False,
# текущая ветка
'branch':'',
# оригинальная ветка
'origin':'origin/master'}
"""
reStatus = re.compile("^## (\w+)(?:\.\.\.(\S+)\s+\[(ahead \d+)?"
"(?:, )?(behind \d+)?\])?\n?(.*|$)",re.S)
match = reStatus.search(data)
if not match:
return {}
return {'files':True if match.group(5) else False,
'ahead':True if match.group(3) else False,
'behind':True if match.group(4) else False,
'origin':match.group(2) or "",
'branch':match.group(1)}
def _getCurrentCommit(self,rpath):
"""
Получить текущий коммит в репозитории
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
git_show = process(git,"--git-dir",git_dir,"show","--format=format:%H",
"--quiet",stderr=STDOUT)
if git_show.success():
return git_show.read().strip()
else:
raise UpdateError(
_("Failed to get status of repository in "
"{rpath} directory").format(
rpath=rpath))
def _getStatusInfo(self,rpath):
"""
Получить информацию об изменениях в репозитории
Returns:
Словарь выдаваемый функцией _parseStatusInfo
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
git_status = process(git,"--git-dir",git_dir,"--work-tree",rpath,
"status","-b","--porcelain",stderr=STDOUT)
if git_status.success():
retDict = self._parseStatusInfo(git_status.read())
if not retDict:
raise UpdateError(
_("Failed to get status of repository in "
"{rpath} directory").format(
rpath=rpath))
return retDict
else:
raise UpdateError(
_("Wrong repository in {rpath} directory").format(
rpath=rpath))
def _resetRepository(self, rpath, to_origin=False, to_rev=None, info=None):
"""
Удалить неиндексированные изменения в репозитории
Args:
to_origin: откатить все изменения до удаленного репозитория
to_rev: откатить все изменения до определенной ревизии
info: использовать уже полученную информация об изменения в репозитории
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
if to_origin and not info:
info = self._getStatusInfo(rpath)
commit = (info['origin'] if to_origin else to_rev) or "HEAD"
git_reset = process(git,"--git-dir",git_dir,"--work-tree",rpath,
"reset","--hard", commit, stderr=STDOUT)
git_clean = process(git,"--git-dir",git_dir,"--work-tree",rpath,
"clean","-fd",stderr=STDOUT)
if git_reset.failed() or git_clean.failed():
raise UpdateError(_("Failed to clean {rpath} repository").format(
rpath=rpath))
def _getBranch(self,rpath):
"""
Получить текущую ветку
"""
return self._getStatusInfo(rpath)['branch']
def _checkoutBranch(self,rpath,branch):
"""
Сменить ветку
"""
git = self.__getGit()
git_dir = self.__gitDir(rpath)
git_checkout = process(git,"--git-dir",git_dir,
"--work-tree", rpath,
"checkout","-f",branch, stderr=STDOUT)
if git_checkout.failed():
error = git_checkout.read()
if "pathspec '%s' did not match"%branch in error:
raise UpdateError(
_("Branch {branch} not found in {rpath} repository").format(
branch=branch,rpath=rpath))
self.printERROR(error)
raise UpdateError(
_("Failed to change branch to {branch} in "
"{rpath} repository").format(branch=branch,
rpath=rpath))
return True
def syncRepositories(self,repositories):
"""
Синхронизировать репозитории
"""
# TODO: удалять не git репозиторий
# TODO: прогресс скачивания
dv = self.clVars
for name, url, rpath, revision, branch in (reversed(
filter(lambda x:x[0] in repositories,
dv.Get('cl_update_rep_data')))):
if not self._checkExistsRep(rpath):
if revision == "last":
self._cloneRepository(url, rpath, branch)
else:
self._cloneRevRepository(url, rpath, branch, revision)
else:
# если нужно обновиться до конкретной ревизии
if revision != "last":
if revision == self._getCurrentCommit(rpath):
if self._getBranch(rpath) == branch:
continue
# получить изменения из удаленного репозитория
self._fetchRepository(rpath)
# если текущая ветка не соответствует нужной
repInfo = self._getStatusInfo(rpath)
if repInfo['branch'] != branch:
# меняем ветку
self._checkoutBranch(rpath,branch)
if revision == "last":
self._resetRepository(rpath, to_origin=True,
info=repInfo)
else:
self._resetRepository(rpath, to_rev=revision,
info=repInfo)
return True