You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/git.py

795 lines
33 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2015-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
import os
from os import path
from calculate.lib.configparser import ConfigParser
import shutil
from calculate.lib.utils.files import (getProgPath, STDOUT,
PercentProgress, process, readFile,
writeFile, listDirectory, removeDir)
from calculate.lib.utils.tools import AddonError
from calculate.lib.utils.ip import check_port
_ = lambda x: x
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3', sys.modules[__name__])
class GitError(AddonError):
"""Git Error"""
class Git:
"""
Объект для управление git репозиторием
"""
skip_files = ["metadata/md5-cache",
"metadata/cache"]
def __init__(self):
self._git = self.__getGit()
def checkExistsRep(self, rpath):
"""
Проверить путь на наличие репозитория
"""
if path.exists(rpath):
if not path.isdir(rpath):
raise GitError(
_("Repository {path} is not a directory").format(
path=rpath))
if not path.isdir(self._gitDir(rpath)):
raise GitError(
_("Repository {path} is not Git").format(
path=rpath))
return True
return False
def __getGit(self):
"""
Получить утилиту git
"""
git = getProgPath("/usr/bin/git")
if not git:
raise GitError(_("The Git tool is not found"))
return git
@staticmethod
def _gitDir(rpath):
return path.join(rpath, ".git")
@staticmethod
def is_git(gitpath):
return path.isdir(Git._gitDir(gitpath))
def get_url(self, rpath, remote_name):
config = ConfigParser(strict=False)
config.read(path.join(Git._gitDir(rpath), "config"), encoding="utf-8")
res = config.get('remote "%s"' % remote_name, "url",
raw=True, fallback="")
if res:
return res
fetch_info = self._getFetchHeadInfo(Git._gitDir(rpath))
if "url" in fetch_info:
return fetch_info['url']
return ""
def cloneRepository(self, url, rpath, branch, cb_progress=None):
"""
Сделать локальную копию репозитория
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
"""
if cb_progress:
gitClone = PercentProgress(self._git, "clone", "-q",
"--no-single-branch", "--progress",
"--verbose",
"--depth=1", "-b", branch, url, rpath,
part=4, stderr=STDOUT)
for perc in gitClone.progress():
cb_progress(perc)
else:
gitClone = process(self._git, "clone", "-q", "--no-single-branch",
"--depth=1", "-b", branch, url, rpath,
stderr=STDOUT)
if gitClone.failed():
error = gitClone.read()
if "Remote branch %s not found" % branch in error:
raise GitError(
_("Branch {branch} not found in repository {url}").format(
branch=branch, url=url))
raise GitError(_("Failed to clone repository {url}").format(
url=url), error)
return True
def _git_command(self, git_dir, rpath, error, cb_progress=None):
def command(*args, **kw):
if "init" == args[0]:
wholeCommand = [self._git, "init", rpath] + list(args[1:])
else:
wholeCommand = [self._git, "--git-dir", git_dir,
"--work-tree", rpath] + list(args)
if cb_progress and args[0] in ("reset", "fetch", "checkout"):
gitCmd = PercentProgress(
*wholeCommand,
stderr=STDOUT, atty=True, **kw)
for perc in gitCmd.progress():
cb_progress(perc)
else:
gitCmd = process(*wholeCommand, stderr=STDOUT)
if gitCmd.failed():
error.append(gitCmd.read())
return False
return True
return command
def _getFetchHeadInfo(self, git_dir):
data = readFile(path.join(git_dir, "FETCH_HEAD")).strip()
re_fetch = re.compile(r"^(\w+)\s+(?:(branch|tag) '([^']+)' of )?(\S+)$")
m = re_fetch.search(data)
if m:
d = {'commit': m.group(1),
'url': m.group(4)}
if m.group(2):
type_map = {'tag': self.Reference.Tag,
'branch': self.Reference.Branch}
d['type'] = type_map.get(m.group(2), self.Reference.Unknown)
d['reference'] = m.group(3)
return d
return {}
def _copy_file(self, git_dir, file_url, fn):
"""
Переместить index файл из одного репозитория в другой
:param git_dir:
:param file_url:
:return:
"""
old_index = path.join(file_url[7:], fn)
new_index = path.join(git_dir, fn)
if path.exists(old_index):
try:
with open(old_index, 'r') as in_f:
with writeFile(new_index) as out_f:
out_f.write(in_f.read())
return True
except IOError:
pass
return False
def cloneTagRepository(self, url, rpath, reference, cb_progress=None,
copy_index=False):
"""
Сделать локальную копию репозитория для переключения по тэгам
в отличии от обычного репозитория не содерит не remote, не ветки,
только срез ревизии и тэги
:param url: откуда качать репозиторий
:param rpath: куда сохранять репозиторий
:param reference: метка на которую нужно перейти
:param cb_progress: callback для прогрессбара
:return: True в случае успешного клонирования на указанную метку
:raise: GitError не удалось скачать, получить данные, метка не найдена
"""
git_dir = self._gitDir(rpath)
error = []
command = self._git_command(git_dir, rpath, error, cb_progress)
def command_fetch(**kw):
"""
Получить данные репозитория (если shallow.lock - удалит и повторить)
"""
res = command("fetch", "--depth=1", url, reference, **kw)
if not res:
if self._clear_lock(git_dir, "shallow.lock", error):
res = command("fetch", "--depth=1", url, reference, **kw)
if not res:
raise GitError(
_("Reference {mark} not found in repository {url}"
).format(mark=reference, url=url))
return res
# получить последние коммиты из удаленного репозитория
if command("init"):
if copy_index:
copy_index = self._copy_file(git_dir, url, "index")
if copy_index:
command_fetch(part=3)
fetch_type = self.saveFetchHead(git_dir)
if fetch_type == self.Reference.Tag:
# создать master ветку с ссылкой на HEAD
self.saveFetchHead(
git_dir, force_type=self.Reference.Branch,
force_ref=self.Reference.Master)
reference = self.Reference.Master
command("symbolic-ref", "HEAD", "refs/heads/%s" % reference)
return True
else:
command_fetch(part=4, end=False)
if command("reset", "--hard", "FETCH_HEAD", part=4,
startpart=3):
fetch_type = self.saveFetchHead(git_dir)
if fetch_type == self.Reference.Branch:
command("symbolic-ref", "HEAD",
"refs/heads/%s" % reference)
return True
raise GitError(_("Failed to clone repository {url}").format(
url=url), error[-1])
# progressParam = {'fetch': {'part': 4, 'end': False},
# 'checkout': {'part': 4, 'startpart': 3}}
def trimRepository(self, rpath, cb_progress=None):
"""
Удалить в репозитории лишнии объекты
"""
git_dir = self._gitDir(rpath)
head_info = self._getFetchHeadInfo(git_dir)
if not head_info or not head_info.get('reference', ''):
return False
reference = head_info['reference']
git_dir_old = "%s.old" % git_dir
try:
shutil.move(git_dir, git_dir_old)
except OSError:
raise GitError(_("Failed to optimize repository %s") % rpath)
file_url = "file://%s" % git_dir_old
self.cloneTagRepository(file_url,
rpath, reference, cb_progress,
copy_index=True)
self._copy_file(git_dir, file_url, "config")
self._copy_file(git_dir, file_url, "FETCH_HEAD")
try:
removeDir(git_dir_old)
except Exception:
raise GitError(_("Failed to optimize repository %s") % rpath)
def isTagRepository(self, rpath):
"""
:param rpath: путь до репозитория
:return: True репозиторий переключается по тэгам
"""
git_path = self._gitDir(rpath)
data = self._getFetchHeadInfo(git_path)
if "type" in data and data['type'] == self.Reference.Tag:
return True
return False
class Reference(object):
Branch = "branch"
Tag = "tag"
# RemoteBranch = "remote_branch"
Unknown = "unknown"
Master = "master"
Develop = "develop"
Update = "update"
def reference_type(self, rpath, reference):
"""
Получить тип ссылки на коммит
:param rpath: путь до локального репозитория
:param reference: имя ссылки
:return: Reference (Branch - ветка, Tag - тэг,
Unknown - всё остальное)
"""
refs_dir = path.join(rpath, ".git/refs")
if reference in listDirectory(path.join(refs_dir, 'tags')):
return self.Reference.Tag
elif reference in listDirectory(path.join(refs_dir, 'heads')):
return self.Reference.Branch
return self.Reference.Unknown
def _clear_lock(self, git_dir, lock_fn, error):
if lock_fn in error[-1]:
fn = path.join(git_dir, lock_fn)
if path.exists(fn):
try:
os.unlink(fn)
return True
except OSError:
pass
return False
def updateTagRepository(self, url, rpath, reference, cb_progress=None,
clean=False):
"""
Обновить данные до указанного тега репозитория
:param url: откуда качать репозиторий
:param rpath: куда сохранять репозиторий
:param reference: метка на которую нужно перейти
:param cb_progress: callback для прогрессбара
:return: True в случае успешного клонирования на указанную метку
:raise: GitError не удалось скачать, получить данные, метка не найдена
"""
git_dir = self._gitDir(rpath)
error = []
command = self._git_command(git_dir, rpath, error, cb_progress)
# получить последние коммиты из удаленного репозитория
try:
if (self.reference_type(rpath, reference) == Git.Reference.Tag and
self.isTagRepository(rpath) and
self.getCurrentCommit(rpath) == self.getCommit(
rpath, reference)):
if clean:
status = self.getStatusInfo(rpath)
if status and not status["files"]:
return False
else:
return False
except GitError:
pass
res = command("fetch", "--depth=1", url, reference, part=4, end=False)
if not res:
if self._clear_lock(git_dir, "shallow.lock", error):
res = command("fetch", "--depth=1",
url, reference, part=4, end=False)
if not res:
raise GitError(
_("Reference {mark} not found in repository {url}"
).format(mark=reference, url=url))
res = command("reset", "--hard", "FETCH_HEAD", part=4, startpart=3)
if not res:
if self._clear_lock(git_dir, "index.lock", error):
res = command("reset", "--hard", "FETCH_HEAD",
part=4, startpart=3)
if res:
if clean:
command("clean", "-df")
if self.saveFetchHead(git_dir) == self.Reference.Branch:
command("symbolic-ref", "HEAD", "refs/heads/%s" % reference)
return True
raise GitError(_("Failed to clone repository {url}").format(
url=url), error[-1])
# TODO: метод удаление лишних объектов, оставляя только текущий коммит
def saveFetchHead(self, git_dir, force_type=None, force_ref=None):
"""
Сохранить FEACH_HEAD как объект (tag или branch)
:param git_dir: каталог .git репозитория
:return: Reference
"""
fetch_info = self._getFetchHeadInfo(git_dir)
if force_type is not None:
fetch_info['type'] = force_type
if force_ref is not None:
fetch_info['reference'] = force_ref
tag_file = None
if fetch_info.get('type', '') == self.Reference.Tag:
tag_file = path.join(git_dir, "refs/tags/%s"
% fetch_info['reference'])
elif fetch_info.get('type', '') == self.Reference.Branch:
tag_file = path.join(git_dir, "refs/heads/%s"
% fetch_info['reference'])
if tag_file:
with writeFile(tag_file) as f:
f.write(fetch_info['commit'])
return fetch_info['type']
return self.Reference.Unknown
class GitProtocol(object):
SSH = 22
Git = 9418
HTTP = 80
HTTPS = 443
FTP = 21
FTPS = 990
Rsync = 873
Unknown = 0
def _parse_url(self, url):
"""
Разобрать url на составляющие: тип и хост
:param url: разбираемый URL
:return: (тип, хост)
"""
net_protocols = {
'ssh': self.GitProtocol.SSH,
'ssh+git': self.GitProtocol.SSH,
'git+ssh': self.GitProtocol.SSH,
'http': self.GitProtocol.HTTP,
'https': self.GitProtocol.HTTPS,
'ftp': self.GitProtocol.FTP,
'ftps': self.GitProtocol.FTPS,
'rsync': self.GitProtocol.Rsync,
'git': self.GitProtocol.Git}
re_host = re.compile(
"(%s)://(?:[^@]+@)?([^:/]+)" %
"|".join(x.replace("+", r"\+") for x in net_protocols.keys())
)
net_match = re_host.search(url)
if net_match:
return net_protocols[net_match.group(1)], net_match.group(2)
re_scp = re.compile("(?:[^@]+@)?([^:]+):(//)?")
scp_match = re_scp.search(url)
if scp_match and scp_match.group(2) is None:
return self.GitProtocol.SSH, scp_match.group(1)
return self.GitProtocol.Unknown, ""
def checkUrl(self, url):
"""
Проверить доступность указанного URL
ssh://[user@]host.xz[:port]/path/to/repo.git/
git://host.xz[:port]/path/to/repo.git/
http[s]://host.xz[:port]/path/to/repo.git/
ftp[s]://host.xz[:port]/path/to/repo.git/
rsync://host.xz/path/to/repo.git/
[user@]host.xz:path/to/repo.git/
/path/to/repo.git/
file:///path/to/repo.git/
:param url: проверяемый URL
:return:
"""
git_port, target = self._parse_url(url)
if git_port != self.GitProtocol.Unknown:
return check_port(target, git_port)
return True
def isNeedUnpack(self, rpath, fns=(".gitignore",
"metadata/.gitignore")):
"""
Проверить репозиторий на то, что он еще не распакован
:param rpath: путь до репозитория
:param fns: имя файла
:return:
"""
for fn in fns:
if path.exists(path.join(rpath, fn)):
return False
else:
git_dir = self._gitDir(rpath)
git_log = process(self._git, "--git-dir", git_dir, "log",
"-n1", "--format=format:%H",
"--quiet", "--", *fns, stderr=STDOUT)
if git_log.success() and git_log.read().strip():
return True
status = self.getStatusInfo(rpath)
if not status or status['files']:
return True
return False
def cloneRevRepository(self, url, rpath, branch, revision,
cb_progress=None):
"""
Сделать локальную копию репозитория с указанной ревизией
Args:
url: откуда качать репозиторий
rpath: куда сохранять репозиторий
branch: ветка на которую необходимо переключиться
revision: если указана - сделать ревизию текущей
Return:
Возвращает True если клонирование произведено с установкой на
указанную ревизию. False если клонирование произведено с
установкой на последнюю ревизию.
Raises:
GitError: Выполнение ключевых команд выполнено с ошибками (не
удалось скачать и получить данные из удаленного репозитория)
"""
git_dir = self._gitDir(rpath)
error = []
def command(cmd, startpart=0):
"""
Выполнить одну из команд необходимой для клонирования репозитория
"""
commands = { # инициализация пустого репозитория
'init': ["init", rpath],
# подключить указанный удаленный как origin
'add_remote': ["remote", "add", "origin", url],
# скачать последние коммиты веток
'fetchshallow': ["fetch", "--depth=1"],
# проверить есть указанный коммит
'has_revision': ["log", "-n1", revision],
# проверить есть указанный коммит
'has_branch': ["log", "-n1",
"remotes/origin/%s" % branch],
# получить ревизию из revs тэгов
'get_rev_tag': ["fetch", "--depth=1", "origin",
"+refs/tags/%s:refs/remotes/origin/%s" %
(revision, branch)],
# переключиться на указанную ревизию указанной веткой
'checkout_revision': ["checkout", "-b", branch,
revision],
# переключить на указанную ветку
'checkout': ["checkout", branch],
# установить upstream для локальной ветки
'set_track': ["branch", branch, '-u',
"origin/%s" % branch]
}
if cmd == "init":
wholeCommand = [self._git] + commands[cmd]
else:
wholeCommand = [self._git, "--git-dir", git_dir,
"--work-tree", rpath] + commands[cmd]
if cb_progress and commands[cmd][0] in ("fetch", "checkout"):
progressParam = {'fetch': {'part': 4, 'end': False},
'checkout': {'part': 4, 'startpart': 3}}
gitCmd = PercentProgress(
*wholeCommand + ["--progress", "--verbose"],
stderr=STDOUT, **progressParam)
for perc in gitCmd.progress():
cb_progress(perc)
else:
gitCmd = process(*wholeCommand, stderr=STDOUT)
if gitCmd.failed():
error.append(gitCmd.read())
return False
return True
# получить последние коммиты из удаленного репозитория
if command("init") and command("add_remote"):
if command("get_rev_tag") or command("fetchshallow"):
if not command("has_branch"):
raise GitError(
_("Branch {branch} not found in repository {url}"
).format(branch=branch, url=url))
# если среди коммитов есть указанный коммит
if command("has_revision"):
# переключаемся на нужный коммита, устанавливаем связь
if command("checkout_revision") and command("set_track"):
return True
elif command("checkout"):
return False
raise GitError(_("Failed to clone repository {url}").format(
url=url), error[-1])
def pullRepository(self, rpath, quiet_error=False, cb_progress=None):
"""
Обновить репозиторий до последней версии
"""
gitPull = process(self._git, "--git-dir", self._gitDir(rpath),
"pull", "--ff-only", stderr=STDOUT)
if gitPull.failed():
if not quiet_error:
error = gitPull.read()
raise GitError(
_("Failed to update the repository in {rpath}").format(
rpath=rpath), error)
return False
return True
def fetchRepository(self, rpath, cb_progress=None):
"""
Получить изменения из удаленно репозитория
"""
if cb_progress:
gitFetch = PercentProgress(self._git, "--git-dir",
self._gitDir(rpath),
"fetch", "--progress", "--verbose",
part=3, stderr=STDOUT)
for perc in gitFetch.progress():
cb_progress(perc)
else:
gitFetch = process(self._git, "--git-dir", self._gitDir(rpath),
"fetch", stderr=STDOUT)
if gitFetch.failed():
error = gitFetch.read()
raise GitError(
_("Failed to update the repository in {rpath}").format(
rpath=rpath), error)
return True
def checkChanges(self, rpath):
"""
Проверить наличие изменений пользователем файлов в репозитории
"""
git_dir = self._gitDir(rpath)
git_status = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"status", "--porcelain", stderr=STDOUT)
if git_status.success():
return not any(x.strip() for x in git_status)
else:
raise GitError(
_("Wrong repository in the {rpath} directory").format(
rpath=rpath))
def parseStatusInfo(self, data):
"""
Разобрать информацию полученную через git status -b --porcelain
Returns:
Словарь
# есть ли измененные файлы пользователем
{'files':True/False,
# есть коммиты после текущего
'ahead':True/False,
# есть коммиты перед текущим (означает, что обновление
# с основной веткой не осуществляется через fast-forward
'behind':True/False,
# текущая ветка
'branch':'',
# оригинальная ветка
'origin':'origin/master'}
"""
reStatus = re.compile(
"^## (\w+)\s*(\(no branch\))?(?:\.\.\.(\S+))?(?:\s+\[(ahead \d+)?"
"(?:, )?(behind \d+)?\])?\n?(.*|$)", re.S)
match = reStatus.search(data)
if not match:
return {}
return {'files': True if match.group(6) else False,
'ahead': True if match.group(4) else False,
'behind': True if match.group(5) else False,
'origin': match.group(3) or "",
'branch': "" if match.group(2) else match.group(1)}
def getCurrentCommit(self, rpath):
"""
Получить текущий коммит в репозитории
"""
return self.getCommit(rpath, "HEAD")
def getCommit(self, rpath, reference):
git_dir = self._gitDir(rpath)
git_show = process(self._git, "--git-dir", git_dir, "log",
"-n1", "--format=format:%H",
"--quiet", reference, stderr=STDOUT)
if git_show.success():
return git_show.read().strip()
else:
raise GitError(
_("Failed to get the repository status for {rpath}").format(
rpath=rpath))
def getStatusInfo(self, rpath):
"""
Получить информацию об изменениях в репозитории
Returns:
Словарь выдаваемый функцией _parseStatusInfo
"""
git_dir = self._gitDir(rpath)
git_status = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"status", "-b", "--porcelain", stderr=STDOUT)
if git_status.success():
retDict = self.parseStatusInfo(git_status.read())
if not retDict:
raise GitError(
_("Failed to get the repository status for {rpath}").format(
rpath=rpath))
return retDict
else:
raise GitError(
_("Wrong repository in the {rpath} directory").format(
rpath=rpath))
def resetRepository(self, rpath, to_origin=False, to_rev=None, info=None):
"""
Удалить неиндексированные изменения в репозитории
Args:
to_origin: откатить все изменения до удаленного репозитория
to_rev: откатить все изменения до определенной ревизии
info: использовать уже полученную информация об изменения
в репозитории
Return:
True - успешное выполнение
False - нет необходимости выполнять reset
Raises:
GitError: выполнение комманд reset и clean прошло с ошибкой
"""
git_dir = self._gitDir(rpath)
if not info:
info = self.getStatusInfo(rpath)
if (all(not info[x] for x in ("files", "ahead", "behind") if x in info)
and (not info["origin"] or
"origin/%s" % info["branch"] == info["origin"])):
return False
commit = (info['origin'] if to_origin else to_rev) or "HEAD"
git_reset = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"reset", "--hard", commit, stderr=STDOUT)
git_clean = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"clean", "-fd", stderr=STDOUT)
if git_reset.failed() or git_clean.failed():
raise GitError(_("Failed to clean the {rpath} repository").format(
rpath=rpath))
return True
def getBranch(self, rpath):
"""
Получить текущую ветку
"""
return self.getStatusInfo(rpath)['branch']
def getTags(self, rpath):
"""
Получить список тэгов, отсортированных по версии
"""
git_dir = self._gitDir(rpath)
git_tag = process(self._git, "--git-dir", git_dir, "--work-tree",
rpath,
"tag", "--sort=version:refname", stderr=STDOUT)
if git_tag.success():
return [x.strip() for x in git_tag if x.strip()]
return []
def checkoutBranch(self, rpath, branch):
"""
Сменить ветку
"""
git_dir = self._gitDir(rpath)
git_checkout = process(self._git, "--git-dir", git_dir,
"--work-tree", rpath,
"checkout", "-f", branch, stderr=STDOUT)
if git_checkout.failed():
error = git_checkout.read()
if "pathspec '%s' did not match" % branch in error:
raise GitError(
_("Branch {branch} not found in repository {rpath}").format(
branch=branch, rpath=rpath))
raise GitError(
_("Failed to move to branch {branch} in the {rpath} "
"repository").format(branch=branch,
rpath=rpath), error)
return True
class MTimeKeeper(object):
"""
Сохранить и восстановить mtime на файлы
"""
def __init__(self, dn):
self.data = {}
self.dn = dn
def save(self):
for fn in listDirectory(self.dn, fullPath=True):
try:
stat_data = os.stat(fn)
self.data[fn] = {'size': stat_data.st_size,
'mtime': stat_data.st_mtime,
'atime': stat_data.st_atime,
}
except OSError:
pass
def restore(self):
for fn in self.data.keys():
try:
if path.exists(fn):
stat_data = os.stat(fn)
if stat_data.st_size == self.data[fn]['size']:
os.utime(fn, (self.data[fn]['atime'],
self.data[fn]['mtime']))
except OSError:
pass