You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1141 lines
47 KiB

# vim: fileencoding=utf-8
#
import os
import re
import glob
from collections import OrderedDict
from .files import read_file, read_link, join_paths, FilesError
from typing import (
Generator,
Optional,
Dict,
Tuple,
Union,
List,
Any
)
from calculate.utils.tools import Singleton
import hashlib
class PackageError(Exception):
'''Исключение выбрасываемое при ошибках в объектах Package, работающих
с CONTENTS-файлами пакетов.'''
pass
# Коды ошибок ATOM-парсера.
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
class PackageAtomError(Exception):
'''Исключение выбрасываемое при ошибках разбора ATOM-названий.'''
def __init__(self, message: str = 'Package atom error',
errno: int = DEFAULT):
self.message = message
self.errno = errno
class VersionError(Exception):
'''Исключение выбрасываемое объектами Version.'''
pass
class PackageNotFound(Exception):
'''Специальное исключение выбрасываемое, если не удалось найти пакет, к
которому принадлежит файл.'''
pass
class PackageCreator(type):
"""Метакласс для создания классов пакетов, напоминающий метакласс
Singleton. Следит за тем, чтобы для каждого пакета создавался только один
экземляр объекта Package."""
# TODO Перенести этот функционал в класс Package.
_instances: Dict["PackageAtomName", "Package"] = {}
def __call__(cls, *args, **kwargs):
autosave = False
if 'autosave' in kwargs:
autosave = kwargs['autosave']
if not autosave or isinstance(args[0], str):
return super().__call__(*args, **kwargs)
if args[0] not in cls._instances:
cls._instances[args[0]] = super().__call__(*args, **kwargs)
return cls._instances[args[0]]
@classmethod
def save_all(cls) -> None:
for atom, pkg in cls._instances.items():
pkg.remove_empty_directories()
pkg.write_contents_file()
@classmethod
def clear_instances(cls) -> None:
"""Метод для очистки текущего кэша пакетных объектов.
Необходим при тестировании."""
cls._instances.clear()
class Version:
'''Класс объектов представляющих значения версий.'''
_suffix_order = {'alpha': 0, 'beta': 1, 'pre': 2,
'rc': 3, 'no': 4, 'p': 5}
def __init__(self, version_value: Union["Version", None, int,
float, str] = None):
if version_value is None:
self._string = '-1'
self._value = [-1]
self._literal = None
self._suffix = [(4, 0)]
self._revision = 0
elif isinstance(version_value, Version):
self._string = version_value._string
self._value = version_value._value
self._literal = version_value._literal
self._suffix = version_value._suffix
self._revision = version_value._revision
else:
value = self._get_version_value(version_value)
if value is None:
raise VersionError(
"Can't initialize Version object using '{0}'"
" value with type {1}".format(version_value,
type(version_value)))
self._string = value['string'].strip('-')
self._value = value['value']
self._literal = value['literal']
self._suffix = value['suffix']
self._revision = value['revision']
def _get_version_value(self, version: Union["Version", int, float, str]
) -> Union[dict, None]:
'''Вспомогательный метод для получения значения версии, представленного
в виде списка.'''
if isinstance(version, Version):
version_value = {'string': version._string,
'value': version._value,
'literal': version._literal,
'suffix': version._suffix,
'revision': version._revision}
elif isinstance(version, int):
version_value = {'string': str(int),
'value': [version],
'literal': '',
'suffix': [(4, 0)],
'revision': 0}
elif isinstance(version, float):
version_list = []
version = str(version).split('.')
for version_part in version:
version_list.append(int(version_part.strip()))
version_value = {'string': str(version),
'value': version_list,
'literal': '',
'suffix': (4, 0),
'revision': 0}
elif isinstance(version, str):
parse_result = PackageAtomParser.version_regex.search(
version.strip('-'))
if not parse_result:
return
result_dict = parse_result.groupdict()
version_value = {'string': version}
version_list = []
for version_part in result_dict['value'].split('.'):
version_list.append(int(version_part.strip('-')))
version_value['value'] = version_list
# Парсим литерал, если он есть.
version_value['literal'] = result_dict['literal'] or ''
# Парсим всю совокупность имеющихся суффиксов.
suffixes = result_dict['suffix']
suffix_list = []
if suffixes is not None:
suffixes = suffixes.strip('_')
suffixes = suffixes.split('_')
for suffix in suffixes:
result = re.search(r'([^\d]+)(\d+)?', suffix)
suffix_list.append((self._suffix_order[result.group(1)],
int(result.group(2) or 0)))
else:
suffix_list = [(self._suffix_order['no'], 0)]
version_value['suffix'] = suffix_list
# Парсим ревизию.
if parse_result['revision'] is not None:
version_value['revision'] = int(
parse_result['revision'].strip('-r'))
else:
version_value['revision'] = 0
else:
return
return version_value
def _compare_lists(self, lversion: list, rversion: list, filler: Any = 0
) -> int:
'''Метод для сравнения двух списков, даже если если они не одинаковы
по длине. Возвращает 0, если списки равны, 1 если lversion больше, -1
если lversion меньше.'''
if lversion == rversion:
return 0
for index in range(0, max(len(lversion), len(rversion))):
lvalue = lversion[index] if len(lversion) > index else filler
rvalue = rversion[index] if len(rversion) > index else filler
if lvalue == rvalue:
continue
if lvalue > rvalue:
return 1
else:
return -1
return 0
@property
def string(self):
return self._string
def __lt__(self, other: "Version") -> bool:
'''Перегрузка x < y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value, other_version['value'])
if cmp_res != 0:
return cmp_res < 0
if self._literal != other_version['literal']:
return self._literal < other_version['literal']
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return cmp_res < 0
return self._revision < other_version['revision']
def __le__(self, other: "Version") -> bool:
'''Перегрузка x <= y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value, other_version['value'])
if cmp_res != 0:
return cmp_res < 0
if self._literal != other_version['literal']:
return self._literal < other_version['literal']
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return cmp_res < 0
return self._revision <= other_version['revision']
def __eq__(self, other: "Version", ignore_revision: bool = False) -> bool:
'''Перегрузка x == y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value,
other_version['value'])
if cmp_res != 0:
return False
if self._literal != other_version['literal']:
return False
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return False
if ignore_revision:
return True
else:
return self._revision == other_version['revision']
def __ne__(self, other: "Version") -> bool:
'''Перегрузка x != y.'''
return not self.__eq__(other)
def __gt__(self, other: "Version") -> bool:
'''Перегрузка x > y.'''
return not self.__le__(other)
def __ge__(self, other: "Version") -> bool:
'''Перегрузка x >= y.'''
return not self.__lt__(other)
def __hash__(self):
return hash(self._string)
def __repr__(self):
return '<Version: {}>'.format(self._string)
def __str__(self):
return self._string
def __bool__(self):
if self._value == [-1]:
return False
else:
return True
def __rshift__(self, other: tuple) -> bool:
"Проверка нахождения значения переменной в указанном диапазоне."
if (not isinstance(other, tuple) or len(other) != 2
or not isinstance(other[0], str) or not isinstance(other[1], str)):
raise VersionError("Versions range must be tuple of two strings,"
f" not '{type(other)}'")
lequal = other[0].startswith('=')
lversion = Version(other[0].strip('='))
requal = other[0].startswith('=')
rversion = Version(other[1].strip('='))
return (((lequal and self >= lversion)
or (not lequal and self > lversion))
and ((requal and self <= rversion)
or (not requal and self < rversion)))
class ContentsParser(metaclass=Singleton):
def __init__(self):
self._parsers = {'dir': ContentsParser._parse_dir,
'sym': ContentsParser._parse_sym,
'obj': ContentsParser._parse_obj}
self._patterns = {'dir': "dir {path}",
'sym': "sym {path} -> {target} {mtime}",
'obj': "obj {path} {md5} {mtime}"}
def parse(self, text: str) -> OrderedDict:
output = OrderedDict()
for line in text.split('\n'):
line = line.strip()
if not line:
continue
parts = line.split()
path, value = self._parsers[parts[0]](parts)
output[path] = value
return output
@staticmethod
def _parse_dir(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'dir'}
@staticmethod
def _parse_obj(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'obj',
'md5': parts[2].strip(),
'mtime': parts[3].strip()}
@staticmethod
def _parse_sym(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'sym',
'target': parts[3].strip(),
'mtime': parts[4].strip()}
def render(self, contents_dictionary: OrderedDict) -> str:
lines = []
for path, value in contents_dictionary.items():
lines.append(
self._patterns[value['type']].format(path=path, **value))
lines.append('')
return "\n".join(lines)
class PackageAtomName:
'''Класс для хранения результата определения пакета. Для определения пакета
использует путь к его pkg директории.'''
def __init__(self, atom_dictionary: dict):
self._package_directory = atom_dictionary['pkg_path']
self._version = atom_dictionary['version']
if self._package_directory is not None:
self._name = self.fullname[:-len(self._version.string)]
else:
self._name = 'None'
if self._package_directory is not None:
self._with_slot = atom_dictionary.get('with_slot', False)
else:
self._with_slot = False
@property
def name(self) -> str:
return self._name
@property
def fullname(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.basename(self._package_directory)
@property
def category(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.basename(os.path.dirname(self._package_directory))
@property
def atom(self) -> str:
if self._package_directory is None:
return 'None'
return "{}/{}".format(self.category, self.fullname)
@property
def version(self) -> Version:
if self._package_directory is None:
return Version()
return self._version
@property
def contents_path(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.join(self._package_directory, 'CONTENTS')
@property
def use_flags(self) -> list:
if self._package_directory is None:
return []
use_path = os.path.join(self._package_directory, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags for 'package'"
" parameter: {}".format(self.package_atom))
@property
def pkg_path(self) -> str:
return self._package_directory
@property
def slot(self) -> str:
if self._package_directory is None:
return None
slot_path = os.path.join(self._package_directory, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(self.package_atom))
@property
def slot_specified(self) -> bool:
return self._with_slot
def __eq__(self, other: Any) -> bool:
if isinstance(other, PackageAtomName):
return self._package_directory == other._package_directory
else:
return False
def __ne__(self, other: Any) -> bool:
if isinstance(other, PackageAtomName):
return self._package_directory != other._package_directory
else:
return False
def __bool__(self) -> bool:
if self._package_directory is None:
return True
return bool(self._package_directory)
def __repr__(self) -> bool:
if self._package_directory is None:
return '<PackageAtomName: None>'
return '<PackageAtomName: {}/{}>'.format(self.category,
self.fullname)
def __hash__(self) -> bool:
return hash(self._package_directory)
def __str__(self) -> str:
category_path, name = os.path.split(self._package_directory)
category = os.path.basename(category_path)
return f'{category}/{name}'
NonePackage = PackageAtomName({'pkg_path': None, 'version': None})
def make_version_pattern() -> str:
_value = r'(?P<value>\d+(\.\d+)*)'
_literal = r'(?P<literal>[a-z])?'
_suffix = r'(?P<suffix>(_(pre|p|beta|alpha|rc)(\d+)?)+)?'
_revision = r'(?P<revision>-r\d+)?'
return _value + _literal + _suffix + _revision
class PackageAtomParser:
'''Класс для парсинга параметра package, его проверки, а также определения
принадлежности файла пакету.'''
_version_pattern: str = make_version_pattern()
package_name_pattern: str =\
fr'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-{_version_pattern})?'
atom_name_pattern: str = r'''(?P<condition>[=~><])?
(?P<category>[^\s/]*)/
{0}
(?P<slot>:[^\[\s]*)?
(?P<use_flags>\[\S*(?:\s+\S*)*\])?'''.format(
package_name_pattern)
atom_regex = re.compile(atom_name_pattern, re.VERBOSE)
package_name_regex = re.compile(package_name_pattern)
version_regex = re.compile(_version_pattern)
atom_dict_fields: List[str] = ['category', 'name', 'version', 'slot',
'use_flags', 'with_slot', 'condition']
def __init__(self, pkg_path: str = '/var/db/pkg',
chroot_path: str = '/'):
self.chroot_path: str = chroot_path
self.pkg_path: str
if chroot_path != '/':
self.pkg_path = join_paths(chroot_path, pkg_path)
else:
self.pkg_path = pkg_path
self.package_atom: str = ''
def parse_package_parameter(self, package_atom: Union[str, dict]
) -> PackageAtomName:
'''Метод для разбора значения package, после разбора инициирует
проверку полученных значений. Возвращает объект PackageAtomName.'''
self.package_atom = package_atom
if isinstance(package_atom, str):
atom_dictionary = self.parse_atom_name(package_atom)
atom_dictionary['package_atom'] = package_atom
elif isinstance(package_atom, dict):
atom_dictionary = package_atom
if 'package_atom' not in atom_dictionary:
atom_dictionary['package_atom'] = (
f'{atom_dictionary["category"]}/{atom_dictionary["name"]}')
atom_dictionary = self._check_package_existance(atom_dictionary)
atom_name_object = PackageAtomName(atom_dictionary)
return atom_name_object
def is_package_exists(self, package_atom: Union[str, dict]) -> bool:
try:
self.parse_package_parameter(package_atom)
return True
except PackageAtomError as e:
if e.errno == NOTEXIST:
return False
raise
def _check_package_existance(self, atom_dictionary: dict) -> dict:
'''Метод для проверки существования пакета. Существование пакета
определяется наличием соответствующего CONTENTS файла.'''
# Используем glob-паттерн для поиска.
glob_pattern = r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
self.pkg_path,
atom_dictionary['category'],
atom_dictionary['name'])
glob_result = glob.glob(glob_pattern)
if not glob_result:
# Если ничего не нашлось.
raise PackageAtomError("Package '{}' is not found.".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(glob_result) == 1:
# Если нашелся один пакет.
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
atom_dictionary['version'] = pkg_version
atom_dictionary['pkg_path'] = pkg_path
else:
packages = dict()
# Если подходящих пакетов много -- проверяем по use-флагам,
# слотам и версии, если таковые заданы.
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
packages[pkg_path] = pkg_version
except PackageAtomError:
continue
if not packages:
# Если после проверки отсеялись все кандидаты.
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(packages) == 1:
# Если был найден только один кандидат -- выдаем его.
pkg_path = next(iter(packages.keys()))
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
else:
# Если подходящих пакетов много -- берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
return atom_dictionary
def _check_slot_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученного из параметра package значения slot.
'''
if atom_dictionary['slot']:
slot = self._get_slot_value(pkg_path,
atom_dictionary['package_atom'])
if slot != atom_dictionary['slot']:
raise PackageAtomError("Package '{}' is not found.".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _check_use_flags_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученных из параметра package значений
use-флагов.'''
if atom_dictionary['use_flags']:
use_flags = self._get_use_flags_value(
pkg_path,
atom_dictionary['package_atom'])
for use_flag in atom_dictionary['use_flags']:
if use_flag.startswith('-'):
if use_flag.strip()[1:] not in use_flags:
continue
elif use_flag in use_flags:
continue
raise PackageAtomError("Package '{}' is not found".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _get_slot_value(self, pkg_path: str, package_atom: str) -> str:
'''Метод для получения значения slot из файла SLOT.'''
slot_path = os.path.join(pkg_path, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(package_atom))
def _get_use_flags_value(self, pkg_path: str, package_atom: str) -> list:
'''Метод для получения списка значений use-флагов из файла USE.'''
use_path = os.path.join(pkg_path, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags in atom name: {}".
format(package_atom))
def _get_category_packages(self, category: str) -> str:
'''Генератор имен категорий, имеющихся в /var/db/pkg'''
for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
category)):
yield path
def _check_version(self, atom_dictionary: dict, pkg_version: Version
) -> None:
condition = atom_dictionary['condition']
if condition == '=':
condition_result = (atom_dictionary['version']
== pkg_version)
elif condition == '~':
condition_result = atom_dictionary['version'].__eq__(
pkg_version,
ignore_revision=True)
elif condition == '>':
condition_result = atom_dictionary['version'] < pkg_version
elif condition == '<':
condition_result = atom_dictionary['version'] > pkg_version
else:
condition_result = False
if not condition_result:
raise PackageAtomError("Package '{}' is not found".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def get_file_package(self, file_path: str) -> PackageAtomName:
'''Метод для определения пакета, которому принадлежит файл.'''
# Удаляем часть пути соответствующую chroot_path
# TODO предусмотреть кэширование результата.
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
for category in os.listdir(self.pkg_path):
for contents_path in self._get_category_packages(category):
try:
with open(contents_path, 'r') as contents_file:
for file_line in contents_file.readlines():
contents_name = file_line.split(' ')[1].strip()
if contents_name == file_path:
package_path = os.path.dirname(contents_path)
package_name = os.path.basename(package_path)
parsing_result = self.package_name_regex.\
search(package_name)
version = parsing_result.groupdict()['version']
version = Version(version)
package_atom = PackageAtomName(
{'pkg_path': package_path,
'version': version})
return package_atom
except (OSError, IOError):
continue
else:
raise PackageNotFound("The file does not belong to any package")
@classmethod
def parse_atom_name(cls, atom_name: str) -> dict:
parsing_result = cls.atom_regex.search(atom_name)
if (not parsing_result or parsing_result.string != atom_name or
not parsing_result.groupdict()['category'] or
not parsing_result.groupdict()['name']):
raise PackageAtomError("atom name '{}' is not"
" correct".format(atom_name),
errno=NOTCORRECT)
parsing_result = parsing_result.groupdict()
category = parsing_result['category']
name = parsing_result['name']
if parsing_result['version'] is not None:
version = Version(parsing_result['version'].strip('-'))
else:
version = None
if parsing_result['condition'] is not None:
if parsing_result['version'] is None:
raise PackageAtomError(f"Atom name '{atom_name}' is not"
" correct. Version value is missed",
errno=NOTCORRECT)
elif parsing_result['version'] is not None:
parsing_result['condition'] = '='
if (parsing_result['slot'] is not None
and parsing_result['slot'] != ':'):
slot = parsing_result['slot'].strip(':')
else:
slot = None
if parsing_result['use_flags'] is not None:
use_flags = [use.strip() for use in
parsing_result['use_flags'].strip().
strip('[]').split(' ')]
else:
use_flags = None
atom_dict = {'category': category,
'name': name,
'version': version,
'slot': slot,
'use_flags': use_flags,
'with_slot': slot is not None,
'condition': parsing_result['condition']}
return atom_dict
class Package(metaclass=PackageCreator):
'''Класс для работы с принадлежностью файлов пакетам.'''
re_cfg = re.compile(r'/\._cfg\d{4}_')
def __init__(self, package_atom: Union[str, PackageAtomName],
pkg_path: str = '/var/db/pkg',
chroot_path: str = '/',
autosave: bool = False):
self.chroot_path: str = chroot_path
self.contents_file_path = self._get_contents_path(package_atom)
self.package_name = package_atom
self.parser = ContentsParser()
if (chroot_path != '/' and
not self.contents_file_path.startswith(chroot_path)):
self.contents_file_path = join_paths(chroot_path,
self.contents_file_path)
if not os.path.exists(self.contents_file_path):
raise PackageError("Can not find CONTENTS file in path: {}".format(
self.contents_file_path
))
self.contents_dictionary = OrderedDict()
self.read_contents_file()
self.autosave: bool = autosave
def _get_contents_path(self, package_atom: Union[str, PackageAtomName]
) -> str:
'''Метод для получения из ATOM-названия или готового объекта
PackageAtomName пути к файлу CONTENTS.'''
if isinstance(package_atom, str):
package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
atom_name = package_atom_parser.parse_package_parameter(
package_atom)
return os.path.join(atom_name.pkg_path,
'CONTENTS')
elif isinstance(package_atom, PackageAtomName):
return os.path.join(package_atom.pkg_path,
'CONTENTS')
else:
raise PackageError(
"Incorrect 'package_atom' value: '{}', type: '{}''".
format(package_atom, type(package_atom)))
def remove_cfg_prefix(self, file_name: str) -> str:
'''Метод для удаления префикса ._cfg????_.'''
return self.re_cfg.sub('/', file_name)
def remove_chroot_path(self, file_name: str) -> str:
'''Метод для удаления из пути файла корневого пути, если он не
является /.'''
if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
return file_name[len(self.chroot_path):]
else:
return file_name
def read_contents_file(self) -> bool:
'''Метод для чтения файла CONTENTS.'''
try:
contents_text = read_file(self.contents_file_path)
except FilesError as error:
raise PackageError(str(error))
if contents_text:
self.contents_dictionary = self.parser.parse(contents_text)
return True
else:
return False
def write_contents_file(self) -> None:
'''Метод для записи файла CONTENTS.'''
with open(self.contents_file_path, 'w') as contents_file:
contents_text = self.render_contents_file()
contents_file.write(contents_text)
def render_contents_file(self) -> str:
'''Метод для получения текста файла CONTENTS.'''
return self.parser.render(self.contents_dictionary)
@property
def files(self) -> List[str]:
'''Метод для получения списка путей файлов, имеющихся в CONTENTS-файле
пакета.'''
return list(self.contents_dictionary.keys())
def get_file_type(self, file_path: str) -> str:
'''Метод для получения по пути файла типа, указанного для него в
CONTENTS-файле.'''
file_path = self.remove_chroot_path(file_path)
if file_path in self.contents_dictionary:
return self.contents_dictionary[file_path]['type']
return None
def sort_contents_dictionary(self) -> None:
'''Метод для сортировки словаря, полученного в результате разбора и
изменения CONTENTS-файла.'''
tree = {}
for path in self.contents_dictionary.keys():
path = path.strip('/').split('/')
level = tree
for part in path:
if part not in level:
level[part] = {}
level = level[part]
sorted_contents = OrderedDict()
for path in self._make_paths('/', tree):
sorted_contents[path] = self.contents_dictionary[path]
self.contents_dictionary = sorted_contents
# def _make_paths(self, path: str, level: dict) -> List[str]:
# paths = []
# for part in sorted(level.keys()):
# part_path = os.path.join(path, part)
# paths.append(part_path)
# if level[part]:
# paths.extend(self._make_paths(part_path, level[part]))
# return paths
def _make_paths(self, path: str,
level: dict) -> Generator[str, None, None]:
'''Генератор используемый для преобразования дерева сортировки путей в
последовательность путей.'''
for part in sorted(level.keys()):
part_path = os.path.join(path, part)
yield part_path
if level[part]:
yield from self._make_paths(part_path, level[part])
def add_dir(self, file_name: str) -> None:
'''Метод для добавления в CONTENTS директорий.'''
file_name = self.remove_chroot_path(file_name)
if (file_name != '/' and
(file_name not in self.contents_dictionary
or self.contents_dictionary[file_name]['type'] != 'dir')):
self.add_dir(os.path.dirname(file_name))
contents_item = OrderedDict({'type': 'dir'})
self.contents_dictionary[file_name] = contents_item
def add_sym(self, file_name: str, target_path: Optional[str] = None,
mtime: Optional[str] = None) -> None:
'''Метод для добавления в CONTENTS символьных ссылок.'''
real_path = file_name
file_name = self.remove_cfg_prefix(file_name)
file_name = self.remove_chroot_path(file_name)
if not real_path.startswith(self.chroot_path):
real_path = join_paths(self.chroot_path, real_path)
if target_path is None:
target_path = read_link(real_path)
self.add_dir(os.path.dirname(file_name))
if mtime is None:
mtime = str(int(os.lstat(real_path).st_mtime))
try:
contents_item = OrderedDict({'type': 'sym',
'target': target_path,
'mtime': mtime})
except FilesError as error:
raise PackageError(str(error))
self.contents_dictionary[file_name] = contents_item
def add_obj(self, file_name: str, file_md5: Optional[str] = None,
mtime: Optional[str] = None) -> None:
'''Метод для добавления в CONTENTS обычных файлов как obj.'''
real_path = file_name
file_name = self.remove_chroot_path(file_name)
file_name = self.remove_cfg_prefix(file_name)
if real_path == file_name:
real_path = join_paths(self.chroot_path, file_name)
self.add_dir(os.path.dirname(file_name))
if file_md5 is None:
try:
file_text = read_file(real_path, binary=True)
except FilesError as error:
raise PackageError(str(error))
file_md5 = hashlib.md5(file_text).hexdigest()
if mtime is None:
mtime = str(int(os.lstat(real_path).st_mtime))
contents_item = OrderedDict({'type': 'obj',
'md5': file_md5,
'mtime': mtime})
self.contents_dictionary[file_name] = contents_item
def add_file(self, file_name: str) -> None:
'''Метод для добавления в CONTENTS файла любого типа.'''
if file_name != '/':
real_path = file_name
if file_name.startswith(self.chroot_path):
file_name = self.remove_chroot_path(file_name)
else:
real_path = join_paths(self.chroot_path, file_name)
if os.path.isdir(real_path):
self.add_dir(file_name)
elif os.path.islink(real_path):
self.add_sym(file_name)
elif os.path.isfile(real_path):
self.add_obj(file_name)
def remove_obj(self, file_path: str) -> OrderedDict:
'''Метод для удаления файлов и ссылок.'''
file_path = self.remove_chroot_path(file_path)
file_path = self.remove_cfg_prefix(file_path)
removed = OrderedDict()
if file_path in self.contents_dictionary:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_dir(self, file_path: str) -> OrderedDict:
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри удаляемой директории и самой директории.'''
directory_path = self.remove_chroot_path(file_path)
paths_to_remove = []
removed = OrderedDict()
for file_path in self.contents_dictionary:
if file_path.startswith(directory_path):
paths_to_remove.append(file_path)
for file_path in paths_to_remove:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_file(self, file_path: str) -> OrderedDict:
file_path = self.remove_chroot_path(file_path)
file_path = self.remove_cfg_prefix(file_path)
removed = OrderedDict()
if file_path not in self.contents_dictionary:
return
if self.contents_dictionary[file_path]['type'] == 'dir':
removed.update(self.remove_dir(file_path))
else:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def clear_dir(self, file_path: str) -> OrderedDict:
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри очищаемой директории.'''
directory_path = self.remove_chroot_path(file_path)
paths_to_remove = []
removed = OrderedDict()
for file_path in self.contents_dictionary:
if file_path == directory_path:
continue
if file_path.startswith(directory_path):
paths_to_remove.append(file_path)
for file_path in paths_to_remove:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_empty_directories(self) -> OrderedDict:
'''Метод для удаления из CONTENTS директорий, которые после удаления
тех или иных файлов больше не находятся на пути к тем файлам, которые
по-прежнему принадлежат пакету.'''
used_directories = set()
removed = OrderedDict()
not_directory_list = [path for path, value in
self.contents_dictionary.items()
if value['type'] != 'dir']
for filepath in not_directory_list:
file_directory = os.path.dirname(filepath)
while file_directory != '/':
used_directories.add(file_directory)
file_directory = os.path.dirname(file_directory)
paths_to_delete = [file_path for file_path, value in
self.contents_dictionary.items()
if value['type'] == 'dir' and
file_path not in used_directories]
for file_path in paths_to_delete:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def get_md5(self, file_path: str) -> str:
'''Метод для получения md5 хэш-суммы указанного файла.'''
try:
file_text = read_file(file_path).encode()
except FilesError as error:
raise PackageError(str(error))
file_md5 = hashlib.md5(file_text).hexdigest()
return file_md5
def get_link_target(self, link_path: str) -> str:
try:
return read_link(link_path)
except FilesError as error:
raise PackageError(str(error))
def check_contents_data(self, file_path: str, file_md5: str = None,
sym_target: str = None, symlink: bool = False
) -> bool:
'''Метод для проверки соответствия md5 хэш суммы файла той, что указана
в файле CONTENTS.'''
contents_path = file_path
if self.chroot_path != "/" and contents_path.startswith(
self.chroot_path):
contents_path = contents_path[len(self.chroot_path):]
contents_path = self.remove_cfg_prefix(contents_path)
if (not symlink and
self.contents_dictionary[contents_path]["type"] != "sym"):
if file_md5 is None:
file_md5 = self.get_md5(file_path)
contents_md5 = self.contents_dictionary[contents_path]['md5']
return file_md5 == contents_md5
elif (symlink and
self.contents_dictionary[contents_path]["type"] == "sym"):
if sym_target is None:
sym_target = self.get_link_target(file_path)
return (sym_target ==
self.contents_dictionary[contents_path]["target"])
else:
return False
def __contains__(self, file_path: str) -> bool:
if self.chroot_path != "/":
if file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
file_path = self.remove_cfg_prefix(file_path)
return file_path in self.contents_dictionary
def __repr__(self) -> str:
return '<Package: {}/{}>'.format(self.package_name.category,
self.package_name.fullname)
# def __del__(self) -> None:
# if self.autosave and os.path.exists(self.contents_file_path):
# self.remove_empty_directories()
# self.write_contents_file()