# vim: fileencoding=utf-8
#
'''Helpers for parsing package atoms and handling package CONTENTS files.'''
import glob
import hashlib
import operator
import os
import re
from collections import OrderedDict

from ..templates.format.contents_format import ContentsFormat
from .files import read_file, read_link, join_paths, FilesError


class PackageError(Exception):
    '''Raised when a package CONTENTS file can not be located or read.'''
    pass


# Values of PackageAtomError.errno.
DEFAULT, NOTEXIST, NOTCORRECT = range(3)


class PackageAtomError(Exception):
    '''Raised when a package atom is malformed or matches no package.

    The ``errno`` attribute is one of DEFAULT, NOTEXIST or NOTCORRECT.
    '''

    def __init__(self, message='Package atom error', errno=DEFAULT):
        # Hand the message to Exception so that str(error) is not empty.
        super().__init__(message)
        self.message = message
        self.errno = errno


class VersionError(Exception):
    '''Raised when a value can not be interpreted as a version.'''
    pass


class Version:
    '''Temporary class for working with version values.

    Only the dotted numeric part of a version takes part in comparisons:
    everything after the first '-' or '_' is ignored, and missing
    trailing components are treated as zeros.
    '''

    def __init__(self, version_value=None):
        '''Create a version from None, a Version, an int, a float or a str.

        Raises VersionError if the value can not be parsed.
        '''
        if version_value is None:
            self._version_string = '-1'
            self._version_value = [-1]
        elif isinstance(version_value, Version):
            self._version_string = version_value._version_string
            # Copy the list so the two objects never share state.
            self._version_value = list(version_value._version_value)
        else:
            value = self._get_version_value(version_value)
            if not value:
                raise VersionError(
                    "Can't initialize Version object using '{0}'"
                    " value with type {1}".format(version_value,
                                                  type(version_value)))
            if isinstance(version_value, str):
                self._version_string = version_value.strip('-')
            else:
                self._version_string = str(version_value)
            self._version_value = value

    def _get_version_value(self, version):
        '''Return the list of numeric components of ``version``.

        Returns False when the value can not be interpreted.
        '''
        if isinstance(version, Version):
            return version._version_value
        if isinstance(version, int):
            # Keep the component numeric so it compares with other ints.
            return [version]
        if isinstance(version, float):
            try:
                return [int(version_part.strip())
                        for version_part in str(version).split('.')]
            except ValueError:
                # str() of the float is not plain 'digits.digits'
                # (for example it contains an exponent).
                return False
        if isinstance(version, str):
            version = version.strip('-')
            if '-' in version:
                version = version.split('-')[0]
            if '_' in version:
                version = version.split('_')[0]
            version_value = []
            for version_part in version.split('.'):
                if not version_part.isdigit():
                    return False
                version_value.append(int(version_part))
            return version_value
        return False

    def _use_compare_operation(self, compare_operator, other_value):
        '''Apply ``compare_operator`` to this version and ``other_value``.

        ``other_value`` is a list of components as produced by
        _get_version_value(). The shorter list is padded with zeros and
        the first pair of differing components decides the result.
        '''
        # Work on copies: other_value may be the internal list of another
        # Version object and must not be modified by padding.
        version_value = list(self._version_value)
        other_value = list(other_value)
        padding = len(version_value) - len(other_value)
        if padding > 0:
            other_value.extend([0] * padding)
        elif padding < 0:
            version_value.extend([0] * -padding)

        for lvalue, rvalue in zip(version_value, other_value):
            if lvalue != rvalue:
                return bool(compare_operator(lvalue, rvalue))
        # All components are equal: only the non-strict operators hold.
        return compare_operator not in (operator.lt, operator.gt,
                                        operator.ne)

    def _compare(self, compare_operator, other):
        '''Validate ``other`` and compare this version with it.'''
        other_value = self._get_version_value(other)
        if not other_value:
            raise VersionError(
                "Unable to compare Version object with the '{0}'"
                " value of '{1}' type".format(other, type(other)))
        return self._use_compare_operation(compare_operator, other_value)

    def __lt__(self, other):
        '''Overload of x < y.'''
        return self._compare(operator.lt, other)

    def __le__(self, other):
        '''Overload of x <= y.'''
        return self._compare(operator.le, other)

    def __eq__(self, other):
        '''Overload of x == y.'''
        return self._compare(operator.eq, other)

    def __ne__(self, other):
        '''Overload of x != y.'''
        return self._compare(operator.ne, other)

    def __gt__(self, other):
        '''Overload of x > y.'''
        return self._compare(operator.gt, other)

    def __ge__(self, other):
        '''Overload of x >= y.'''
        return self._compare(operator.ge, other)

    def __hash__(self):
        # Equal versions must hash equally, so hash the components with
        # the trailing zeros removed ('1.0' == '1.0.0').
        components = list(self._version_value)
        while components and components[-1] == 0:
            components.pop()
        return hash(tuple(components))

    def __repr__(self):
        return '<Version: {}>'.format(self._version_string)

    def __str__(self):
        return self._version_string

    def __bool__(self):
        # [-1] is the value of a Version created without arguments.
        return self._version_value != [-1]


class PackageAtomName:
    '''A resolved package: its directory in the package database and
    its version.'''

    def __init__(self, atom_dictionary):
        '''Read 'pkg_path' and 'version' from ``atom_dictionary``.'''
        self._package_directory = atom_dictionary['pkg_path']
        self._version = atom_dictionary['version']

    @property
    def name(self):
        '''Last component of the package directory path.'''
        return os.path.basename(self._package_directory)

    @property
    def category(self):
        '''Name of the directory that contains the package directory.'''
        return os.path.basename(os.path.dirname(self._package_directory))

    @property
    def version(self):
        return self._version

    @property
    def use_flags(self):
        '''Content of the USE file of the package split by spaces.

        Raises PackageAtomError if the file can not be read.
        '''
        use_path = os.path.join(self._package_directory, 'USE')
        try:
            return read_file(use_path).strip('\n').split(' ')
        except FilesError as error:
            raise PackageAtomError(
                "could not read use flags for 'package'"
                " parameter: {}/{}".format(self.category,
                                           self.name)) from error

    @property
    def pkg_path(self):
        return self._package_directory

    @property
    def slot(self):
        '''Content of the SLOT file of the package.

        Raises PackageAtomError if the file can not be read.
        '''
        slot_path = os.path.join(self._package_directory, 'SLOT')
        try:
            return read_file(slot_path).strip('\n')
        except FilesError as error:
            raise PackageAtomError(
                "could not read slot value for"
                " 'package': {}/{}".format(self.category,
                                           self.name)) from error

    def __eq__(self, other):
        if isinstance(other, PackageAtomName):
            return self._package_directory == other._package_directory
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defined together with __eq__ so instances stay hashable.
        return hash(self._package_directory)

    def __bool__(self):
        return bool(self._package_directory)

    def __repr__(self):
        return '<PackageAtomName: {}/{}>'.format(self.category, self.name)


class PackageAtomParser:
    '''Parses package atoms of the form
    ``category/name[-version][:slot] [use flags]`` and looks the package
    up in the package database directory.'''

    atom_regex = re.compile(r'''(?P<category>[^\s/]*)/
                                (?P<name>\D[\w\d]*(\-\D[\w\d]*)*)
                                (?P<version>-\d[^\s:]*)?
                                (?P<slot>:[^\s\[]*)?\s*
                                (?P<use_flags>\[\S*(?:\s+\S*)*\])?''',
                            re.VERBOSE)

    def __init__(self, pkg_path='/var/db/pkg', chroot_path='/'):
        self.chroot_path = chroot_path
        if chroot_path != '/':
            self.pkg_path = join_paths(chroot_path, pkg_path)
        else:
            self.pkg_path = pkg_path

        self.package_atom = ''
        self._atom_dictionary = {}

    def parse_package_parameter(self, package_atom):
        '''Parse ``package_atom`` and return a PackageAtomName for the
        matching installed package.

        Raises PackageAtomError with errno NOTCORRECT if the atom is
        malformed and with errno NOTEXIST if no package matches.
        '''
        self.package_atom = package_atom
        self._atom_dictionary = {}

        parsing_result = self.atom_regex.search(package_atom)
        groups = parsing_result.groupdict() if parsing_result else {}

        # NOTE(review): ``parsing_result.string`` is always the searched
        # string, so the second condition never triggers; it is kept as
        # is because comparing against the matched text instead would
        # reject atoms that are accepted today.
        if (not parsing_result or
                parsing_result.string != package_atom or
                not groups['category'] or
                not groups['name']):
            raise PackageAtomError("'package' parameter value '{}' is not"
                                   " correct".format(package_atom),
                                   errno=NOTCORRECT)

        self._atom_dictionary['category'] = groups['category']
        self._atom_dictionary['name'] = groups['name']

        if groups['version']:
            self._atom_dictionary['version'] = Version(
                groups['version'].strip('-'))

        if groups['slot'] and groups['slot'] != ':':
            # Drop the leading ':'.
            self._atom_dictionary['slot'] = groups['slot'][1:]

        if groups['use_flags']:
            use_flags = groups['use_flags'].strip().rstrip(']').lstrip('[')
            self._atom_dictionary['use_flags'] = [
                use_flag.strip() for use_flag in use_flags.split()]

        self._check_package_existance()
        atom_name_object = PackageAtomName(self._atom_dictionary)
        self._atom_dictionary.clear()
        return atom_name_object

    def _check_package_existance(self, package_atom=''):
        '''Find the package described by the parsed atom and store its
        'pkg_path' and 'version' in the atom dictionary.

        When ``package_atom`` is given it is parsed first and True is
        returned. Raises PackageAtomError (NOTEXIST) if nothing matches.
        '''
        if package_atom:
            self.parse_package_parameter(package_atom)
            return True

        if 'version' in self._atom_dictionary:
            full_name = (self._atom_dictionary['name'] + '-' +
                         self._atom_dictionary['version']._version_string)
            pattern = r'{0}/{1}/{2}*/CONTENTS'
        else:
            full_name = self._atom_dictionary['name']
            pattern = r'{0}/{1}/{2}-[0-9]*/CONTENTS'

        glob_result = glob.glob(pattern.format(
            self.pkg_path, self._atom_dictionary['category'], full_name))

        if not glob_result:
            raise PackageAtomError("Package from 'package' parameter value"
                                   " '{}' does not exist".format(
                                       self.package_atom),
                                   errno=NOTEXIST)

        if len(glob_result) == 1:
            # A single candidate: failures of its checks are reported
            # to the caller unchanged.
            pkg_path = os.path.dirname(glob_result[0])
            self._atom_dictionary['version'] = self._examine_candidate(
                pkg_path)
            self._atom_dictionary['pkg_path'] = pkg_path
            return

        # Several candidates: skip the ones that fail the checks.
        packages = dict()
        for contents_path in glob_result:
            pkg_path = os.path.dirname(contents_path)
            try:
                packages[pkg_path] = self._examine_candidate(pkg_path)
            except PackageAtomError:
                continue

        if not packages:
            raise PackageAtomError(
                "Package from 'package' parameter value"
                " '{}' does not exist".format(self.package_atom),
                errno=NOTEXIST)

        # Take the highest version.
        pkg_path = sorted(packages.keys(),
                          key=lambda path: packages[path])[-1]
        self._atom_dictionary['pkg_path'] = pkg_path
        self._atom_dictionary['version'] = packages[pkg_path]

    def _examine_candidate(self, pkg_path):
        '''Check slot and USE flags of the package in ``pkg_path`` and
        return the Version taken from its directory name.'''
        self._check_slot_value(pkg_path)
        self._check_use_flags_value(pkg_path)
        parsed_name = self._atom_dictionary['name']
        full_name = os.path.basename(pkg_path)
        return Version(full_name[len(parsed_name):])

    def _check_slot_value(self, pkg_path):
        '''Raise PackageAtomError (NOTEXIST) if the atom requests a slot
        that differs from the slot of the package in ``pkg_path``.'''
        if 'slot' in self._atom_dictionary:
            slot = self._get_slot_value(pkg_path)
            if slot != self._atom_dictionary['slot']:
                raise PackageAtomError(
                    "Package from 'package' parameter value"
                    " '{}' does not exist".format(self.package_atom),
                    errno=NOTEXIST)

    def _check_use_flags_value(self, pkg_path):
        '''Raise PackageAtomError (NOTEXIST) if a USE flag requested by
        the atom is missing from the package in ``pkg_path``.'''
        if 'use_flags' in self._atom_dictionary:
            use_flags = self._get_use_flags_value(pkg_path)
            for use_flag in self._atom_dictionary['use_flags']:
                if use_flag not in use_flags:
                    raise PackageAtomError(
                        "Package from 'package' parameter value"
                        " '{}' does not exist".format(self.package_atom),
                        errno=NOTEXIST)

    def _get_slot_value(self, pkg_path):
        '''Return the content of the SLOT file in ``pkg_path``.'''
        slot_path = os.path.join(pkg_path, 'SLOT')
        try:
            return read_file(slot_path).strip('\n')
        except FilesError as error:
            raise PackageAtomError(
                "could not read slot value for"
                " 'package': {}".format(self.package_atom)) from error

    def _get_use_flags_value(self, pkg_path):
        '''Return the content of the USE file in ``pkg_path`` split by
        spaces.'''
        use_path = os.path.join(pkg_path, 'USE')
        try:
            return read_file(use_path).strip('\n').split(' ')
        except FilesError as error:
            raise PackageAtomError(
                "could not read use flags for 'package'"
                " parameter: {}".format(self.package_atom)) from error

    def _get_category_packages(self, category):
        '''Yield the paths of all CONTENTS files of ``category``.'''
        yield from glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
                                                         category))

    def get_file_package(self, file_path, with_slot=False, with_uses=False):
        '''Return 'category/package-directory' of the package whose
        CONTENTS file lists ``file_path``.

        ``with_slot`` and ``with_uses`` are accepted but not used.
        Raises PackageAtomError if no CONTENTS file lists the path.
        '''
        if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]

        for category in os.listdir(self.pkg_path):
            for contents_path in self._get_category_packages(category):
                try:
                    with open(contents_path, 'r') as contents_file:
                        for file_line in contents_file:
                            fields = file_line.split(' ')
                            if len(fields) < 2:
                                # Blank or malformed line: no path field.
                                continue
                            if fields[1].strip() == file_path:
                                package_directory = os.path.dirname(
                                    contents_path)
                                return '/'.join(
                                    package_directory.split('/')[-2:])
                except (OSError, IOError):
                    # Unreadable CONTENTS file: try the next package.
                    continue
        raise PackageAtomError("The file does not belong to any package")

    @property
    def atom_dictionary(self):
        return self._atom_dictionary

    @atom_dictionary.setter
    def atom_dictionary(self, atom_dictionary):
        self._atom_dictionary = atom_dictionary

    # The setter used to be reachable only under this name; keep it
    # working for existing callers.
    set_atom_dictionary = atom_dictionary

    @property
    def atom_name(self):
        return PackageAtomName(self._atom_dictionary)

    @property
    def category(self):
        '''Parsed category or False if there is none.'''
        if 'category' in self._atom_dictionary:
            return self._atom_dictionary['category']
        return False

    @property
    def name(self):
        '''Parsed package name or False if there is none.'''
        if 'name' in self._atom_dictionary:
            return self._atom_dictionary['name']
        return False

    @property
    def slot(self):
        '''Parsed slot, else the slot read from the package directory,
        else False.'''
        if 'slot' in self._atom_dictionary:
            return self._atom_dictionary['slot']
        # 'pkg_path' is the key this parser stores the package directory
        # under; 'contents' is still honoured for dictionaries supplied
        # from outside.
        for key in ('pkg_path', 'contents'):
            if key in self._atom_dictionary:
                return self._get_slot_value(self._atom_dictionary[key])
        return False

    @property
    def use_flags(self):
        '''Parsed USE flags, else the flags read from the package
        directory, else False.'''
        if 'use_flags' in self._atom_dictionary:
            return self._atom_dictionary['use_flags']
        for key in ('pkg_path', 'contents'):
            if key in self._atom_dictionary:
                return self._get_use_flags_value(self._atom_dictionary[key])
        return False


class Package:
    '''In-memory representation of the CONTENTS file of a package.'''

    # Matches the '._cfgNNNN_' prefix of a file name.
    re_cfg = re.compile(r'/\._cfg\d{4}_')

    def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
        '''Locate and read the CONTENTS file of ``package_atom``.

        ``package_atom`` is an atom string or a PackageAtomName.
        Raises PackageError if the CONTENTS file does not exist.
        '''
        # NOTE(review): ``pkg_path`` is accepted but not used; the atom
        # parser is created with its default database path.
        self.chroot_path = chroot_path

        self.contents_file_path = self._get_contents_path(package_atom)

        if (chroot_path != '/' and
                not self.contents_file_path.startswith(chroot_path)):
            self.contents_file_path = join_paths(chroot_path,
                                                 self.contents_file_path)

        if not os.path.exists(self.contents_file_path):
            raise PackageError("Can not find CONTENTS file in path: {}".format(
                self.contents_file_path))

        self.contents_dictionary = OrderedDict()
        self.read_contents_file()

    def _get_contents_path(self, package_atom):
        '''Return the path of the CONTENTS file for ``package_atom``.'''
        if isinstance(package_atom, str):
            package_atom_parser = PackageAtomParser(
                chroot_path=self.chroot_path)
            atom_name = package_atom_parser.parse_package_parameter(
                package_atom)
            return os.path.join(atom_name.pkg_path, 'CONTENTS')
        if isinstance(package_atom, PackageAtomName):
            return os.path.join(package_atom.pkg_path, 'CONTENTS')
        raise PackageError(
            "Incorrect 'pakage_atom' value: '{}', type: '{}''".
            format(package_atom, type(package_atom)))

    def remove_cfg_prefix(self, file_name):
        '''Return ``file_name`` without the '._cfgNNNN_' prefix.'''
        return self.re_cfg.sub('/', file_name)

    def remove_chroot_path(self, file_name):
        '''Return ``file_name`` without the leading chroot path.'''
        if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
            return file_name[len(self.chroot_path):]
        return file_name

    def read_contents_file(self):
        '''Load the CONTENTS file into ``contents_dictionary``.

        Returns True if the file has content and False if it is empty.
        Raises PackageError if the file can not be read.
        '''
        try:
            contents_text = read_file(self.contents_file_path)
        except FilesError as error:
            raise PackageError(str(error)) from error

        if not contents_text:
            return False

        contents_format = ContentsFormat(contents_text,
                                         template_parser=False)
        self.contents_dictionary = contents_format._document_dictionary
        return True

    def write_contents_file(self):
        pass

    def render_contents_file(self):
        '''Return the text of a CONTENTS file built from
        ``contents_dictionary``.'''
        contents_format = ContentsFormat('', template_parser=False)
        contents_format._document_dictionary = self.contents_dictionary
        return contents_format.get_document_text()

    def add_dir(self, file_name):
        '''Add a 'dir' entry for ``file_name`` and for every parent
        directory that is not registered yet.'''
        file_name = self.remove_cfg_prefix(file_name)
        file_name = self.remove_chroot_path(file_name)

        # An empty name appears when dirname() runs out of components on
        # a relative path; without this guard the recursion never ends.
        if not file_name or file_name == '/':
            return

        if (file_name not in self.contents_dictionary or
                self.contents_dictionary[file_name]['type'] != 'dir'):
            self.add_dir(os.path.dirname(file_name))
            self.contents_dictionary[file_name] = OrderedDict(
                {'type': 'dir'})

    def add_sym(self, file_name):
        '''Add a 'sym' entry (link target and mtime) for ``file_name``.

        Raises PackageError if the link can not be read.
        '''
        file_name = self.remove_cfg_prefix(file_name)

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # The name was given without the chroot path.
            real_path = join_paths(self.chroot_path, file_name)

        self.add_dir(os.path.dirname(file_name))

        mtime = str(int(os.lstat(real_path).st_mtime))
        try:
            contents_item = OrderedDict({'type': 'sym',
                                         'target': read_link(real_path),
                                         'mtime': mtime})
        except FilesError as error:
            raise PackageError(str(error)) from error

        self.contents_dictionary[file_name] = contents_item

    def add_obj(self, file_name):
        '''Add an 'obj' entry (md5 and mtime) for ``file_name``.

        Raises PackageError if the file can not be read.
        '''
        file_name = self.remove_cfg_prefix(file_name)

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # The name was given without the chroot path.
            real_path = join_paths(self.chroot_path, file_name)

        self.add_dir(os.path.dirname(file_name))

        try:
            file_text = read_file(real_path).encode()
        except FilesError as error:
            raise PackageError(str(error)) from error

        contents_item = OrderedDict(
            {'type': 'obj',
             'md5': hashlib.md5(file_text).hexdigest(),
             'mtime': str(int(os.lstat(real_path).st_mtime))})
        self.contents_dictionary[file_name] = contents_item

    def add_file(self, file_name):
        '''Add ``file_name`` as a 'sym', 'dir' or 'obj' entry depending
        on what is found on disk; anything else is ignored.'''
        if file_name == '/':
            return

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # The name was given without the chroot path.
            real_path = join_paths(self.chroot_path, file_name)

        # islink() goes first: isdir() and isfile() follow symbolic
        # links and would hide them.
        if os.path.islink(real_path):
            self.add_sym(file_name)
        elif os.path.isdir(real_path):
            self.add_dir(file_name)
        elif os.path.isfile(real_path):
            self.add_obj(file_name)

    def modify_contents_item(self, file_name, item_value):
        pass

    def remove_file(self, file_name):
        pass

    def remove_empty_directories(self):
        pass

    def check_file_md5(self, file_path):
        pass