25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1142 lines
47 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
import os
import re
import glob
from collections import OrderedDict
from .files import read_file, read_link, join_paths, FilesError
from typing import (
Generator,
Optional,
Dict,
Tuple,
Union,
List,
Any
)
from calculate.utils.tools import Singleton
import hashlib
class PackageError(Exception):
'''Исключение выбрасываемое при ошибках в объектах Package, работающих
с CONTENTS-файлами пакетов.'''
pass
# Коды ошибок ATOM-парсера.
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
class PackageAtomError(Exception):
'''Исключение выбрасываемое при ошибках разбора ATOM-названий.'''
def __init__(self, message: str = 'Package atom error',
errno: int = DEFAULT):
self.message = message
self.errno = errno
class VersionError(Exception):
'''Исключение выбрасываемое объектами Version.'''
pass
class PackageNotFound(Exception):
'''Специальное исключение выбрасываемое, если не удалось найти пакет, к
которому принадлежит файл.'''
pass
class PackageCreator(type):
"""Метакласс для создания классов пакетов, напоминающий метакласс
Singleton. Следит за тем, чтобы для каждого пакета создавался только один
экземляр объекта Package."""
# TODO Перенести этот функционал в класс Package.
_instances: Dict["PackageAtomName", "Package"] = {}
def __call__(cls, *args, **kwargs):
autosave = False
if 'autosave' in kwargs:
autosave = kwargs['autosave']
if not autosave or isinstance(args[0], str):
return super().__call__(*args, **kwargs)
if args[0] not in cls._instances:
cls._instances[args[0]] = super().__call__(*args, **kwargs)
return cls._instances[args[0]]
@classmethod
def save_all(cls) -> None:
for atom, pkg in cls._instances.items():
pkg.remove_empty_directories()
pkg.write_contents_file()
@classmethod
def clear_instances(cls) -> None:
"""Метод для очистки текущего кэша пакетных объектов.
Необходим при тестировании."""
cls._instances.clear()
class Version:
'''Класс объектов представляющих значения версий.'''
_suffix_order = {'alpha': 0, 'beta': 1, 'pre': 2,
'rc': 3, 'no': 4, 'p': 5}
def __init__(self, version_value: Union["Version", None, int,
float, str] = None):
if version_value is None:
self._string = '-1'
self._value = [-1]
self._literal = None
self._suffix = [(4, 0)]
self._revision = 0
elif isinstance(version_value, Version):
self._string = version_value._string
self._value = version_value._value
self._literal = version_value._literal
self._suffix = version_value._suffix
self._revision = version_value._revision
else:
value = self._get_version_value(version_value)
if value is None:
raise VersionError(
"Can't initialize Version object using '{0}'"
" value with type {1}".format(version_value,
type(version_value)))
self._string = value['string'].strip('-')
self._value = value['value']
self._literal = value['literal']
self._suffix = value['suffix']
self._revision = value['revision']
def _get_version_value(self, version: Union["Version", int, float, str]
) -> Union[dict, None]:
'''Вспомогательный метод для получения значения версии, представленного
в виде списка.'''
if isinstance(version, Version):
version_value = {'string': version._string,
'value': version._value,
'literal': version._literal,
'suffix': version._suffix,
'revision': version._revision}
elif isinstance(version, int):
version_value = {'string': str(int),
'value': [version],
'literal': '',
'suffix': [(4, 0)],
'revision': 0}
elif isinstance(version, float):
version_list = []
version = str(version).split('.')
for version_part in version:
version_list.append(int(version_part.strip()))
version_value = {'string': str(version),
'value': version_list,
'literal': '',
'suffix': (4, 0),
'revision': 0}
elif isinstance(version, str):
parse_result = PackageAtomParser.version_regex.search(
version.strip('-'))
if not parse_result:
return
result_dict = parse_result.groupdict()
version_value = {'string': version}
version_list = []
for version_part in result_dict['value'].split('.'):
version_list.append(int(version_part.strip('-')))
version_value['value'] = version_list
# Парсим литерал, если он есть.
version_value['literal'] = result_dict['literal'] or ''
# Парсим всю совокупность имеющихся суффиксов.
suffixes = result_dict['suffix']
suffix_list = []
if suffixes is not None:
suffixes = suffixes.strip('_')
suffixes = suffixes.split('_')
for suffix in suffixes:
result = re.search(r'([^\d]+)(\d+)?', suffix)
suffix_list.append((self._suffix_order[result.group(1)],
int(result.group(2) or 0)))
else:
suffix_list = [(self._suffix_order['no'], 0)]
version_value['suffix'] = suffix_list
# Парсим ревизию.
if parse_result['revision'] is not None:
version_value['revision'] = int(
parse_result['revision'].strip('-r'))
else:
version_value['revision'] = 0
else:
return
return version_value
def _compare_lists(self, lversion: list, rversion: list, filler: Any = 0
) -> int:
'''Метод для сравнения двух списков, даже если если они не одинаковы
по длине. Возвращает 0, если списки равны, 1 если lversion больше, -1
если lversion меньше.'''
if lversion == rversion:
return 0
for index in range(0, max(len(lversion), len(rversion))):
lvalue = lversion[index] if len(lversion) > index else filler
rvalue = rversion[index] if len(rversion) > index else filler
if lvalue == rvalue:
continue
if lvalue > rvalue:
return 1
else:
return -1
return 0
@property
def string(self):
return self._string
def __lt__(self, other: "Version") -> bool:
'''Перегрузка x < y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value, other_version['value'])
if cmp_res != 0:
return cmp_res < 0
if self._literal != other_version['literal']:
return self._literal < other_version['literal']
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return cmp_res < 0
return self._revision < other_version['revision']
def __le__(self, other: "Version") -> bool:
'''Перегрузка x <= y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value, other_version['value'])
if cmp_res != 0:
return cmp_res < 0
if self._literal != other_version['literal']:
return self._literal < other_version['literal']
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return cmp_res < 0
return self._revision <= other_version['revision']
def __eq__(self, other: "Version", ignore_revision: bool = False) -> bool:
'''Перегрузка x == y.'''
other_version = self._get_version_value(other)
if other_version is None:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
cmp_res = self._compare_lists(self._value,
other_version['value'])
if cmp_res != 0:
return False
if self._literal != other_version['literal']:
return False
cmp_res = self._compare_lists(self._suffix,
other_version['suffix'],
filler=(4, 0))
if cmp_res != 0:
return False
if ignore_revision:
return True
else:
return self._revision == other_version['revision']
def __ne__(self, other: "Version") -> bool:
'''Перегрузка x != y.'''
return not self.__eq__(other)
def __gt__(self, other: "Version") -> bool:
'''Перегрузка x > y.'''
return not self.__le__(other)
def __ge__(self, other: "Version") -> bool:
'''Перегрузка x >= y.'''
return not self.__lt__(other)
def __hash__(self):
return hash(self._string)
def __repr__(self):
return '<Version: {}>'.format(self._string)
def __str__(self):
return self._string
def __bool__(self):
if self._value == [-1]:
return False
else:
return True
def __rshift__(self, other: tuple) -> bool:
"Проверка нахождения значения переменной в указанном диапазоне."
if (not isinstance(other, tuple) or len(other) != 2
or not isinstance(other[0], str) or not isinstance(other[1], str)):
raise VersionError("Versions range must be tuple of two strings,"
f" not '{type(other)}'")
lequal = other[0].startswith('=')
lversion = Version(other[0].strip('='))
requal = other[0].startswith('=')
rversion = Version(other[1].strip('='))
return (((lequal and self >= lversion)
or (not lequal and self > lversion))
and ((requal and self <= rversion)
or (not requal and self < rversion)))
class ContentsParser(metaclass=Singleton):
def __init__(self):
self._parsers = {'dir': ContentsParser._parse_dir,
'sym': ContentsParser._parse_sym,
'obj': ContentsParser._parse_obj}
self._patterns = {'dir': "dir {path}",
'sym': "sym {path} -> {target} {mtime}",
'obj': "obj {path} {md5} {mtime}"}
def parse(self, text: str) -> OrderedDict:
output = OrderedDict()
for line in text.split('\n'):
line = line.strip()
if not line:
continue
parts = line.split()
path, value = self._parsers[parts[0]](parts)
output[path] = value
return output
@staticmethod
def _parse_dir(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'dir'}
@staticmethod
def _parse_obj(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'obj',
'md5': parts[2].strip(),
'mtime': parts[3].strip()}
@staticmethod
def _parse_sym(parts: List[str]) -> Tuple[str, dict]:
return parts[1], {'type': 'sym',
'target': parts[3].strip(),
'mtime': parts[4].strip()}
def render(self, contents_dictionary: OrderedDict) -> str:
lines = []
for path, value in contents_dictionary.items():
lines.append(
self._patterns[value['type']].format(path=path, **value))
lines.append('')
return "\n".join(lines)
class PackageAtomName:
'''Класс для хранения результата определения пакета. Для определения пакета
использует путь к его pkg директории.'''
def __init__(self, atom_dictionary: dict):
self._package_directory = atom_dictionary['pkg_path']
self._version = atom_dictionary['version']
if self._package_directory is not None:
self._name = self.fullname[:-len(self._version.string)]
else:
self._name = 'None'
if self._package_directory is not None:
self._with_slot = atom_dictionary.get('with_slot', False)
else:
self._with_slot = False
@property
def name(self) -> str:
return self._name
@property
def fullname(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.basename(self._package_directory)
@property
def category(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.basename(os.path.dirname(self._package_directory))
@property
def atom(self) -> str:
if self._package_directory is None:
return 'None'
return "{}/{}".format(self.category, self.fullname)
@property
def version(self) -> Version:
if self._package_directory is None:
return Version()
return self._version
@property
def contents_path(self) -> str:
if self._package_directory is None:
return 'None'
return os.path.join(self._package_directory, 'CONTENTS')
@property
def use_flags(self) -> list:
if self._package_directory is None:
return []
use_path = os.path.join(self._package_directory, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags for 'package'"
" parameter: {}".format(self.package_atom))
@property
def pkg_path(self) -> str:
return self._package_directory
@property
def slot(self) -> str:
if self._package_directory is None:
return None
slot_path = os.path.join(self._package_directory, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(self.package_atom))
@property
def slot_specified(self) -> bool:
return self._with_slot
def __eq__(self, other: Any) -> bool:
if isinstance(other, PackageAtomName):
return self._package_directory == other._package_directory
else:
return False
def __ne__(self, other: Any) -> bool:
if isinstance(other, PackageAtomName):
return self._package_directory != other._package_directory
else:
return False
def __bool__(self) -> bool:
if self._package_directory is None:
return True
return bool(self._package_directory)
def __repr__(self) -> bool:
if self._package_directory is None:
return '<PackageAtomName: None>'
return '<PackageAtomName: {}/{}>'.format(self.category,
self.fullname)
def __hash__(self) -> bool:
return hash(self._package_directory)
def __str__(self) -> str:
category_path, name = os.path.split(self._package_directory)
category = os.path.basename(category_path)
return f'{category}/{name}'
NonePackage = PackageAtomName({'pkg_path': None, 'version': None})
def make_version_pattern() -> str:
_value = r'(?P<value>\d+(\.\d+)*)'
_literal = r'(?P<literal>[a-z])?'
_suffix = r'(?P<suffix>(_(pre|p|beta|alpha|rc)(\d+)?)+)?'
_revision = r'(?P<revision>-r\d+)?'
return _value + _literal + _suffix + _revision
class PackageAtomParser:
'''Класс для парсинга параметра package, его проверки, а также определения
принадлежности файла пакету.'''
_version_pattern: str = make_version_pattern()
package_name_pattern: str =\
fr'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-{_version_pattern})?'
atom_name_pattern: str = r'''(?P<condition>[=~><])?
(?P<category>[^\s/]*)/
{0}
(?P<slot>:[^\[\s]*)?
(?P<use_flags>\[\S*(?:\s+\S*)*\])?'''.format(
package_name_pattern)
atom_regex = re.compile(atom_name_pattern, re.VERBOSE)
package_name_regex = re.compile(package_name_pattern)
version_regex = re.compile(_version_pattern)
atom_dict_fields: List[str] = ['category', 'name', 'version', 'slot',
'use_flags', 'with_slot', 'condition']
def __init__(self, pkg_path: str = '/var/db/pkg',
chroot_path: str = '/'):
self.chroot_path: str = chroot_path
self.pkg_path: str
if chroot_path != '/':
self.pkg_path = join_paths(chroot_path, pkg_path)
else:
self.pkg_path = pkg_path
self.package_atom: str = ''
def parse_package_parameter(self, package_atom: Union[str, dict]
) -> PackageAtomName:
'''Метод для разбора значения package, после разбора инициирует
проверку полученных значений. Возвращает объект PackageAtomName.'''
self.package_atom = package_atom
if isinstance(package_atom, str):
atom_dictionary = self.parse_atom_name(package_atom)
atom_dictionary['package_atom'] = package_atom
elif isinstance(package_atom, dict):
atom_dictionary = package_atom
if 'package_atom' not in atom_dictionary:
atom_dictionary['package_atom'] = (
f'{atom_dictionary["category"]}/{atom_dictionary["name"]}')
atom_dictionary = self._check_package_existance(atom_dictionary)
atom_name_object = PackageAtomName(atom_dictionary)
return atom_name_object
def is_package_exists(self, package_atom: Union[str, dict]) -> bool:
try:
self.parse_package_parameter(package_atom)
return True
except PackageAtomError as e:
if e.errno == NOTEXIST:
return False
raise
def _check_package_existance(self, atom_dictionary: dict) -> dict:
'''Метод для проверки существования пакета. Существование пакета
определяется наличием соответствующего CONTENTS файла.'''
# Используем glob-паттерн для поиска.
glob_pattern = r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
self.pkg_path,
atom_dictionary['category'],
atom_dictionary['name'])
glob_result = glob.glob(glob_pattern)
if not glob_result:
# Если ничего не нашлось.
raise PackageAtomError("Package '{}' is not found.".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(glob_result) == 1:
# Если нашелся один пакет.
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
atom_dictionary['version'] = pkg_version
atom_dictionary['pkg_path'] = pkg_path
else:
packages = dict()
# Если подходящих пакетов много -- проверяем по use-флагам,
# слотам и версии, если таковые заданы.
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
packages[pkg_path] = pkg_version
except PackageAtomError:
continue
if not packages:
# Если после проверки отсеялись все кандидаты.
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(packages) == 1:
# Если был найден только один кандидат -- выдаем его.
pkg_path = next(iter(packages.keys()))
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
else:
# Если подходящих пакетов много -- берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
return atom_dictionary
def _check_slot_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученного из параметра package значения slot.
'''
if atom_dictionary['slot']:
slot = self._get_slot_value(pkg_path,
atom_dictionary['package_atom'])
if slot != atom_dictionary['slot']:
raise PackageAtomError("Package '{}' is not found.".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _check_use_flags_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученных из параметра package значений
use-флагов.'''
if atom_dictionary['use_flags']:
use_flags = self._get_use_flags_value(
pkg_path,
atom_dictionary['package_atom'])
for use_flag in atom_dictionary['use_flags']:
if use_flag.startswith('-'):
if use_flag.strip()[1:] not in use_flags:
continue
elif use_flag in use_flags:
continue
raise PackageAtomError("Package '{}' is not found".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _get_slot_value(self, pkg_path: str, package_atom: str) -> str:
'''Метод для получения значения slot из файла SLOT.'''
slot_path = os.path.join(pkg_path, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(package_atom))
def _get_use_flags_value(self, pkg_path: str, package_atom: str) -> list:
'''Метод для получения списка значений use-флагов из файла USE.'''
use_path = os.path.join(pkg_path, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags in atom name: {}".
format(package_atom))
def _get_category_packages(self, category: str) -> str:
'''Генератор имен категорий, имеющихся в /var/db/pkg'''
for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
category)):
yield path
def _check_version(self, atom_dictionary: dict, pkg_version: Version
) -> None:
condition = atom_dictionary['condition']
if condition == '=':
condition_result = (atom_dictionary['version']
== pkg_version)
elif condition == '~':
condition_result = atom_dictionary['version'].__eq__(
pkg_version,
ignore_revision=True)
elif condition == '>':
condition_result = atom_dictionary['version'] < pkg_version
elif condition == '<':
condition_result = atom_dictionary['version'] > pkg_version
else:
condition_result = False
if not condition_result:
raise PackageAtomError("Package '{}' is not found".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def get_file_package(self, file_path: str) -> PackageAtomName:
'''Метод для определения пакета, которому принадлежит файл.'''
# Удаляем часть пути соответствующую chroot_path
# TODO предусмотреть кэширование результата.
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
for category in os.listdir(self.pkg_path):
for contents_path in self._get_category_packages(category):
try:
with open(contents_path, 'r') as contents_file:
for file_line in contents_file.readlines():
contents_name = file_line.split(' ')[1].strip()
if contents_name == file_path:
package_path = os.path.dirname(contents_path)
package_name = os.path.basename(package_path)
parsing_result = self.package_name_regex.\
search(package_name)
version = parsing_result.groupdict()['version']
version = Version(version)
package_atom = PackageAtomName(
{'pkg_path': package_path,
'version': version})
return package_atom
except (OSError, IOError):
continue
else:
raise PackageNotFound("The file does not belong to any package")
@classmethod
def parse_atom_name(cls, atom_name: str) -> dict:
parsing_result = cls.atom_regex.search(atom_name)
if (not parsing_result or parsing_result.string != atom_name or
not parsing_result.groupdict()['category'] or
not parsing_result.groupdict()['name']):
raise PackageAtomError("atom name '{}' is not"
" correct".format(atom_name),
errno=NOTCORRECT)
parsing_result = parsing_result.groupdict()
category = parsing_result['category']
name = parsing_result['name']
if parsing_result['version'] is not None:
version = Version(parsing_result['version'].strip('-'))
else:
version = None
if parsing_result['condition'] is not None:
if parsing_result['version'] is None:
raise PackageAtomError(f"Atom name '{atom_name}' is not"
" correct. Version value is missed",
errno=NOTCORRECT)
elif parsing_result['version'] is not None:
parsing_result['condition'] = '='
if (parsing_result['slot'] is not None
and parsing_result['slot'] != ':'):
slot = parsing_result['slot'].strip(':')
else:
slot = None
if parsing_result['use_flags'] is not None:
use_flags = [use.strip() for use in
parsing_result['use_flags'].strip().
strip('[]').split(' ')]
else:
use_flags = None
atom_dict = {'category': category,
'name': name,
'version': version,
'slot': slot,
'use_flags': use_flags,
'with_slot': slot is not None,
'condition': parsing_result['condition']}
return atom_dict
class Package(metaclass=PackageCreator):
'''Класс для работы с принадлежностью файлов пакетам.'''
re_cfg = re.compile(r'/\._cfg\d{4}_')
def __init__(self, package_atom: Union[str, PackageAtomName],
pkg_path: str = '/var/db/pkg',
chroot_path: str = '/',
autosave: bool = False):
self.chroot_path: str = chroot_path
self.contents_file_path = self._get_contents_path(package_atom)
self.package_name = package_atom
self.parser = ContentsParser()
if (chroot_path != '/' and
not self.contents_file_path.startswith(chroot_path)):
self.contents_file_path = join_paths(chroot_path,
self.contents_file_path)
if not os.path.exists(self.contents_file_path):
raise PackageError("Can not find CONTENTS file in path: {}".format(
self.contents_file_path
))
self.contents_dictionary = OrderedDict()
self.read_contents_file()
self.autosave: bool = autosave
def _get_contents_path(self, package_atom: Union[str, PackageAtomName]
) -> str:
'''Метод для получения из ATOM-названия или готового объекта
PackageAtomName пути к файлу CONTENTS.'''
if isinstance(package_atom, str):
package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
atom_name = package_atom_parser.parse_package_parameter(
package_atom)
return os.path.join(atom_name.pkg_path,
'CONTENTS')
elif isinstance(package_atom, PackageAtomName):
return os.path.join(package_atom.pkg_path,
'CONTENTS')
else:
raise PackageError(
"Incorrect 'package_atom' value: '{}', type: '{}''".
format(package_atom, type(package_atom)))
def remove_cfg_prefix(self, file_name: str) -> str:
'''Метод для удаления префикса ._cfg????_.'''
return self.re_cfg.sub('/', file_name)
def remove_chroot_path(self, file_name: str) -> str:
'''Метод для удаления из пути файла корневого пути, если он не
является /.'''
if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
return file_name[len(self.chroot_path):]
else:
return file_name
def read_contents_file(self) -> bool:
'''Метод для чтения файла CONTENTS.'''
try:
contents_text = read_file(self.contents_file_path)
except FilesError as error:
raise PackageError(str(error))
if contents_text:
self.contents_dictionary = self.parser.parse(contents_text)
return True
else:
return False
def write_contents_file(self) -> None:
'''Метод для записи файла CONTENTS.'''
with open(self.contents_file_path, 'w') as contents_file:
contents_text = self.render_contents_file()
contents_file.write(contents_text)
def render_contents_file(self) -> str:
'''Метод для получения текста файла CONTENTS.'''
return self.parser.render(self.contents_dictionary)
@property
def files(self) -> List[str]:
'''Метод для получения списка путей файлов, имеющихся в CONTENTS-файле
пакета.'''
return list(self.contents_dictionary.keys())
def get_file_type(self, file_path: str) -> str:
'''Метод для получения по пути файла типа, указанного для него в
CONTENTS-файле.'''
file_path = self.remove_chroot_path(file_path)
if file_path in self.contents_dictionary:
return self.contents_dictionary[file_path]['type']
return None
def sort_contents_dictionary(self) -> None:
'''Метод для сортировки словаря, полученного в результате разбора и
изменения CONTENTS-файла.'''
tree = {}
for path in self.contents_dictionary.keys():
path = path.strip('/').split('/')
level = tree
for part in path:
if part not in level:
level[part] = {}
level = level[part]
sorted_contents = OrderedDict()
for path in self._make_paths('/', tree):
sorted_contents[path] = self.contents_dictionary[path]
self.contents_dictionary = sorted_contents
# def _make_paths(self, path: str, level: dict) -> List[str]:
# paths = []
# for part in sorted(level.keys()):
# part_path = os.path.join(path, part)
# paths.append(part_path)
# if level[part]:
# paths.extend(self._make_paths(part_path, level[part]))
# return paths
def _make_paths(self, path: str,
level: dict) -> Generator[str, None, None]:
'''Генератор используемый для преобразования дерева сортировки путей в
последовательность путей.'''
for part in sorted(level.keys()):
part_path = os.path.join(path, part)
yield part_path
if level[part]:
yield from self._make_paths(part_path, level[part])
def add_dir(self, file_name: str) -> None:
'''Метод для добавления в CONTENTS директорий.'''
file_name = self.remove_chroot_path(file_name)
if (file_name != '/' and
(file_name not in self.contents_dictionary
or self.contents_dictionary[file_name]['type'] != 'dir')):
self.add_dir(os.path.dirname(file_name))
contents_item = OrderedDict({'type': 'dir'})
self.contents_dictionary[file_name] = contents_item
def add_sym(self, file_name: str, target_path: Optional[str] = None,
mtime: Optional[str] = None) -> None:
'''Метод для добавления в CONTENTS символьных ссылок.'''
real_path = file_name
file_name = self.remove_cfg_prefix(file_name)
file_name = self.remove_chroot_path(file_name)
if not real_path.startswith(self.chroot_path):
real_path = join_paths(self.chroot_path, real_path)
if target_path is None:
target_path = read_link(real_path)
self.add_dir(os.path.dirname(file_name))
if mtime is None:
mtime = str(int(os.lstat(real_path).st_mtime))
try:
contents_item = OrderedDict({'type': 'sym',
'target': target_path,
'mtime': mtime})
except FilesError as error:
raise PackageError(str(error))
self.contents_dictionary[file_name] = contents_item
def add_obj(self, file_name: str, file_md5: Optional[str] = None,
mtime: Optional[str] = None) -> None:
'''Метод для добавления в CONTENTS обычных файлов как obj.'''
real_path = file_name
file_name = self.remove_chroot_path(file_name)
file_name = self.remove_cfg_prefix(file_name)
if real_path == file_name:
real_path = join_paths(self.chroot_path, file_name)
self.add_dir(os.path.dirname(file_name))
if file_md5 is None:
try:
file_text = read_file(real_path, binary=True)
except FilesError as error:
raise PackageError(str(error))
file_md5 = hashlib.md5(file_text).hexdigest()
if mtime is None:
mtime = str(int(os.lstat(real_path).st_mtime))
contents_item = OrderedDict({'type': 'obj',
'md5': file_md5,
'mtime': mtime})
self.contents_dictionary[file_name] = contents_item
def add_file(self, file_name: str) -> None:
'''Метод для добавления в CONTENTS файла любого типа.'''
if file_name != '/':
real_path = file_name
if file_name.startswith(self.chroot_path):
file_name = self.remove_chroot_path(file_name)
else:
real_path = join_paths(self.chroot_path, file_name)
if os.path.isdir(real_path):
self.add_dir(file_name)
elif os.path.islink(real_path):
self.add_sym(file_name)
elif os.path.isfile(real_path):
self.add_obj(file_name)
def remove_obj(self, file_path: str) -> OrderedDict:
'''Метод для удаления файлов и ссылок.'''
file_path = self.remove_chroot_path(file_path)
file_path = self.remove_cfg_prefix(file_path)
removed = OrderedDict()
if file_path in self.contents_dictionary:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_dir(self, file_path: str) -> OrderedDict:
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри удаляемой директории и самой директории.'''
directory_path = self.remove_chroot_path(file_path)
paths_to_remove = []
removed = OrderedDict()
for file_path in self.contents_dictionary:
if file_path.startswith(directory_path):
paths_to_remove.append(file_path)
for file_path in paths_to_remove:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_file(self, file_path: str) -> OrderedDict:
file_path = self.remove_chroot_path(file_path)
file_path = self.remove_cfg_prefix(file_path)
removed = OrderedDict()
if file_path not in self.contents_dictionary:
return
if self.contents_dictionary[file_path]['type'] == 'dir':
removed.update(self.remove_dir(file_path))
else:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def clear_dir(self, file_path: str) -> OrderedDict:
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри очищаемой директории.'''
directory_path = self.remove_chroot_path(file_path)
paths_to_remove = []
removed = OrderedDict()
for file_path in self.contents_dictionary:
if file_path == directory_path:
continue
if file_path.startswith(directory_path):
paths_to_remove.append(file_path)
for file_path in paths_to_remove:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def remove_empty_directories(self) -> OrderedDict:
'''Метод для удаления из CONTENTS директорий, которые после удаления
тех или иных файлов больше не находятся на пути к тем файлам, которые
по-прежнему принадлежат пакету.'''
used_directories = set()
removed = OrderedDict()
not_directory_list = [path for path, value in
self.contents_dictionary.items()
if value['type'] != 'dir']
for filepath in not_directory_list:
file_directory = os.path.dirname(filepath)
while file_directory != '/':
used_directories.add(file_directory)
file_directory = os.path.dirname(file_directory)
paths_to_delete = [file_path for file_path, value in
self.contents_dictionary.items()
if value['type'] == 'dir' and
file_path not in used_directories]
for file_path in paths_to_delete:
removed.update({file_path:
self.contents_dictionary.pop(file_path)})
return removed
def get_md5(self, file_path: str) -> str:
'''Метод для получения md5 хэш-суммы указанного файла.'''
try:
file_text = read_file(file_path).encode()
except FilesError as error:
raise PackageError(str(error))
file_md5 = hashlib.md5(file_text).hexdigest()
return file_md5
def get_link_target(self, link_path: str) -> str:
try:
return read_link(link_path)
except FilesError as error:
raise PackageError(str(error))
def check_contents_data(self, file_path: str, file_md5: str = None,
sym_target: str = None, symlink: bool = False
) -> bool:
'''Метод для проверки соответствия md5 хэш суммы файла той, что указана
в файле CONTENTS.'''
contents_path = file_path
if self.chroot_path != "/" and contents_path.startswith(
self.chroot_path):
contents_path = contents_path[len(self.chroot_path):]
contents_path = self.remove_cfg_prefix(contents_path)
if (not symlink and
self.contents_dictionary[contents_path]["type"] != "sym"):
if file_md5 is None:
file_md5 = self.get_md5(file_path)
contents_md5 = self.contents_dictionary[contents_path]['md5']
return file_md5 == contents_md5
elif (symlink and
self.contents_dictionary[contents_path]["type"] == "sym"):
if sym_target is None:
sym_target = self.get_link_target(file_path)
return (sym_target ==
self.contents_dictionary[contents_path]["target"])
else:
return False
def __contains__(self, file_path: str) -> bool:
if self.chroot_path != "/":
if file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
file_path = self.remove_cfg_prefix(file_path)
return file_path in self.contents_dictionary
def __repr__(self) -> str:
return '<Package: {}/{}>'.format(self.package_name.category,
self.package_name.fullname)
# def __del__(self) -> None:
# if self.autosave and os.path.exists(self.contents_file_path):
# self.remove_empty_directories()
# self.write_contents_file()