# vim: fileencoding=utf-8 # import os import re import glob from collections import OrderedDict from ..templates.format.contents_format import ContentsFormat from .files import read_file, read_link, join_paths, FilesError import hashlib import operator class PackageError(Exception): '''Исключение выбрасываемое при ошибках в объектах Package, работающих с CONTENTS-файлами пакетов.''' pass # Коды ошибок ATOM-парсера. DEFAULT, NOTEXIST, NOTCORRECT = range(3) class PackageAtomError(Exception): '''Исключение выбрасываемое при ошибках разбора ATOM-названий.''' def __init__(self, message='Package atom error', errno=DEFAULT, atom_name=None): self.message = message self.errno = errno self.atom = atom_name class VersionError(Exception): '''Исключение выбрасываемое объектами Version.''' pass class PackageNotFound(Exception): '''Специальное исключение выбрасываемое, если не удалось найти пакет, к которому принадлежит файл.''' pass class Version: '''Класс для работы со значениями версий.''' def __init__(self, version_value=None): if version_value is None: self._version_string = '-1' self._version_value = [-1] elif isinstance(version_value, Version): self._version_string = version_value._version_string self._version_value = version_value._version_value else: value = self._get_version_value(version_value) if not value: raise VersionError( "Can't initialize Version object using '{0}'" " value with type {1}".format(version_value, type(version_value))) if isinstance(version_value, str): self._version_string = version_value.strip('-') else: self._version_string = str(version_value) self._version_value = value def _get_version_value(self, version): '''Вспомогательный метод для получения значения версии, представленного в виде списка.''' if isinstance(version, Version): return version._version_value elif isinstance(version, int): version_value = [str(version)] elif isinstance(version, float): version_value = [] version = str(version).split('.') for version_part in version: version_value.append(int(version_part.strip())) elif isinstance(version, str): version = version.strip('-') version_value = [] if '-' in version: version = version.split('-')[0] if '_' in version: version = version.split('_')[0] for version_part in version.split('.'): if version_part.isdigit(): version_part = int(version_part) version_value.append(version_part) else: return False else: return False return version_value def _use_compare_operation(self, compare_operator, other_value): '''Вспомогательный метод для реализации различных операций сравнения. ''' version_value = self._version_value[:] other_value_length = len(other_value) version_value_length = len(version_value) if other_value_length < version_value_length: for counter in range(version_value_length - other_value_length): other_value.append(0) elif version_value_length < other_value_length: for counter in range(other_value_length - version_value_length): version_value.append(0) for lvalue, rvalue in zip(version_value, other_value): if lvalue == rvalue: continue if compare_operator(lvalue, rvalue): return True else: return False else: if (compare_operator != operator.lt and compare_operator != operator.gt and compare_operator != operator.ne): return True else: return False def __lt__(self, other): '''Перегрузка x < y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.lt, other_value) def __le__(self, other): '''Перегрузка x <= y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.le, other_value) def __eq__(self, other): '''Перегрузка x == y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.eq, other_value) def __ne__(self, other): '''Перегрузка x != y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.ne, other_value) def __gt__(self, other): '''Перегрузка x > y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.gt, other_value) def __ge__(self, other): '''Перегрузка x >= y.''' other_value = self._get_version_value(other) if not other_value: raise VersionError( "Unable to compare Version object with the '{0}'" " value of '{1}' type".format(other, type(other))) return self._use_compare_operation(operator.ge, other_value) def __hash__(self): return hash(self._version_string) def __repr__(self): return ''.format(self._version_string) def __str__(self): return self._version_string def __bool__(self): if self._version_value == [-1]: return False else: return True class PackageAtomName: '''Класс для хранения результата определения пакета. Для определения пакета использует путь к его pkg директории.''' def __init__(self, atom_dictionary, parser=None): self._package_directory = atom_dictionary['pkg_path'] self._version = atom_dictionary['version'] if self._package_directory is None: self._name = 'None' else: self._name = atom_dictionary.get('name', os.path.basename( self._package_directory)) self._parser = parser @property def name(self): if self._package_directory is None: return 'None' return self._name @property def fullname(self): if self._package_directory is None: return 'None' return os.path.basename(self._package_directory) @property def category(self): if self._package_directory is None: return 'None' return os.path.basename(os.path.dirname(self._package_directory)) @property def atom(self): if self._package_directory is None: return 'None' return "{}/{}".format(self.category, self.fullname) @property def version(self): if self._package_directory is None: return 'None' return self._version @property def use_flags(self): if self._package_directory is None: return [] use_path = os.path.join(self._package_directory, 'USE') try: return read_file(use_path).strip('\n').split(' ') except FilesError: raise PackageAtomError("could not read use flags for 'package'" " parameter: {}".format(self.package_atom), atom_name=self.package_atom) @property def pkg_path(self): return self._package_directory @property def slot(self): if self._package_directory is None: return None slot_path = os.path.join(self._package_directory, 'SLOT') try: return read_file(slot_path).strip('\n') except FilesError: raise PackageAtomError("could not read slot value for" " 'package': {}".format(self.package_atom), atom_name=self.package_atom) def __eq__(self, other): print('EQ') print('other:', other) if isinstance(other, PackageAtomName): result = (self._package_directory == other._package_directory) print('result:', result) return result elif isinstance(other, str) and self._parser: try: other_atom = self._parser.parse_package_parameter(other) print('OTHER ATOM:', other_atom) result = (self._package_directory == other_atom._package_directory) print('result:', result) return result except Exception: return False else: return False def __ne__(self, other): if isinstance(other, PackageAtomName): return (self._package_directory != other._package_directory) elif isinstance(other, str) and self._parser: try: other_atom = self._parser.parse_package_parameter(other) return self._package_directory != other_atom._package_directory except Exception: return True else: return False def __bool__(self): if self._package_directory is None: return True return bool(self._package_directory) def __repr__(self): if self._package_directory is None: return '' return ''.format(self.category, self.name) def __hash__(self): return hash(self._package_directory) NonePackage = PackageAtomName({'pkg_path': None, 'version': None}) class PackageAtomParser: '''Класс для парсинга параметра package, его проверки, а также определения принадлежности файла пакету.''' package_name_pattern =\ r'(?P\D[\w\d]*(\-\D[\w\d]*)*)(?P-\d[^\s:]*)?' atom_regex = re.compile(r'''(?P[^\s/]*)/ {0} (?P:[^\s\[]*)?\s* (?P\[\S*(?:\s+\S*)*\])?'''.format( package_name_pattern ), re.VERBOSE) package_name_regex = re.compile(package_name_pattern) def __init__(self, pkg_path='/var/db/pkg', chroot_path='/'): self.chroot_path = chroot_path if chroot_path != '/': self.pkg_path = join_paths(chroot_path, pkg_path) else: self.pkg_path = pkg_path self.package_atom = '' self._atom_dictionary = {} def parse_package_parameter(self, package_atom): '''Метод для разбора значения package, после разбора инициирует проверку полученных значений. Возвращает объект PackageAtomName.''' self.package_atom = package_atom self._atom_dictionary = {} parsing_result = self.atom_regex.search(package_atom) if (not parsing_result or parsing_result.string != package_atom or not parsing_result.groupdict()['category'] or not parsing_result.groupdict()['name']): raise PackageAtomError("'package' parameter value '{}' is not" " correct".format(package_atom), errno=NOTCORRECT, atom_name=self.atom_name) self._atom_dictionary['category'] = parsing_result.groupdict( )['category'] self._atom_dictionary['name'] = parsing_result.groupdict()['name'] if parsing_result.groupdict()['version']: version_value = parsing_result.groupdict()['version'].strip('-') self._atom_dictionary['version'] = Version(version_value) if (parsing_result.groupdict()['slot'] and parsing_result.groupdict()['slot'] != ':'): self._atom_dictionary['slot'] = parsing_result.groupdict( )['slot'][1:] if parsing_result.groupdict()['use_flags']: self._atom_dictionary['use_flags'] = [] use_flags = parsing_result.groupdict()['use_flags'].strip().\ rstrip(']').lstrip('[') for use_flag in use_flags.split(): self._atom_dictionary['use_flags'].append(use_flag.strip()) self._check_package_existance() atom_name_object = PackageAtomName(self._atom_dictionary, parser=self) self._atom_dictionary.clear() return atom_name_object def _check_package_existance(self, package_atom=''): '''Метод для проверки существования пакета. Существование пакета определяется наличием соответствующего CONTENTS файла.''' if package_atom: self.parse_package_parameter(package_atom) return True else: # Используем glob-паттерн для поиска. if 'version' in self._atom_dictionary: full_name = self._atom_dictionary['name'] + '-' +\ self._atom_dictionary['version']._version_string else: full_name = self._atom_dictionary['name'] if 'version' not in self._atom_dictionary: glob_result = glob.glob( r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format( self.pkg_path, self._atom_dictionary['category'], full_name)) else: glob_result = glob.glob( r'{0}/{1}/{2}*/CONTENTS'.format( self.pkg_path, self._atom_dictionary['category'], full_name)) if not glob_result: # Если ничего не нашлось. raise PackageAtomError("Package from 'package' parameter value" " '{}' does not exist".format( self.package_atom), errno=NOTEXIST, atom_name=self.package_atom) if len(glob_result) == 1: # Если нашелся один пакет. pkg_path = os.path.dirname(next(iter(glob_result))) self._check_slot_value(pkg_path) self._check_use_flags_value(pkg_path) parsed_name = self._atom_dictionary['name'] full_name = os.path.basename(pkg_path) self._atom_dictionary['version'] = Version( full_name[len(parsed_name):]) self._atom_dictionary['pkg_path'] = pkg_path else: packages = dict() # Если подходящих пакетов много -- проверяем по use-флагам, # слотам и версии, если таковые заданы. for contents_path in glob_result: pkg_path = os.path.dirname(contents_path) try: self._check_slot_value(pkg_path) self._check_use_flags_value(pkg_path) parsed_name = self._atom_dictionary['name'] full_name = os.path.basename(pkg_path) packages[pkg_path] = Version( full_name[len(parsed_name):]) except PackageAtomError: continue if not packages: # Если после проверки отсеялись все кандидаты. raise PackageAtomError( "Package from 'package' parameter value" " '{}' does not exist".format( self.package_atom), errno=NOTEXIST, atom_name=self.package_atom) if len(packages) == 1: # Если был найден только один кандидат -- выдаем его. pkg_path = next(iter(packages.keys())) self._atom_dictionary['pkg_path'] = pkg_path self._atom_dictionary['version'] = packages[pkg_path] else: # Если подходящих пакетов много -- берем старшую версию. pkg_path = sorted(packages.keys(), key=lambda path: packages[path])[-1] self._atom_dictionary['pkg_path'] = pkg_path self._atom_dictionary['version'] = packages[pkg_path] def _check_slot_value(self, pkg_path): '''Метод для проверки полученного из параметра package значения slot. ''' if 'slot' in self._atom_dictionary: slot = self._get_slot_value(pkg_path) if slot != self._atom_dictionary['slot']: raise PackageAtomError("Package from 'package' parameter value" " '{}' does not exist".format( self.package_atom), errno=NOTEXIST, atom_name=self.package_atom) def _check_use_flags_value(self, pkg_path): '''Метод для проверки полученных из параметра package значений use-флагов.''' if 'use_flags' in self._atom_dictionary: use_flags = self._get_use_flags_value(pkg_path) for use_flag in self._atom_dictionary['use_flags']: if use_flag not in use_flags: raise PackageAtomError( "Package from 'package' parameter value" " '{}' does not exist".format( self.package_atom), errno=NOTEXIST, atom_name=self.package_atom) def _get_slot_value(self, pkg_path): '''Метод для получения значения slot из файла SLOT.''' slot_path = os.path.join(pkg_path, 'SLOT') try: return read_file(slot_path).strip('\n') except FilesError: raise PackageAtomError("could not read slot value for" " 'package': {}".format(self.package_atom), atom_name=self.package_atom) def _get_use_flags_value(self, pkg_path): '''Метод для получения списка значений use-флагов из файла USE.''' use_path = os.path.join(pkg_path, 'USE') try: return read_file(use_path).strip('\n').split(' ') except FilesError: raise PackageAtomError("could not read use flags for 'package'" " parameter: {}".format(self.package_atom), atom_name=self.package_atom) def _get_category_packages(self, category): '''Генератор имен категорий, имеющихся в /var/db/pkg''' for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path, category)): yield path def get_file_package(self, file_path): '''Метод для определения пакета, которому принадлежит файл.''' # Удаляем часть пути соответствующую chroot_path if self.chroot_path != '/' and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] for category in os.listdir(self.pkg_path): for contents_path in self._get_category_packages(category): try: with open(contents_path, 'r') as contents_file: for file_line in contents_file.readlines(): contents_name = file_line.split(' ')[1].strip() if contents_name == file_path: package_path = os.path.dirname(contents_path) package_name = os.path.basename(package_path) parsing_result = self.package_name_regex.\ search(package_name) version = parsing_result.groupdict()['version'] version = Version(version) package_atom = PackageAtomName( {'pkg_path': package_path, 'version': version}, parser=self) return package_atom except (OSError, IOError): continue else: raise PackageNotFound("The file does not belong to any package") @property def atom_dictionary(self): '''Метод для получения ATOM-словаря.''' return self._atom_dictionary @property def atom_name(self): '''Метод для получения из ATOM-словаря объекта PackageAtomName.''' return PackageAtomName(self._atom_dictionary, parser=self) @atom_dictionary.setter def set_atom_dictionary(self, atom_dictionary): '''Метод для установки ATOM-словаря.''' self._atom_dictionary = atom_dictionary @property def category(self): '''Метод для получения категории пакета из ATOM-словаря.''' if 'category' in self._atom_dictionary: return self._atom_dictionary['category'] else: return False @property def name(self): '''Метод для получения имени пакета из ATOM-словаря.''' if 'name' in self._atom_dictionary: return self._atom_dictionary['name'] else: return False @property def slot(self): '''Метод для получения значения slot из ATOM-словаря.''' if 'slot' in self._atom_dictionary: return self._atom_dictionary['slot'] else: if 'contents' in self._atom_dictionary: return self._get_slot_value(self._atom_dictionary['contents']) return False @property def use_flags(self): '''Метод для получения use-флагов из ATOM-словаря.''' if 'use_flags' in self._atom_dictionary: return self._atom_dictionary['use_flags'] else: if 'contents' in self._atom_dictionary: return self._get_use_flags_value( self._atom_dictionary['contents']) else: return False class Package: '''Класс для работы с принадлежностью файлов пакетам.''' re_cfg = re.compile(r'/\._cfg\d{4}_') def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'): self.chroot_path = chroot_path self.contents_file_path = self._get_contents_path(package_atom) self.package_name = package_atom if (chroot_path != '/' and not self.contents_file_path.startswith(chroot_path)): self.contents_file_path = join_paths(chroot_path, self.contents_file_path) if not os.path.exists(self.contents_file_path): raise PackageError("Can not find CONTENTS file in path: {}".format( self.contents_file_path )) self.contents_dictionary = OrderedDict() self.read_contents_file() def _get_contents_path(self, package_atom): '''Метод для получения из ATOM-названия или готового объекта PackageAtomName пути к файлу CONTENTS.''' if isinstance(package_atom, str): package_atom_parser = PackageAtomParser( chroot_path=self.chroot_path) atom_name = package_atom_parser.parse_package_parameter( package_atom) return os.path.join(atom_name.pkg_path, 'CONTENTS') elif isinstance(package_atom, PackageAtomName): return os.path.join(package_atom.pkg_path, 'CONTENTS') else: raise PackageError( "Incorrect 'package_atom' value: '{}', type: '{}''". format(package_atom, type(package_atom))) def remove_cfg_prefix(self, file_name): '''Метод для удаления префикса ._cfg????_.''' return self.re_cfg.sub('/', file_name) def remove_chroot_path(self, file_name): '''Метод для удаления из пути файла корневого пути, если он не является /.''' if self.chroot_path != '/' and file_name.startswith(self.chroot_path): return file_name[len(self.chroot_path):] else: return file_name def read_contents_file(self): '''Метод для чтения файла CONTENTS.''' try: contents_text = read_file(self.contents_file_path) except FilesError as error: raise PackageError(str(error)) if contents_text: contents_format = ContentsFormat(contents_text, None, template_parser=False) self.contents_dictionary = contents_format._document_dictionary return True else: return False def write_contents_file(self): '''Метод для записи файла CONTENTS.''' contents_file = open(self.contents_file_path, 'w') contents_text = self.render_contents_file() contents_file.write(contents_text) contents_file.close() def render_contents_file(self): '''Метод для получения текста файла CONTENTS.''' contents_format = ContentsFormat('', None, template_parser=False) contents_format._document_dictionary = self.contents_dictionary return contents_format.document_text def add_dir(self, file_name): '''Метод для добавления в CONTENTS директорий.''' file_name = self.remove_chroot_path(file_name) if (file_name != '/' and (file_name not in self.contents_dictionary or self.contents_dictionary[file_name]['type'] != 'dir')): self.add_dir(os.path.dirname(file_name)) contents_item = OrderedDict({'type': 'dir'}) self.contents_dictionary[file_name] = contents_item def add_sym(self, file_name, target_path=None): '''Метод для добавления в CONTENTS символьных ссылок.''' file_name = self.remove_cfg_prefix(file_name) real_path = file_name file_name = self.remove_chroot_path(file_name) if real_path == file_name: real_path = join_paths(self.chroot_path, file_name) if target_path is None: target = read_link(real_path) else: target = target_path self.add_dir(os.path.dirname(file_name)) mtime = str(int(os.lstat(real_path).st_mtime)) try: contents_item = OrderedDict({'type': 'sym', 'target': target, 'mtime': mtime}) except FilesError as error: raise PackageError(str(error)) self.contents_dictionary[file_name] = contents_item def add_obj(self, file_name, file_md5=None): '''Метод для добавления в CONTENTS обычных файлов как obj.''' real_path = file_name file_name = self.remove_chroot_path(file_name) file_name = self.remove_cfg_prefix(file_name) if real_path == file_name: real_path = join_paths(self.chroot_path, file_name) self.add_dir(os.path.dirname(file_name)) if file_md5 is not None: md5_value = file_md5 else: try: file_text = read_file(real_path).encode() except FilesError as error: raise PackageError(str(error)) md5_value = hashlib.md5(file_text).hexdigest() contents_item = OrderedDict({'type': 'obj', 'md5': md5_value, 'mtime': str(int(os.lstat(real_path).st_mtime))}) self.contents_dictionary[file_name] = contents_item def add_file(self, file_name): '''Метод для добавления в CONTENTS файла любого типа.''' if file_name != '/': real_path = file_name file_name = self.remove_chroot_path(file_name) if real_path != file_name: real_path = join_paths(self.chroot_path, file_name) if os.path.isdir(real_path): self.add_dir(file_name) elif os.path.islink(real_path): self.add_link(file_name) elif os.path.isfile(real_path): self.add_obj(file_name) def remove_obj(self, file_path): '''Метод для удаления файлов и ссылок.''' file_path = self.remove_chroot_path(file_path) file_path = self.remove_cfg_prefix(file_path) if file_path in self.contents_dictionary: self.contents_dictionary.pop(file_path) def remove_dir(self, file_path): '''Метод для удаления из CONTENTS файлов и директорий находящихся внутри удаляемой директории и самой директории.''' directory_path = self.remove_chroot_path(file_path) paths_to_remove = [] for file_path in self.contents_dictionary: if file_path.startswith(directory_path): paths_to_remove.append(file_path) for file_path in paths_to_remove: self.contents_dictionary.pop(file_path) def clear_dir(self, file_path): '''Метод для удаления из CONTENTS файлов и директорий находящихся внутри очищаемой директории.''' directory_path = self.remove_chroot_path(file_path) paths_to_remove = [] for file_path in self.contents_dictionary: if file_path == directory_path: continue if file_path.startswith(directory_path): paths_to_remove.append(file_path) for file_path in paths_to_remove: self.contents_dictionary.pop(file_path) def remove_empty_directories(self): '''Метод для удаления из CONTENTS директорий, которые после удаления тех или иных файлов больше не находятся на пути к тем файлам, которые по-прежнему принадлежат пакету.''' used_directories = set() not_directory_list = [path for path, value in self.contents_dictionary.items() if value['type'] != 'dir'] for filepath in not_directory_list: file_directory = os.path.dirname(filepath) while file_directory != '/': used_directories.add(file_directory) file_directory = os.path.dirname(file_directory) paths_to_delete = [file_path for file_path, value in self.contents_dictionary.items() if value['type'] == 'dir' and file_path not in used_directories] for file_path in paths_to_delete: self.contents_dictionary.pop(file_path) def get_md5(self, file_path): '''Метод для получения md5 хэш-суммы указанного файла.''' try: file_text = read_file(file_path).encode() except FilesError as error: raise PackageError(str(error)) file_md5 = hashlib.md5(file_text).hexdigest() return file_md5 def is_md5_equal(self, file_path, file_md5=None): '''Метод для проверки соответствия md5 хэш суммы файла той, что указана в файле CONTENTS.''' if file_md5 is None: file_md5 = self.get_md5(file_path) if self.chroot_path != "/" and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] file_path = self.remove_cfg_prefix(file_path) contents_md5 = self.contents_dictionary[file_path]['md5'] return file_md5 == contents_md5 def __contains__(self, file_path): if self.chroot_path != "/" and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] file_path = self.remove_cfg_prefix(file_path) return file_path in self.contents_dictionary def __repr__(self): return ''.format(self.package_name.category, self.package_name.name)