|
|
@ -4,8 +4,13 @@ import os |
|
|
|
import re |
|
|
|
import glob |
|
|
|
from collections import OrderedDict |
|
|
|
from typing import Union |
|
|
|
from .files import read_file, read_link, join_paths, FilesError |
|
|
|
from typing import ( |
|
|
|
Generator, |
|
|
|
Union, |
|
|
|
List, |
|
|
|
Any |
|
|
|
) |
|
|
|
from pyparsing import ( |
|
|
|
Literal, |
|
|
|
Regex, |
|
|
@ -32,7 +37,8 @@ DEFAULT, NOTEXIST, NOTCORRECT = range(3) |
|
|
|
|
|
|
|
class PackageAtomError(Exception): |
|
|
|
'''Исключение выбрасываемое при ошибках разбора ATOM-названий.''' |
|
|
|
def __init__(self, message='Package atom error', errno=DEFAULT): |
|
|
|
def __init__(self, message: str = 'Package atom error', |
|
|
|
errno: int = DEFAULT): |
|
|
|
self.message = message |
|
|
|
self.errno = errno |
|
|
|
|
|
|
@ -151,7 +157,8 @@ class Version: |
|
|
|
|
|
|
|
return version_value |
|
|
|
|
|
|
|
def _compare_lists(self, lversion, rversion, filler=0): |
|
|
|
def _compare_lists(self, lversion: list, rversion: list, filler: Any = 0 |
|
|
|
) -> int: |
|
|
|
'''Метод для сравнения двух списков, даже если если они не одинаковы |
|
|
|
по длине. Возвращает 0, если списки равны, 1 если lversion больше, -1 |
|
|
|
если lversion меньше.''' |
|
|
@ -323,7 +330,7 @@ class ContentsParser(metaclass=Singleton): |
|
|
|
value = ({path: result_dict}) |
|
|
|
return value |
|
|
|
|
|
|
|
def parse(self, contents_text: str): |
|
|
|
def parse(self, contents_text: str) -> OrderedDict: |
|
|
|
output_dictionary = OrderedDict() |
|
|
|
for tokens, start, end in self._parser.scanString(contents_text): |
|
|
|
parse_result = tokens[0] |
|
|
@ -332,7 +339,7 @@ class ContentsParser(metaclass=Singleton): |
|
|
|
output_dictionary.update(parse_result) |
|
|
|
return output_dictionary |
|
|
|
|
|
|
|
def render(self, contents_dictionary: dict): |
|
|
|
def render(self, contents_dictionary: dict) -> str: |
|
|
|
file_loader = PackageLoader('calculate', 'utils') |
|
|
|
environment = Environment(loader=file_loader, |
|
|
|
trim_blocks=True, |
|
|
@ -345,7 +352,7 @@ class ContentsParser(metaclass=Singleton): |
|
|
|
class PackageAtomName: |
|
|
|
'''Класс для хранения результата определения пакета. Для определения пакета |
|
|
|
использует путь к его pkg директории.''' |
|
|
|
def __init__(self, atom_dictionary): |
|
|
|
def __init__(self, atom_dictionary: dict): |
|
|
|
self._package_directory = atom_dictionary['pkg_path'] |
|
|
|
self._version = atom_dictionary['version'] |
|
|
|
if self._package_directory is not None: |
|
|
@ -421,13 +428,13 @@ class PackageAtomName: |
|
|
|
def slot_specified(self) -> bool: |
|
|
|
return self._with_slot |
|
|
|
|
|
|
|
def __eq__(self, other) -> bool: |
|
|
|
def __eq__(self, other: Any) -> bool: |
|
|
|
if isinstance(other, PackageAtomName): |
|
|
|
return self._package_directory == other._package_directory |
|
|
|
else: |
|
|
|
return False |
|
|
|
|
|
|
|
def __ne__(self, other) -> bool: |
|
|
|
def __ne__(self, other: Any) -> bool: |
|
|
|
if isinstance(other, PackageAtomName): |
|
|
|
return self._package_directory != other._package_directory |
|
|
|
else: |
|
|
@ -456,19 +463,23 @@ class PackageAtomName: |
|
|
|
NonePackage = PackageAtomName({'pkg_path': None, 'version': None}) |
|
|
|
|
|
|
|
|
|
|
|
class PackageAtomParser: |
|
|
|
'''Класс для парсинга параметра package, его проверки, а также определения |
|
|
|
принадлежности файла пакету.''' |
|
|
|
def make_version_pattern() -> str: |
|
|
|
_value = r'(?P<value>\d+(\.\d+)*)' |
|
|
|
_literal = r'(?P<literal>[a-z])?' |
|
|
|
_suffix = r'(?P<suffix>(_(pre|p|beta|alpha|rc)(\d+)?)+)?' |
|
|
|
_revision = r'(?P<revision>-r\d+)?' |
|
|
|
_version_pattern = _value + _literal + _suffix + _revision |
|
|
|
return _value + _literal + _suffix + _revision |
|
|
|
|
|
|
|
|
|
|
|
package_name_pattern =\ |
|
|
|
class PackageAtomParser: |
|
|
|
'''Класс для парсинга параметра package, его проверки, а также определения |
|
|
|
принадлежности файла пакету.''' |
|
|
|
_version_pattern: str = make_version_pattern() |
|
|
|
|
|
|
|
package_name_pattern: str =\ |
|
|
|
fr'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-{_version_pattern})?' |
|
|
|
|
|
|
|
atom_name_pattern = r'''(?P<condition>[=~><])? |
|
|
|
atom_name_pattern: str = r'''(?P<condition>[=~><])? |
|
|
|
(?P<category>[^\s/]*)/ |
|
|
|
{0} |
|
|
|
(?P<slot>:[^\[\s]*)? |
|
|
@ -479,19 +490,20 @@ class PackageAtomParser: |
|
|
|
package_name_regex = re.compile(package_name_pattern) |
|
|
|
version_regex = re.compile(_version_pattern) |
|
|
|
|
|
|
|
atom_dict_fields = ['category', 'name', 'version', 'slot', 'use_flags', |
|
|
|
'with_slot', 'condition'] |
|
|
|
atom_dict_fields: List[str] = ['category', 'name', 'version', 'slot', |
|
|
|
'use_flags', 'with_slot', 'condition'] |
|
|
|
|
|
|
|
def __init__(self, pkg_path='/var/db/pkg', |
|
|
|
chroot_path='/'): |
|
|
|
self.chroot_path = chroot_path |
|
|
|
def __init__(self, pkg_path: str = '/var/db/pkg', |
|
|
|
chroot_path: str = '/'): |
|
|
|
self.chroot_path: str = chroot_path |
|
|
|
|
|
|
|
self.pkg_path: str |
|
|
|
if chroot_path != '/': |
|
|
|
self.pkg_path = join_paths(chroot_path, pkg_path) |
|
|
|
else: |
|
|
|
self.pkg_path = pkg_path |
|
|
|
|
|
|
|
self.package_atom = '' |
|
|
|
self.package_atom: str = '' |
|
|
|
|
|
|
|
def parse_package_parameter(self, package_atom: Union[str, dict] |
|
|
|
) -> PackageAtomName: |
|
|
@ -533,8 +545,7 @@ class PackageAtomParser: |
|
|
|
|
|
|
|
if not glob_result: |
|
|
|
# Если ничего не нашлось. |
|
|
|
raise PackageAtomError("Package from 'package' parameter value" |
|
|
|
" '{}' does not exist".format( |
|
|
|
raise PackageAtomError("Package '{}' is not found.".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
|
|
|
@ -605,8 +616,7 @@ class PackageAtomParser: |
|
|
|
atom_dictionary['package_atom']) |
|
|
|
|
|
|
|
if slot != atom_dictionary['slot']: |
|
|
|
raise PackageAtomError("Package from 'package' parameter value" |
|
|
|
" '{}' does not exist".format( |
|
|
|
raise PackageAtomError("Package '{}' is not found.".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
|
|
|
@ -625,11 +635,9 @@ class PackageAtomParser: |
|
|
|
continue |
|
|
|
elif use_flag in use_flags: |
|
|
|
continue |
|
|
|
raise PackageAtomError( |
|
|
|
"Package from 'package' parameter value" |
|
|
|
" '{}' does not exist".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
raise PackageAtomError("Package '{}' is not found".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
|
|
|
|
def _get_slot_value(self, pkg_path: str, package_atom: str) -> str: |
|
|
|
'''Метод для получения значения slot из файла SLOT.''' |
|
|
@ -646,8 +654,8 @@ class PackageAtomParser: |
|
|
|
try: |
|
|
|
return read_file(use_path).strip('\n').split(' ') |
|
|
|
except FilesError: |
|
|
|
raise PackageAtomError("could not read use flags for 'package'" |
|
|
|
" parameter: {}".format(package_atom)) |
|
|
|
raise PackageAtomError("could not read use flags in atom name: {}". |
|
|
|
format(package_atom)) |
|
|
|
|
|
|
|
def _get_category_packages(self, category: str) -> str: |
|
|
|
'''Генератор имен категорий, имеющихся в /var/db/pkg''' |
|
|
@ -655,7 +663,8 @@ class PackageAtomParser: |
|
|
|
category)): |
|
|
|
yield path |
|
|
|
|
|
|
|
def _check_version(self, atom_dictionary: dict, pkg_version: Version): |
|
|
|
def _check_version(self, atom_dictionary: dict, pkg_version: Version |
|
|
|
) -> None: |
|
|
|
condition = atom_dictionary['condition'] |
|
|
|
|
|
|
|
if condition == '=': |
|
|
@ -675,11 +684,9 @@ class PackageAtomParser: |
|
|
|
condition_result = False |
|
|
|
|
|
|
|
if not condition_result: |
|
|
|
raise PackageAtomError( |
|
|
|
"Package from 'package' parameter value" |
|
|
|
" '{}' does not exist".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
raise PackageAtomError("Package '{}' is not found".format( |
|
|
|
atom_dictionary['package_atom']), |
|
|
|
errno=NOTEXIST) |
|
|
|
|
|
|
|
def get_file_package(self, file_path: str) -> PackageAtomName: |
|
|
|
'''Метод для определения пакета, которому принадлежит файл.''' |
|
|
@ -718,7 +725,7 @@ class PackageAtomParser: |
|
|
|
if (not parsing_result or parsing_result.string != atom_name or |
|
|
|
not parsing_result.groupdict()['category'] or |
|
|
|
not parsing_result.groupdict()['name']): |
|
|
|
raise PackageAtomError("'package' parameter value '{}' is not" |
|
|
|
raise PackageAtomError("atom name '{}' is not" |
|
|
|
" correct".format(atom_name), |
|
|
|
errno=NOTCORRECT) |
|
|
|
|
|
|
@ -734,9 +741,8 @@ class PackageAtomParser: |
|
|
|
|
|
|
|
if parsing_result['condition'] is not None: |
|
|
|
if parsing_result['version'] is None: |
|
|
|
raise PackageAtomError("'package' parameter value" |
|
|
|
f" '{atom_name}' is not correct." |
|
|
|
" Version value is missed", |
|
|
|
raise PackageAtomError(f"Atom name '{atom_name}' is not" |
|
|
|
" correct. Version value is missed", |
|
|
|
errno=NOTCORRECT) |
|
|
|
elif parsing_result['version'] is not None: |
|
|
|
parsing_result['condition'] = '=' |
|
|
@ -768,8 +774,10 @@ class Package: |
|
|
|
'''Класс для работы с принадлежностью файлов пакетам.''' |
|
|
|
re_cfg = re.compile(r'/\._cfg\d{4}_') |
|
|
|
|
|
|
|
def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'): |
|
|
|
self.chroot_path = chroot_path |
|
|
|
def __init__(self, package_atom: Union[str, PackageAtomName], |
|
|
|
pkg_path: str = '/var/db/pkg', |
|
|
|
chroot_path: str = '/'): |
|
|
|
self.chroot_path: str = chroot_path |
|
|
|
|
|
|
|
self.contents_file_path = self._get_contents_path(package_atom) |
|
|
|
self.package_name = package_atom |
|
|
@ -786,7 +794,8 @@ class Package: |
|
|
|
self.contents_dictionary = OrderedDict() |
|
|
|
self.read_contents_file() |
|
|
|
|
|
|
|
def _get_contents_path(self, package_atom): |
|
|
|
def _get_contents_path(self, package_atom: Union[str, PackageAtomName] |
|
|
|
) -> str: |
|
|
|
'''Метод для получения из ATOM-названия или готового объекта |
|
|
|
PackageAtomName пути к файлу CONTENTS.''' |
|
|
|
if isinstance(package_atom, str): |
|
|
@ -804,11 +813,11 @@ class Package: |
|
|
|
"Incorrect 'package_atom' value: '{}', type: '{}''". |
|
|
|
format(package_atom, type(package_atom))) |
|
|
|
|
|
|
|
def remove_cfg_prefix(self, file_name): |
|
|
|
def remove_cfg_prefix(self, file_name: str) -> str: |
|
|
|
'''Метод для удаления префикса ._cfg????_.''' |
|
|
|
return self.re_cfg.sub('/', file_name) |
|
|
|
|
|
|
|
def remove_chroot_path(self, file_name): |
|
|
|
def remove_chroot_path(self, file_name: str) -> str: |
|
|
|
'''Метод для удаления из пути файла корневого пути, если он не |
|
|
|
является /.''' |
|
|
|
if self.chroot_path != '/' and file_name.startswith(self.chroot_path): |
|
|
@ -853,7 +862,9 @@ class Package: |
|
|
|
return self.contents_dictionary[file_path]['type'] |
|
|
|
return None |
|
|
|
|
|
|
|
def sort_contents_dictionary(self): |
|
|
|
def sort_contents_dictionary(self) -> None: |
|
|
|
'''Метод для сортировки словаря, полученного в результате разбора и |
|
|
|
изменения CONTENTS-файла.''' |
|
|
|
tree = {} |
|
|
|
for path in self.contents_dictionary.keys(): |
|
|
|
path = path.strip('/').split('/') |
|
|
@ -866,18 +877,29 @@ class Package: |
|
|
|
sorted_contents = OrderedDict() |
|
|
|
for path in self._make_paths('/', tree): |
|
|
|
sorted_contents[path] = self.contents_dictionary[path] |
|
|
|
|
|
|
|
self.contents_dictionary = sorted_contents |
|
|
|
|
|
|
|
def _make_paths(self, path, level): |
|
|
|
paths = [] |
|
|
|
# def _make_paths(self, path: str, level: dict) -> List[str]: |
|
|
|
# paths = [] |
|
|
|
# for part in sorted(level.keys()): |
|
|
|
# part_path = os.path.join(path, part) |
|
|
|
# paths.append(part_path) |
|
|
|
# if level[part]: |
|
|
|
# paths.extend(self._make_paths(part_path, level[part])) |
|
|
|
# return paths |
|
|
|
|
|
|
|
def _make_paths(self, path: str, |
|
|
|
level: dict) -> Generator[str, None, None]: |
|
|
|
'''Генератор используемый для преобразования дерева сортировки путей в |
|
|
|
последовательность путей.''' |
|
|
|
for part in sorted(level.keys()): |
|
|
|
part_path = os.path.join(path, part) |
|
|
|
paths.append(part_path) |
|
|
|
yield part_path |
|
|
|
if level[part]: |
|
|
|
paths.extend(self._make_paths(part_path, level[part])) |
|
|
|
return paths |
|
|
|
yield from self._make_paths(part_path, level[part]) |
|
|
|
|
|
|
|
def add_dir(self, file_name): |
|
|
|
def add_dir(self, file_name: str) -> None: |
|
|
|
'''Метод для добавления в CONTENTS директорий.''' |
|
|
|
file_name = self.remove_chroot_path(file_name) |
|
|
|
|
|
|
@ -888,7 +910,8 @@ class Package: |
|
|
|
contents_item = OrderedDict({'type': 'dir'}) |
|
|
|
self.contents_dictionary[file_name] = contents_item |
|
|
|
|
|
|
|
def add_sym(self, file_name, target_path=None, mtime=None): |
|
|
|
def add_sym(self, file_name: str, target_path: Union[str, None] = None, |
|
|
|
mtime: Union[str, None] = None) -> None: |
|
|
|
'''Метод для добавления в CONTENTS символьных ссылок.''' |
|
|
|
real_path = file_name |
|
|
|
|
|
|
@ -913,7 +936,8 @@ class Package: |
|
|
|
|
|
|
|
self.contents_dictionary[file_name] = contents_item |
|
|
|
|
|
|
|
def add_obj(self, file_name, file_md5=None, mtime=None): |
|
|
|
def add_obj(self, file_name: str, file_md5: Union[str, None] = None, |
|
|
|
mtime: Union[str, None] = None) -> None: |
|
|
|
'''Метод для добавления в CONTENTS обычных файлов как obj.''' |
|
|
|
real_path = file_name |
|
|
|
file_name = self.remove_chroot_path(file_name) |
|
|
@ -938,7 +962,7 @@ class Package: |
|
|
|
'mtime': mtime}) |
|
|
|
self.contents_dictionary[file_name] = contents_item |
|
|
|
|
|
|
|
def add_file(self, file_name): |
|
|
|
def add_file(self, file_name: str) -> None: |
|
|
|
'''Метод для добавления в CONTENTS файла любого типа.''' |
|
|
|
if file_name != '/': |
|
|
|
real_path = file_name |
|
|
@ -954,7 +978,7 @@ class Package: |
|
|
|
elif os.path.isfile(real_path): |
|
|
|
self.add_obj(file_name) |
|
|
|
|
|
|
|
def remove_obj(self, file_path): |
|
|
|
def remove_obj(self, file_path: str) -> OrderedDict: |
|
|
|
'''Метод для удаления файлов и ссылок.''' |
|
|
|
file_path = self.remove_chroot_path(file_path) |
|
|
|
file_path = self.remove_cfg_prefix(file_path) |
|
|
@ -965,7 +989,7 @@ class Package: |
|
|
|
self.contents_dictionary.pop(file_path)}) |
|
|
|
return removed |
|
|
|
|
|
|
|
def remove_dir(self, file_path): |
|
|
|
def remove_dir(self, file_path: str) -> OrderedDict: |
|
|
|
'''Метод для удаления из CONTENTS файлов и директорий находящихся |
|
|
|
внутри удаляемой директории и самой директории.''' |
|
|
|
directory_path = self.remove_chroot_path(file_path) |
|
|
@ -982,7 +1006,7 @@ class Package: |
|
|
|
|
|
|
|
return removed |
|
|
|
|
|
|
|
def remove_file(self, file_path): |
|
|
|
def remove_file(self, file_path: str) -> OrderedDict: |
|
|
|
file_path = self.remove_chroot_path(file_path) |
|
|
|
file_path = self.remove_cfg_prefix(file_path) |
|
|
|
removed = OrderedDict() |
|
|
@ -996,7 +1020,7 @@ class Package: |
|
|
|
self.contents_dictionary.pop(file_path)}) |
|
|
|
return removed |
|
|
|
|
|
|
|
def clear_dir(self, file_path): |
|
|
|
def clear_dir(self, file_path: str) -> OrderedDict: |
|
|
|
'''Метод для удаления из CONTENTS файлов и директорий находящихся |
|
|
|
внутри очищаемой директории.''' |
|
|
|
directory_path = self.remove_chroot_path(file_path) |
|
|
@ -1039,7 +1063,7 @@ class Package: |
|
|
|
self.contents_dictionary.pop(file_path)}) |
|
|
|
return removed |
|
|
|
|
|
|
|
def get_md5(self, file_path): |
|
|
|
def get_md5(self, file_path: str) -> str: |
|
|
|
'''Метод для получения md5 хэш-суммы указанного файла.''' |
|
|
|
try: |
|
|
|
file_text = read_file(file_path).encode() |
|
|
@ -1049,14 +1073,15 @@ class Package: |
|
|
|
file_md5 = hashlib.md5(file_text).hexdigest() |
|
|
|
return file_md5 |
|
|
|
|
|
|
|
def get_link_target(self, link_path): |
|
|
|
def get_link_target(self, link_path: str) -> str: |
|
|
|
try: |
|
|
|
return read_link(link_path) |
|
|
|
except FilesError as error: |
|
|
|
raise PackageError(str(error)) |
|
|
|
|
|
|
|
def check_contents_data(self, file_path, file_md5=None, |
|
|
|
sym_target=None, symlink=False): |
|
|
|
def check_contents_data(self, file_path: str, file_md5: str = None, |
|
|
|
sym_target: str = None, symlink: bool = False |
|
|
|
) -> bool: |
|
|
|
'''Метод для проверки соответствия md5 хэш суммы файла той, что указана |
|
|
|
в файле CONTENTS.''' |
|
|
|
contents_path = file_path |
|
|
@ -1080,13 +1105,13 @@ class Package: |
|
|
|
else: |
|
|
|
return False |
|
|
|
|
|
|
|
def __contains__(self, file_path): |
|
|
|
def __contains__(self, file_path: str) -> bool: |
|
|
|
if self.chroot_path != "/": |
|
|
|
if file_path.startswith(self.chroot_path): |
|
|
|
file_path = file_path[len(self.chroot_path):] |
|
|
|
file_path = self.remove_cfg_prefix(file_path) |
|
|
|
return file_path in self.contents_dictionary |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
def __repr__(self) -> str: |
|
|
|
return '<Package: {}/{}>'.format(self.package_name.category, |
|
|
|
self.package_name.fullname) |