|
|
# vim: fileencoding=utf-8
|
|
|
#
|
|
|
import os
|
|
|
import re
|
|
|
import glob
|
|
|
from collections import OrderedDict
|
|
|
from .files import read_file, read_link, join_paths, FilesError
|
|
|
from pyparsing import (
|
|
|
Literal,
|
|
|
Regex,
|
|
|
Word,
|
|
|
nums,
|
|
|
alphanums,
|
|
|
LineEnd,
|
|
|
SkipTo,
|
|
|
)
|
|
|
from jinja2 import PackageLoader, Environment
|
|
|
from calculate.utils.tools import Singleton
|
|
|
import hashlib
|
|
|
|
|
|
|
|
|
class PackageError(Exception):
    '''Exception raised on errors in Package objects that work with
    packages' CONTENTS files.'''
    pass
|
|
|
|
|
|
|
|
|
# Error codes of the ATOM-name parser.
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
|
|
|
|
|
|
|
|
|
class PackageAtomError(Exception):
    '''Exception raised on errors while parsing ATOM package names.

    Attributes:
        message: human-readable error description.
        errno: one of the DEFAULT, NOTEXIST, NOTCORRECT error codes.
    '''
    def __init__(self, message='Package atom error', errno=DEFAULT):
        # Pass the message to Exception so that str(exc) and exc.args are
        # populated (the original left them empty).
        super().__init__(message)
        self.message = message
        self.errno = errno
|
|
|
|
|
|
|
|
|
class VersionError(Exception):
    '''Exception raised by Version objects.'''
    pass
|
|
|
|
|
|
|
|
|
class PackageNotFound(Exception):
    '''Special exception raised when the package owning a given file
    could not be found.'''
    pass
|
|
|
|
|
|
|
|
|
class Version:
    '''Class for working with version values such as '1.2.3b_rc1-r2'.

    A Version can be built from None (an "empty" version that is falsy
    and compares lower than any real version), an int, a float, a str,
    or another Version.  Instances support the full set of comparison
    operators, hashing, and a range check via the >> operator.
    '''

    # Ordering weights for version suffixes; 'no' represents the absence
    # of a suffix, so '1.0_rc1' < '1.0' < '1.0_p1'.
    _suffix_order = {'alpha': 0, 'beta': 1, 'pre': 2,
                     'rc': 3, 'no': 4, 'p': 5}

    def __init__(self, version_value=None):
        '''Initialize from None, int, float, str or another Version.

        Raises VersionError if the value cannot be interpreted.'''
        if version_value is None:
            # The "empty" version: falsy, smaller than any real version.
            self._string = '-1'
            self._value = [-1]
            self._literal = None
            self._suffix = [(4, 0)]
            self._revision = 0
        elif isinstance(version_value, Version):
            # Copy constructor: reuse the already-parsed fields.
            self._string = version_value._string
            self._value = version_value._value
            self._literal = version_value._literal
            self._suffix = version_value._suffix
            self._revision = version_value._revision
        else:
            value = self._get_version_value(version_value)
            if value is None:
                raise VersionError(
                    "Can't initialize Version object using '{0}'"
                    " value with type {1}".format(version_value,
                                                  type(version_value)))
            self._string = value['string']
            self._value = value['value']
            self._literal = value['literal']
            self._suffix = value['suffix']
            self._revision = value['revision']

    def _get_version_value(self, version):
        '''Helper returning the parsed representation of a version as a
        dict with 'string', 'value', 'literal', 'suffix' and 'revision'
        keys, or None for unsupported values.'''
        if isinstance(version, Version):
            return {'string': version._string,
                    'value': version._value,
                    'literal': version._literal,
                    'suffix': version._suffix,
                    'revision': version._revision}

        if isinstance(version, int):
            # FIX: 'string' was str(int) -- the textual representation of
            # the int *type* -- instead of the actual number.
            return {'string': str(version),
                    'value': [version],
                    'literal': '',
                    'suffix': [(4, 0)],
                    'revision': 0}

        if isinstance(version, float):
            # FIX: 'string' used to be str() of the split list (e.g.
            # "['1', '5']"), and 'suffix' was a bare tuple (4, 0) which
            # broke _compare_lists against the list-of-tuples form used
            # everywhere else.
            string = str(version)
            value_list = [int(part.strip()) for part in string.split('.')]
            return {'string': string,
                    'value': value_list,
                    'literal': '',
                    'suffix': [(4, 0)],
                    'revision': 0}

        if isinstance(version, str):
            parse_result = PackageAtomParser.version_regex.search(
                version.strip('-'))
            if not parse_result:
                return None
            result_dict = parse_result.groupdict()
            version_value = {'string': version}

            # Numeric dotted part, e.g. '1.2.3' -> [1, 2, 3].
            version_value['value'] = [
                int(part.strip('-'))
                for part in result_dict['value'].split('.')]

            # Optional single-letter literal, e.g. the 'b' in '1.2b'.
            version_value['literal'] = result_dict['literal'] or ''

            # Parse the whole chain of suffixes, e.g. '_rc1_p2'.
            suffixes = result_dict['suffix']
            if suffixes is not None:
                suffix_list = []
                for suffix in suffixes.strip('_').split('_'):
                    result = re.search(r'([^\d]+)(\d+)?', suffix)
                    suffix_list.append(
                        (self._suffix_order[result.group(1)],
                         int(result.group(2) or 0)))
            else:
                # No suffix: weight 'no' so that '1.0' > '1.0_rc1'.
                suffix_list = [(self._suffix_order['no'], 0)]
            version_value['suffix'] = suffix_list

            # Parse the revision, e.g. '-r3' -> 3.
            if parse_result['revision'] is not None:
                version_value['revision'] = int(
                    parse_result['revision'].strip('-r'))
            else:
                version_value['revision'] = 0
            return version_value

        return None

    def _compare_lists(self, lversion, rversion, filler=0):
        '''Compare two lists element-wise even if their lengths differ
        (missing elements are substituted with `filler`).  Returns 0 if
        the lists are equal, 1 if lversion is greater, -1 if smaller.'''
        if lversion == rversion:
            return 0

        for index in range(0, max(len(lversion), len(rversion))):
            lvalue = lversion[index] if len(lversion) > index else filler
            rvalue = rversion[index] if len(rversion) > index else filler
            if lvalue == rvalue:
                continue
            return 1 if lvalue > rvalue else -1
        return 0

    def _coerce(self, other):
        '''Parse `other` into a version dict or raise VersionError.'''
        other_version = self._get_version_value(other)
        if other_version is None:
            raise VersionError(
                "Unable to compare Version object with the '{0}'"
                " value of '{1}' type".format(other, type(other)))
        return other_version

    def _cmp(self, other_version):
        '''Three-way comparison with a parsed version dict: compare the
        numeric value, then the literal, then the suffixes, then the
        revision.  Returns -1, 0 or 1.'''
        cmp_res = self._compare_lists(self._value, other_version['value'])
        if cmp_res != 0:
            return cmp_res

        if self._literal != other_version['literal']:
            return -1 if self._literal < other_version['literal'] else 1

        cmp_res = self._compare_lists(self._suffix,
                                      other_version['suffix'],
                                      filler=(4, 0))
        if cmp_res != 0:
            return cmp_res

        if self._revision == other_version['revision']:
            return 0
        return -1 if self._revision < other_version['revision'] else 1

    @property
    def string(self):
        '''The original textual form of the version.'''
        return self._string

    def __lt__(self, other):
        '''Overload of x < y.'''
        return self._cmp(self._coerce(other)) < 0

    def __le__(self, other):
        '''Overload of x <= y.'''
        return self._cmp(self._coerce(other)) <= 0

    def __eq__(self, other):
        '''Overload of x == y.'''
        return self._cmp(self._coerce(other)) == 0

    def __ne__(self, other):
        '''Overload of x != y.'''
        return not self.__eq__(other)

    def __gt__(self, other):
        '''Overload of x > y.'''
        return not self.__le__(other)

    def __ge__(self, other):
        '''Overload of x >= y.'''
        return not self.__lt__(other)

    def __hash__(self):
        return hash(self._string)

    def __repr__(self):
        return '<Version: {}>'.format(self._string)

    def __str__(self):
        return self._string

    def __bool__(self):
        # Only the "empty" version (created from None) is falsy.
        return self._value != [-1]

    def __rshift__(self, other: tuple) -> bool:
        '''Check whether this version falls within the given range.

        `other` is a tuple of two version strings; a leading '=' on a
        bound makes that bound inclusive.'''
        if (not isinstance(other, tuple) or len(other) != 2
           or not isinstance(other[0], str) or not isinstance(other[1], str)):
            raise VersionError("Versions range must be tuple of two strings,"
                               f" not '{type(other)}'")

        lequal = other[0].startswith('=')
        lversion = Version(other[0].strip('='))

        # FIX: the right bound's inclusivity flag was read from other[0].
        requal = other[1].startswith('=')
        rversion = Version(other[1].strip('='))

        return (((lequal and self >= lversion)
                 or (not lequal and self > lversion))
                and ((requal and self <= rversion)
                     or (not requal and self < rversion)))
|
|
|
|
|
|
|
|
|
class ContentsParser(metaclass=Singleton):
    '''Singleton parser/renderer for the lines of a portage CONTENTS file
    ('dir', 'sym' and 'obj' entries).'''
    def __init__(self):
        '''Initialize the pyparsing grammars for CONTENTS lines.'''
        sym_keyword = Literal('sym')
        dir_keyword = Literal('dir')
        obj_keyword = Literal('obj')

        symlink_arrow = Literal('->')

        file_path = Regex(r'\S+')

        time_value = Word(nums)

        md5 = Word(alphanums)

        # 'sym <path> -> <target> <mtime>'
        sym_line = (sym_keyword('type') + file_path('path')
                    + symlink_arrow.suppress() + file_path('target')
                    + time_value('mtime') + LineEnd().suppress())

        # 'dir <path>'
        dir_line = (dir_keyword('type') + file_path('path')
                    + LineEnd().suppress())

        # 'obj <path> <md5> <mtime>'
        obj_line = (obj_keyword('type') + file_path('path')
                    + md5('md5') + time_value('mtime') + LineEnd().suppress())
        # Fallback: swallow any line that matches none of the known forms.
        unexpected = SkipTo(LineEnd(), include=True)

        self._parser = (dir_line | sym_line |
                        obj_line | unexpected('unexpected')
                        ).setParseAction(self._parser_method)

    def _parser_method(self, parse_result):
        '''Parse action: map an 'unexpected' match to None (so it can be
        skipped) and a regular match to a {path: attributes} dict.'''
        if parse_result.getName() == 'unexpected':
            return [None]
        result_dict = parse_result.asDict()
        path = result_dict.pop('path')
        value = ({path: result_dict})
        return value

    def parse(self, contents_text: str):
        '''Parse CONTENTS text into an ordered {path: attributes}
        mapping, preserving the file's line order.'''
        output_dictionary = OrderedDict()
        for tokens, start, end in self._parser.scanString(contents_text):
            parse_result = tokens[0]
            if parse_result is None:
                continue
            output_dictionary.update(parse_result)
        return output_dictionary

    def render(self, contents_dictionary: dict):
        '''Render a contents dictionary back to CONTENTS text using the
        jinja2 'contents_template' template from calculate/utils.'''
        file_loader = PackageLoader('calculate', 'utils')
        environment = Environment(loader=file_loader,
                                  trim_blocks=True,
                                  lstrip_blocks=True)
        template = environment.get_template('contents_template')
        text = template.render(document_dictionary=contents_dictionary)
        return text
|
|
|
|
|
|
|
|
|
class PackageAtomName:
|
|
|
'''Класс для хранения результата определения пакета. Для определения пакета
|
|
|
использует путь к его pkg директории.'''
|
|
|
def __init__(self, atom_dictionary):
|
|
|
self._package_directory = atom_dictionary['pkg_path']
|
|
|
self._version = atom_dictionary['version']
|
|
|
if self._package_directory is not None:
|
|
|
self._name = self.fullname[:-len(self._version.string)]
|
|
|
else:
|
|
|
self._name = 'None'
|
|
|
if self._package_directory is not None:
|
|
|
self._with_slot = atom_dictionary.get('with_slot', False)
|
|
|
else:
|
|
|
self._with_slot = False
|
|
|
|
|
|
@property
|
|
|
def name(self) -> str:
|
|
|
return self._name
|
|
|
|
|
|
@property
|
|
|
def fullname(self) -> str:
|
|
|
if self._package_directory is None:
|
|
|
return 'None'
|
|
|
return os.path.basename(self._package_directory)
|
|
|
|
|
|
@property
|
|
|
def category(self) -> str:
|
|
|
if self._package_directory is None:
|
|
|
return 'None'
|
|
|
return os.path.basename(os.path.dirname(self._package_directory))
|
|
|
|
|
|
@property
|
|
|
def atom(self) -> str:
|
|
|
if self._package_directory is None:
|
|
|
return 'None'
|
|
|
return "{}/{}".format(self.category, self.fullname)
|
|
|
|
|
|
@property
|
|
|
def version(self) -> Version:
|
|
|
if self._package_directory is None:
|
|
|
return Version()
|
|
|
return self._version
|
|
|
|
|
|
@property
|
|
|
def contents_path(self) -> str:
|
|
|
if self._package_directory is None:
|
|
|
return 'None'
|
|
|
return os.path.join(self._package_directory, 'CONTENTS')
|
|
|
|
|
|
@property
|
|
|
def use_flags(self) -> list:
|
|
|
if self._package_directory is None:
|
|
|
return []
|
|
|
use_path = os.path.join(self._package_directory, 'USE')
|
|
|
try:
|
|
|
return read_file(use_path).strip('\n').split(' ')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read use flags for 'package'"
|
|
|
" parameter: {}".format(self.package_atom))
|
|
|
|
|
|
@property
|
|
|
def pkg_path(self) -> str:
|
|
|
return self._package_directory
|
|
|
|
|
|
@property
|
|
|
def slot(self) -> str:
|
|
|
if self._package_directory is None:
|
|
|
return None
|
|
|
slot_path = os.path.join(self._package_directory, 'SLOT')
|
|
|
try:
|
|
|
return read_file(slot_path).strip('\n')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read slot value for"
|
|
|
" 'package': {}".format(self.package_atom))
|
|
|
|
|
|
@property
|
|
|
def slot_specified(self) -> bool:
|
|
|
return self._with_slot
|
|
|
|
|
|
def __eq__(self, other) -> bool:
|
|
|
if isinstance(other, PackageAtomName):
|
|
|
return self._package_directory == other._package_directory
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def __ne__(self, other) -> bool:
|
|
|
if isinstance(other, PackageAtomName):
|
|
|
return self._package_directory != other._package_directory
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def __bool__(self) -> bool:
|
|
|
if self._package_directory is None:
|
|
|
return True
|
|
|
return bool(self._package_directory)
|
|
|
|
|
|
def __repr__(self) -> bool:
|
|
|
if self._package_directory is None:
|
|
|
return '<PackageAtomName: None>'
|
|
|
return '<PackageAtomName: {}/{}>'.format(self.category,
|
|
|
self.fullname)
|
|
|
|
|
|
def __hash__(self) -> bool:
|
|
|
return hash(self._package_directory)
|
|
|
|
|
|
|
|
|
# Sentinel object representing "no package" (no pkg directory, no version).
NonePackage = PackageAtomName({'pkg_path': None, 'version': None})
|
|
|
|
|
|
|
|
|
class PackageAtomParser:
    '''Class for parsing the 'package' parameter value, validating it and
    determining which package a given file belongs to.'''
    # Regex fragments for a Gentoo-style version: dotted numeric value,
    # optional single-letter literal, optional chain of suffixes
    # (_alpha/_beta/_pre/_rc/_p) and optional revision (-rN).
    _value = r'(?P<value>\d+(\.\d+)*)'
    _literal = r'(?P<literal>[a-z])?'
    _suffix = r'(?P<suffix>(_(pre|p|beta|alpha|rc)(\d+)?)+)?'
    _revision = r'(?P<revision>-r\d+)?'
    _version_pattern = _value + _literal + _suffix + _revision

    package_name_pattern =\
        fr'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-{_version_pattern})?'

    # Full atom: '<category>/<name>[-<version>][:<slot>][<use flags>]'.
    atom_name_pattern = r'''(?P<category>[^\s/]*)/
                         {0}
                         (?P<slot>:[^\[\s]*)?
                         (?P<use_flags>\[\S*(?:,\S*)*\])?'''.format(
                             package_name_pattern)

    atom_regex = re.compile(atom_name_pattern, re.VERBOSE)
    package_name_regex = re.compile(package_name_pattern)
    version_regex = re.compile(_version_pattern)

    def __init__(self, pkg_path='/var/db/pkg',
                 chroot_path='/'):
        self.chroot_path = chroot_path

        # Inside a chroot the pkg database lives under the chroot path.
        if chroot_path != '/':
            self.pkg_path = join_paths(chroot_path, pkg_path)
        else:
            self.pkg_path = pkg_path

        self.package_atom = ''
        self._atom_dictionary = {}

    def parse_package_parameter(self, package_atom):
        '''Parse the 'package' value and then verify the parsed values.
        Returns a PackageAtomName object.'''
        self.package_atom = package_atom
        if isinstance(package_atom, str):
            self._atom_dictionary = self.parse_atom_name(package_atom)
        elif isinstance(package_atom, dict):
            self._atom_dictionary = package_atom

        self._check_package_existance()

        atom_name_object = PackageAtomName(self._atom_dictionary)
        self._atom_dictionary.clear()
        return atom_name_object

    def _check_package_existance(self, package_atom=''):
        '''Check that the package exists.  A package exists if a matching
        CONTENTS file is found in the pkg database.

        Raises PackageAtomError (errno=NOTEXIST) when no candidate
        survives the slot/use-flag checks.'''
        if package_atom:
            self.parse_package_parameter(package_atom)
            return True
        else:
            # Build a glob pattern from the parsed name and version.
            if self._atom_dictionary['version'] is not None:
                full_name = self._atom_dictionary['name'] + '-' +\
                    self._atom_dictionary['version']._string
            else:
                full_name = self._atom_dictionary['name']

            if self._atom_dictionary['version'] is None:
                # No version given: match any '<name>-<digits...>'.
                glob_result = glob.glob(
                    r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
                        self.pkg_path,
                        self._atom_dictionary['category'],
                        full_name))
            else:
                glob_result = glob.glob(
                    r'{0}/{1}/{2}*/CONTENTS'.format(
                        self.pkg_path,
                        self._atom_dictionary['category'],
                        full_name))

            if not glob_result:
                # Nothing matched at all.
                raise PackageAtomError("Package from 'package' parameter value"
                                       " '{}' does not exist".format(
                                           self.package_atom),
                                       errno=NOTEXIST)

            if len(glob_result) == 1:
                # Exactly one package found.
                pkg_path = os.path.dirname(next(iter(glob_result)))
                self._check_slot_value(pkg_path)
                self._check_use_flags_value(pkg_path)

                parsed_name = self._atom_dictionary['name']
                full_name = os.path.basename(pkg_path)
                self._atom_dictionary['version'] = Version(
                    full_name[len(parsed_name):])
                self._atom_dictionary['pkg_path'] = pkg_path
            else:
                packages = dict()
                # Multiple candidates: filter them by use flags, slot and
                # version, when those were specified.
                for contents_path in glob_result:
                    pkg_path = os.path.dirname(contents_path)
                    try:
                        self._check_slot_value(pkg_path)
                        self._check_use_flags_value(pkg_path)
                        parsed_name = self._atom_dictionary['name']
                        full_name = os.path.basename(pkg_path)
                        packages[pkg_path] = Version(
                            full_name[len(parsed_name):])
                    except PackageAtomError:
                        continue

                if not packages:
                    # All candidates were filtered out.
                    raise PackageAtomError(
                        "Package from 'package' parameter value"
                        " '{}' does not exist".format(
                            self.package_atom),
                        errno=NOTEXIST)

                if len(packages) == 1:
                    # Exactly one candidate survived -- use it.
                    pkg_path = next(iter(packages.keys()))
                    self._atom_dictionary['pkg_path'] = pkg_path
                    self._atom_dictionary['version'] = packages[pkg_path]
                else:
                    # Still several candidates -- take the highest version.
                    pkg_path = sorted(packages.keys(),
                                      key=lambda path: packages[path])[-1]
                    self._atom_dictionary['pkg_path'] = pkg_path
                    self._atom_dictionary['version'] = packages[pkg_path]

    def _check_slot_value(self, pkg_path):
        '''Check the slot value obtained from the 'package' parameter
        against the package's SLOT file.'''
        if self._atom_dictionary['slot']:
            slot = self._get_slot_value(pkg_path)

            if slot != self._atom_dictionary['slot']:
                raise PackageAtomError("Package from 'package' parameter value"
                                       " '{}' does not exist".format(
                                           self.package_atom),
                                       errno=NOTEXIST)

    def _check_use_flags_value(self, pkg_path):
        '''Check the use-flag values obtained from the 'package'
        parameter against the package's USE file.'''
        if self._atom_dictionary['use_flags']:
            use_flags = self._get_use_flags_value(pkg_path)

            for use_flag in self._atom_dictionary['use_flags']:
                if use_flag not in use_flags:
                    raise PackageAtomError(
                        "Package from 'package' parameter value"
                        " '{}' does not exist".format(
                            self.package_atom),
                        errno=NOTEXIST)

    def _get_slot_value(self, pkg_path):
        '''Return the slot value read from the package's SLOT file.'''
        slot_path = os.path.join(pkg_path, 'SLOT')
        try:
            return read_file(slot_path).strip('\n')
        except FilesError:
            raise PackageAtomError("could not read slot value for"
                                   " 'package': {}".format(self.package_atom))

    def _get_use_flags_value(self, pkg_path):
        '''Return the list of use flags read from the package's USE
        file.'''
        use_path = os.path.join(pkg_path, 'USE')
        try:
            return read_file(use_path).strip('\n').split(' ')
        except FilesError:
            raise PackageAtomError("could not read use flags for 'package'"
                                   " parameter: {}".format(self.package_atom))

    def _get_category_packages(self, category):
        '''Generator of CONTENTS paths of the packages present in the
        given category of the pkg database.'''
        for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
                                                          category)):
            yield path

    def get_file_package(self, file_path):
        '''Determine the package that the given file belongs to.

        Raises PackageNotFound when no CONTENTS file mentions the path.'''
        # Strip the chroot_path part from the file path.
        if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]

        for category in os.listdir(self.pkg_path):
            for contents_path in self._get_category_packages(category):
                try:
                    with open(contents_path, 'r') as contents_file:
                        for file_line in contents_file.readlines():
                            contents_name = file_line.split(' ')[1].strip()

                            if contents_name == file_path:
                                package_path = os.path.dirname(contents_path)

                                package_name = os.path.basename(package_path)
                                parsing_result = self.package_name_regex.\
                                    search(package_name)
                                version = parsing_result.groupdict()['version']
                                version = Version(version)

                                package_atom = PackageAtomName(
                                    {'pkg_path': package_path,
                                     'version': version})
                                return package_atom
                except (OSError, IOError):
                    continue
        else:
            raise PackageNotFound("The file does not belong to any package")

    @classmethod
    def parse_atom_name(cls, atom_name: str) -> dict:
        '''Parse an atom string into a dict with 'category', 'name',
        'version', 'slot', 'use_flags' and 'with_slot' keys.

        Raises PackageAtomError (errno=NOTCORRECT) for malformed atoms.'''
        parsing_result = cls.atom_regex.search(atom_name)
        if (not parsing_result or parsing_result.string != atom_name or
            not parsing_result.groupdict()['category'] or
                not parsing_result.groupdict()['name']):
            raise PackageAtomError("'package' parameter value '{}' is not"
                                   " correct".format(atom_name),
                                   errno=NOTCORRECT)

        parsing_result = parsing_result.groupdict()

        category = parsing_result['category']
        name = parsing_result['name']

        if parsing_result['version'] is not None:
            version = Version(parsing_result['version'].strip('-'))
        else:
            version = None

        # A bare ':' means no slot was actually specified.
        if (parsing_result['slot'] is not None
                and parsing_result['slot'] != ':'):
            slot = parsing_result['slot'].strip(':')
        else:
            slot = None

        if parsing_result['use_flags'] is not None:
            use_flags = [use.strip() for use in
                         parsing_result['use_flags'].strip().
                         strip('[]').split(',')]
        else:
            use_flags = None

        atom_dict = {'category': category,
                     'name': name,
                     'version': version,
                     'slot': slot,
                     'use_flags': use_flags,
                     'with_slot': slot is not None}
        return atom_dict
|
|
|
|
|
|
|
|
|
class Package:
    '''Class for working with files' belonging to a package through the
    package's CONTENTS file.'''
    # Matches the '._cfg????_' prefix that portage prepends to protected
    # configuration files.
    re_cfg = re.compile(r'/\._cfg\d{4}_')

    def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
        '''Resolve `package_atom` (str, or a ready PackageAtomName) to a
        CONTENTS file and read it.

        Raises PackageError when the CONTENTS file cannot be found or
        read.'''
        self.chroot_path = chroot_path

        self.contents_file_path = self._get_contents_path(package_atom)
        self.package_name = package_atom
        self.parser = ContentsParser()
        if (chroot_path != '/' and
                not self.contents_file_path.startswith(chroot_path)):
            self.contents_file_path = join_paths(chroot_path,
                                                 self.contents_file_path)

        if not os.path.exists(self.contents_file_path):
            raise PackageError("Can not find CONTENTS file in path: {}".format(
                self.contents_file_path
            ))
        self.contents_dictionary = OrderedDict()
        self.read_contents_file()

    def _get_contents_path(self, package_atom):
        '''Return the path to the CONTENTS file from an ATOM name string
        or a ready PackageAtomName object.'''
        if isinstance(package_atom, str):
            package_atom_parser = PackageAtomParser(
                chroot_path=self.chroot_path)
            atom_name = package_atom_parser.parse_package_parameter(
                package_atom)
            return os.path.join(atom_name.pkg_path, 'CONTENTS')
        elif isinstance(package_atom, PackageAtomName):
            return os.path.join(package_atom.pkg_path, 'CONTENTS')
        else:
            raise PackageError(
                "Incorrect 'package_atom' value: '{}', type: '{}''".
                format(package_atom, type(package_atom)))

    def remove_cfg_prefix(self, file_name):
        '''Remove the '._cfg????_' prefix from a file path.'''
        return self.re_cfg.sub('/', file_name)

    def remove_chroot_path(self, file_name):
        '''Remove the chroot path from a file path, when the chroot is
        not '/'.'''
        if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
            return file_name[len(self.chroot_path):]
        return file_name

    def read_contents_file(self):
        '''Read and parse the CONTENTS file.  Returns True when the file
        had any content.'''
        try:
            contents_text = read_file(self.contents_file_path)
        except FilesError as error:
            raise PackageError(str(error))

        if contents_text:
            self.contents_dictionary = self.parser.parse(contents_text)
            return True
        return False

    def write_contents_file(self):
        '''Write the CONTENTS file back to disk.'''
        with open(self.contents_file_path, 'w') as contents_file:
            contents_file.write(self.render_contents_file())

    def render_contents_file(self):
        '''Return the rendered text of the CONTENTS file.'''
        return self.parser.render(self.contents_dictionary)

    @property
    def files(self):
        '''List of the file paths present in the package's CONTENTS.'''
        return list(self.contents_dictionary.keys())

    def get_file_type(self, file_path: str) -> str:
        '''Return the type ('dir', 'sym' or 'obj') recorded in CONTENTS
        for the given path, or None when the path is not listed.'''
        file_path = self.remove_chroot_path(file_path)
        if file_path in self.contents_dictionary:
            return self.contents_dictionary[file_path]['type']
        return None

    def sort_contents_dictionary(self):
        '''Re-order the contents dictionary so that paths appear in a
        depth-first, alphabetically sorted tree order.'''
        tree = {}
        for path in self.contents_dictionary.keys():
            level = tree
            for part in path.strip('/').split('/'):
                level = level.setdefault(part, {})

        sorted_contents = OrderedDict()
        for path in self._make_paths('/', tree):
            sorted_contents[path] = self.contents_dictionary[path]
        self.contents_dictionary = sorted_contents

    def _make_paths(self, path, level):
        '''Recursively flatten the path tree into a sorted path list.'''
        paths = []
        for part in sorted(level.keys()):
            part_path = os.path.join(path, part)
            paths.append(part_path)
            if level[part]:
                paths.extend(self._make_paths(part_path, level[part]))
        return paths

    def add_dir(self, file_name):
        '''Add a directory (and, recursively, its parents) to CONTENTS.'''
        file_name = self.remove_chroot_path(file_name)

        if (file_name != '/' and
            (file_name not in self.contents_dictionary
             or self.contents_dictionary[file_name]['type'] != 'dir')):
            self.add_dir(os.path.dirname(file_name))
            self.contents_dictionary[file_name] = OrderedDict({'type': 'dir'})

    def add_sym(self, file_name, target_path=None, mtime=None):
        '''Add a symbolic link to CONTENTS.  Missing target/mtime values
        are read from the file system.'''
        file_name = self.remove_cfg_prefix(file_name)

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)

        if real_path == file_name:
            real_path = join_paths(self.chroot_path, file_name)

        if target_path is None:
            # FIX: the try/except used to wrap only the OrderedDict
            # literal below, which cannot raise FilesError; read_link is
            # the call that can.
            try:
                target_path = read_link(real_path)
            except FilesError as error:
                raise PackageError(str(error))

        self.add_dir(os.path.dirname(file_name))
        if mtime is None:
            mtime = str(int(os.lstat(real_path).st_mtime))

        self.contents_dictionary[file_name] = OrderedDict(
            {'type': 'sym',
             'target': target_path,
             'mtime': mtime})

    def add_obj(self, file_name, file_md5=None, mtime=None):
        '''Add a regular file to CONTENTS as an 'obj' entry.  Missing
        md5/mtime values are computed from the file system.'''
        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        file_name = self.remove_cfg_prefix(file_name)

        if real_path == file_name:
            real_path = join_paths(self.chroot_path, file_name)
        self.add_dir(os.path.dirname(file_name))

        if file_md5 is None:
            try:
                file_text = read_file(real_path).encode()
            except FilesError as error:
                raise PackageError(str(error))
            file_md5 = hashlib.md5(file_text).hexdigest()

        if mtime is None:
            mtime = str(int(os.lstat(real_path).st_mtime))

        self.contents_dictionary[file_name] = OrderedDict(
            {'type': 'obj',
             'md5': file_md5,
             'mtime': mtime})

    def add_file(self, file_name):
        '''Add a file of any type to CONTENTS, dispatching on the actual
        file-system entry type.'''
        if file_name != '/':
            real_path = file_name
            if file_name.startswith(self.chroot_path):
                file_name = self.remove_chroot_path(file_name)
            else:
                real_path = join_paths(self.chroot_path, file_name)

            if os.path.isdir(real_path):
                self.add_dir(file_name)
            elif os.path.islink(real_path):
                self.add_sym(file_name)
            elif os.path.isfile(real_path):
                self.add_obj(file_name)

    def remove_obj(self, file_path):
        '''Remove a single file or link entry.  Returns the removed
        entries as an OrderedDict.'''
        file_path = self.remove_chroot_path(file_path)
        file_path = self.remove_cfg_prefix(file_path)
        removed = OrderedDict()

        if file_path in self.contents_dictionary:
            removed[file_path] = self.contents_dictionary.pop(file_path)
        return removed

    def remove_dir(self, file_path):
        '''Remove a directory entry and everything inside it from
        CONTENTS.  Returns the removed entries.'''
        directory_path = self.remove_chroot_path(file_path)
        # FIX: a bare startswith(directory_path) also matched sibling
        # paths sharing the prefix (removing '/usr/lib' used to remove
        # '/usr/lib64/...' too); require an exact match or a '/' after
        # the directory path.
        prefix = directory_path.rstrip('/') + '/'
        paths_to_remove = [path for path in self.contents_dictionary
                           if path == directory_path
                           or path.startswith(prefix)]

        removed = OrderedDict()
        for path in paths_to_remove:
            removed[path] = self.contents_dictionary.pop(path)
        return removed

    def remove_file(self, file_path):
        '''Remove an entry of any type (recursively for directories).
        Returns the removed entries, or None when the path is absent.'''
        file_path = self.remove_chroot_path(file_path)
        file_path = self.remove_cfg_prefix(file_path)
        removed = OrderedDict()

        if file_path not in self.contents_dictionary:
            return
        if self.contents_dictionary[file_path]['type'] == 'dir':
            removed.update(self.remove_dir(file_path))
        else:
            removed[file_path] = self.contents_dictionary.pop(file_path)
        return removed

    def clear_dir(self, file_path):
        '''Remove everything inside the given directory from CONTENTS,
        keeping the directory entry itself.  Returns the removed
        entries.'''
        directory_path = self.remove_chroot_path(file_path)
        # FIX: same sibling-prefix bug as in remove_dir.
        prefix = directory_path.rstrip('/') + '/'
        paths_to_remove = [path for path in self.contents_dictionary
                           if path != directory_path
                           and path.startswith(prefix)]

        removed = OrderedDict()
        for path in paths_to_remove:
            removed[path] = self.contents_dictionary.pop(path)
        return removed

    def remove_empty_directories(self):
        '''Remove directory entries that no longer lie on the path to any
        remaining non-directory entry.  Returns the removed entries.'''
        used_directories = set()
        removed = OrderedDict()
        not_directory_list = [path for path, value in
                              self.contents_dictionary.items()
                              if value['type'] != 'dir']
        for filepath in not_directory_list:
            file_directory = os.path.dirname(filepath)
            while file_directory != '/':
                used_directories.add(file_directory)
                file_directory = os.path.dirname(file_directory)

        paths_to_delete = [file_path for file_path, value in
                           self.contents_dictionary.items()
                           if value['type'] == 'dir' and
                           file_path not in used_directories]

        for file_path in paths_to_delete:
            removed[file_path] = self.contents_dictionary.pop(file_path)
        return removed

    def get_md5(self, file_path):
        '''Return the md5 hash of the given file's content.

        Raises PackageError when the file cannot be read.'''
        try:
            file_text = read_file(file_path).encode()
        except FilesError as error:
            raise PackageError(str(error))

        return hashlib.md5(file_text).hexdigest()

    def is_md5_equal(self, file_path, file_md5=None):
        '''Check whether the file's md5 hash matches the one recorded in
        CONTENTS.'''
        if file_md5 is None:
            file_md5 = self.get_md5(file_path)

        if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]
        file_path = self.remove_cfg_prefix(file_path)

        contents_md5 = self.contents_dictionary[file_path]['md5']
        return file_md5 == contents_md5

    def __contains__(self, file_path):
        # FIX: with chroot_path == '/' this used to return True for ANY
        # path, making membership tests meaningless outside a chroot.
        if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]
        file_path = self.remove_cfg_prefix(file_path)
        return file_path in self.contents_dictionary

    def __repr__(self):
        return '<Package: {}/{}>'.format(self.package_name.category,
                                         self.package_name.fullname)
|