# vim: fileencoding=utf-8
#
"""Helpers for working with an installed-package database (``/var/db/pkg``).

``PackageAtom`` parses ``category/name[:slot] [use flags]`` strings and
resolves them against the package database; ``Package`` holds the parsed
CONTENTS file of one installed package and lets callers register new
entries in it.
"""
import glob
import hashlib
import os
import re
from collections import OrderedDict

from ..templates.format.contents_format import ContentsFormat
from .files import read_file, read_link, join_paths, FilesError


class PackageError(Exception):
    """Raised when a package's CONTENTS data cannot be read or updated."""
    pass


class PackageAtomError(Exception):
    """Raised when a package atom is malformed or cannot be resolved."""
    pass


class PackageAtom:
    """Parser and resolver for ``category/name[:slot] [use ...]`` atoms."""

    # category/name, then an optional ":slot", then optional
    # whitespace-separated use flags.  The group names are the keys read
    # from ``groupdict()`` in ``parse_package_parameter``.
    atom_regex = re.compile(r'''(?P<category>[^\s/]*)/
                                (?P<name>[^\s:]*)
                                (?P<slot>:\S*)?
                                (?P<uses>(?:\s+\S*)*)
                                ''', re.VERBOSE)

    def __init__(self, pkg_path='/var/db/pkg', chroot_path='/'):
        """Remember where the package database lives.

        :param pkg_path: path of the package database directory.
        :param chroot_path: root under which ``pkg_path`` is looked up;
            when it is not ``'/'`` the two are combined with ``join_paths``.
        """
        self.chroot_path = chroot_path

        if chroot_path != '/':
            self.pkg_path = join_paths(chroot_path, pkg_path)
        else:
            self.pkg_path = pkg_path

        self.package_atom = ''
        self._atom_dictionary = {}

    def parse_package_parameter(self, package_atom, add_slot=False,
                                add_uses=False):
        """Parse ``package_atom`` and store its parts in ``atom_dictionary``.

        :param package_atom: string of the form
            ``category/name[:slot] [use ...]``.
        :param add_slot: when the atom carries no slot, read it from the
            package's ``SLOT`` file.
        :param add_uses: when the atom carries no use flags, read them from
            the package's ``USE`` file.
        :raises PackageAtomError: if the atom is malformed, matches no
            installed package, or matches more than one.
        """
        self.package_atom = package_atom
        self._atom_dictionary = {}

        # The whole atom has to match: a plain search() would happily accept
        # leading junk such as "foo bar/baz".  Surrounding whitespace is
        # tolerated.
        parsing_result = self.atom_regex.fullmatch(package_atom.strip())
        if not parsing_result:
            raise PackageAtomError("'package' parameter value '{}' is not"
                                   " correct".format(package_atom))

        groups = parsing_result.groupdict()
        self._atom_dictionary['category'] = groups['category']
        self._atom_dictionary['name'] = groups['name']

        # Resolves 'name' to the installed "name-version" directory and
        # records the CONTENTS path needed by the SLOT/USE lookups below.
        self._check_package_existance()

        slot = groups['slot']
        if slot and slot != ':':
            # Drop the leading ':'.
            self._atom_dictionary['slot'] = slot[1:]
        elif add_slot:
            self._atom_dictionary['slot'] = self._get_slot_value()

        # split() without arguments never yields empty flags, even when the
        # flags are separated by several whitespace characters.
        uses = (groups['uses'] or '').split()
        if uses:
            self._atom_dictionary['uses'] = uses
        elif add_uses:
            self._atom_dictionary['uses'] = self._get_use_flags_value()

    def _check_package_existance(self, package_atom=''):
        """Check that the atom points to exactly one installed package.

        With ``package_atom`` given, the atom is parsed (and thereby
        checked) first.  Otherwise the already parsed category and name are
        looked up as ``<pkg_path>/<category>/<name>*/CONTENTS``; on success
        ``name`` is replaced by the matched directory name and the CONTENTS
        path is stored under the ``'contents'`` key.

        :returns: ``True`` on success.
        :raises PackageAtomError: if category or name is empty, or the
            lookup finds no package or several packages.
        """
        if package_atom:
            self.parse_package_parameter(package_atom)
            return True

        category = self._atom_dictionary.get('category')
        name = self._atom_dictionary.get('name')
        if not (category and name):
            # Without both parts nothing can be looked up; failing here is
            # clearer than a KeyError on 'contents' later on.
            raise PackageAtomError("'package' parameter value '{}' is not"
                                   " correct".format(self.package_atom))

        glob_result = glob.glob('{0}/{1}/{2}*/CONTENTS'.format(self.pkg_path,
                                                               category,
                                                               name))
        if not glob_result:
            raise PackageAtomError("Package from 'package' parameter value"
                                   " '{}' does not exist".format(
                                                       self.package_atom))
        if len(glob_result) > 1:
            raise PackageAtomError("'package' parameter value '{}' matches"
                                   " multiple installed packages".format(
                                                       self.package_atom))

        contents_path = glob_result[0]
        # The directory holding CONTENTS is the full package name.
        self._atom_dictionary['name'] = contents_path.split('/')[-2]
        self._atom_dictionary['contents'] = contents_path
        return True

    def _get_slot_value(self):
        """Return the content of the ``SLOT`` file next to CONTENTS.

        :raises PackageAtomError: if the file cannot be read.
        """
        contents_path = self._atom_dictionary['contents']
        slot_path = os.path.join(os.path.dirname(contents_path), 'SLOT')
        try:
            return read_file(slot_path).strip('\n')
        except FilesError as error:
            raise PackageAtomError("could not read slot value for"
                                   " 'package': {}".format(
                                       self.package_atom)) from error

    def _get_use_flags_value(self):
        """Return the list of flags from the ``USE`` file next to CONTENTS.

        :raises PackageAtomError: if the file cannot be read.
        """
        contents_path = self._atom_dictionary['contents']
        use_path = os.path.join(os.path.dirname(contents_path), 'USE')
        try:
            return read_file(use_path).strip('\n').split(' ')
        except FilesError as error:
            raise PackageAtomError("could not read use flags for 'package'"
                                   " parameter: {}".format(
                                       self.package_atom)) from error

    def _get_category_packages(self, category):
        """Yield the CONTENTS path of every package in ``category``."""
        for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
                                                          category)):
            yield path

    def get_file_package(self, file_path, with_slot=False, with_uses=False):
        """Return the ``category/name`` of the package owning ``file_path``.

        Every CONTENTS file of the package database is scanned; the second
        space-separated field of each line is compared with ``file_path``.

        :param file_path: path to look for; a leading ``chroot_path`` is
            removed before the comparison.
        :param with_slot: accepted but currently not used.
        :param with_uses: accepted but currently not used.
        :raises PackageAtomError: if no package lists the file.
        """
        if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]

        for category in os.listdir(self.pkg_path):
            for contents_path in self._get_category_packages(category):
                try:
                    with open(contents_path, 'r') as contents_file:
                        for file_line in contents_file:
                            fields = file_line.split(' ')
                            if len(fields) < 2:
                                # Blank or malformed line: no path field.
                                continue
                            if fields[1].strip() == file_path:
                                package_dir = os.path.dirname(contents_path)
                                return '/'.join(package_dir.split('/')[-2:])
                except (OSError, IOError):
                    # An unreadable CONTENTS file is skipped on purpose.
                    continue

        raise PackageAtomError("The file does not belong to any package")

    @property
    def atom_dictionary(self):
        """Dictionary with the parts of the last parsed atom."""
        return self._atom_dictionary

    @property
    def category(self):
        """Category of the parsed atom, or ``False`` if there is none."""
        if 'category' in self._atom_dictionary:
            return self._atom_dictionary['category']
        else:
            return False

    @property
    def name(self):
        """Name of the parsed atom, or ``False`` if there is none."""
        if 'name' in self._atom_dictionary:
            return self._atom_dictionary['name']
        else:
            return False

    @property
    def slot(self):
        """Slot of the parsed atom, or ``False`` if there is none."""
        if 'slot' in self._atom_dictionary:
            return self._atom_dictionary['slot']
        else:
            return False

    @property
    def uses(self):
        """Use flags of the parsed atom, or ``False`` if there are none."""
        if 'uses' in self._atom_dictionary:
            return self._atom_dictionary['uses']
        else:
            return False


class Package:
    """In-memory view of one installed package's CONTENTS file."""

    # Matches the "._cfgNNNN_" prefix of a file name inside a path.
    re_cfg = re.compile(r'/\._cfg\d{4}_')

    def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
        """Locate and read ``<pkg_path>/<package_atom>/CONTENTS``.

        :param package_atom: package directory relative to ``pkg_path``.
        :param pkg_path: path of the package database directory.
        :param chroot_path: root under which the database is looked up.
        :raises PackageError: if the CONTENTS file does not exist or cannot
            be read.
        """
        self.chroot_path = chroot_path

        self.contents_file_path = os.path.join(pkg_path, package_atom,
                                               'CONTENTS')
        self.contents_dictionary = OrderedDict()

        if chroot_path != '/':
            self.contents_file_path = join_paths(chroot_path,
                                                 self.contents_file_path)

        # The result of this check used to be thrown away; report a missing
        # file explicitly instead.
        if not os.path.exists(self.contents_file_path):
            raise PackageError("Can not find CONTENTS file in the path:"
                               " {}".format(self.contents_file_path))
        self.read_contents_file()

    def remove_cfg_prefix(self, file_name):
        """Return ``file_name`` with a ``._cfgNNNN_`` name prefix removed."""
        return self.re_cfg.sub('/', file_name)

    def remove_chroot_path(self, file_name):
        """Return ``file_name`` without a leading ``chroot_path``."""
        if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
            return file_name[len(self.chroot_path):]
        else:
            return file_name

    def read_contents_file(self):
        """Load the CONTENTS file into ``contents_dictionary``.

        :returns: ``True`` if the file had content, ``False`` if it was
            empty (the dictionary is then left untouched).
        :raises PackageError: if the file cannot be read.
        """
        try:
            contents_text = read_file(self.contents_file_path)
        except FilesError as error:
            raise PackageError(str(error))

        if contents_text:
            contents_format = ContentsFormat(contents_text,
                                             template_parser=False)
            self.contents_dictionary = contents_format._document_dictionary
            return True
        else:
            return False

    def write_contents_file(self):
        """Not implemented yet."""
        pass

    def render_contents_file(self):
        """Return ``contents_dictionary`` rendered by ``ContentsFormat``."""
        contents_format = ContentsFormat('', template_parser=False)
        contents_format._document_dictionary = self.contents_dictionary
        return contents_format.get_document_text()

    def add_dir(self, file_name):
        """Register ``file_name`` and all its parents as ``dir`` entries.

        The ``._cfgNNNN_`` prefix and the chroot prefix are removed first.
        The root directory itself is never registered.
        """
        file_name = self.remove_cfg_prefix(file_name)
        file_name = self.remove_chroot_path(file_name)

        # dirname() of a relative path ends at '', not at '/'; without this
        # guard such a path would recurse forever.
        if not file_name or file_name == '/':
            return

        if (file_name in self.contents_dictionary and
                self.contents_dictionary[file_name]['type'] == 'dir'):
            # Already registered, and therefore so are its parents.
            return

        self.add_dir(os.path.dirname(file_name))
        self.contents_dictionary[file_name] = OrderedDict({'type': 'dir'})

    def add_sym(self, file_name):
        """Register ``file_name`` as a ``sym`` entry (target and mtime).

        :raises PackageError: if the link target cannot be read.
        """
        file_name = self.remove_cfg_prefix(file_name)

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # No chroot prefix was present, so build the on-disk path.
            real_path = join_paths(self.chroot_path, file_name)

        self.add_dir(os.path.dirname(file_name))

        mtime = str(int(os.lstat(real_path).st_mtime))
        try:
            contents_item = OrderedDict({'type': 'sym',
                                         'target': read_link(real_path),
                                         'mtime': mtime})
        except FilesError as error:
            raise PackageError(str(error))

        self.contents_dictionary[file_name] = contents_item

    def add_obj(self, file_name):
        """Register ``file_name`` as an ``obj`` entry (md5 and mtime).

        :raises PackageError: if the file cannot be read.
        """
        file_name = self.remove_cfg_prefix(file_name)

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # No chroot prefix was present, so build the on-disk path.
            real_path = join_paths(self.chroot_path, file_name)

        self.add_dir(os.path.dirname(file_name))

        try:
            # NOTE(review): the checksum is taken over re-encoded text; if
            # read_file() decodes or translates newlines, it may differ from
            # the checksum of the raw bytes -- confirm against read_file().
            file_text = read_file(real_path).encode()
        except FilesError as error:
            raise PackageError(str(error))

        contents_item = OrderedDict({'type': 'obj',
                                     'md5': hashlib.md5(file_text).hexdigest(),
                                     'mtime':
                                     str(int(os.lstat(real_path).st_mtime))})
        self.contents_dictionary[file_name] = contents_item

    def add_file(self, file_name):
        """Register ``file_name`` using the entry type found on disk.

        Directories become ``dir`` entries, symbolic links ``sym`` entries
        and regular files ``obj`` entries; anything else, as well as the
        root directory, is ignored.
        """
        if file_name == '/':
            return

        real_path = file_name
        file_name = self.remove_chroot_path(file_name)
        if real_path == file_name:
            # Same rule as in add_sym()/add_obj(): a path given without the
            # chroot prefix is looked up inside the chroot.
            real_path = join_paths(self.chroot_path, file_name)

        # NOTE(review): isdir() follows symbolic links, so a link pointing
        # to a directory is registered as 'dir' -- confirm this is intended.
        if os.path.isdir(real_path):
            self.add_dir(file_name)
        elif os.path.islink(real_path):
            self.add_sym(file_name)
        elif os.path.isfile(real_path):
            self.add_obj(file_name)

    def modify_contents_item(self, file_name, item_value):
        """Not implemented yet."""
        pass

    def remove_file(self, file_name):
        """Not implemented yet."""
        pass

    def remove_empty_directories(self):
        """Not implemented yet."""
        pass

    def check_file_md5(self, file_path):
        """Not implemented yet."""
        pass