# vim: fileencoding=utf-8 # import os from .base_format import Format, FormatError from calculate.utils.files import join_paths from pyparsing import Literal, Regex, SkipTo, LineEnd, lineno, LineStart from calculate.utils.package import PackageAtomParser, Package, PackageNotFound from ..template_engine import ParametersContainer, Variables from ...variables.datavars import NamespaceNode from ...variables.loader import Datavars from fnmatch import fnmatch from typing import Union from glob import iglob ADD, REMOVE, MOVE = range(0, 3) class ContentsFormat(Format): FORMAT = 'contents' EXECUTABLE = True _initialized = False def __new__(cls, *args, **kwargs): if not cls._initialized: cls._initialize_parser() return super().__new__(cls) def __init__(self, template_text: str, template_path: str, parameters: ParametersContainer, datavars: Union[Datavars, NamespaceNode, Variables]): self._command_methods = {ADD: self._add_command, REMOVE: self._remove_command, MOVE: self._move_command} self._parse_errors = [] self._commands = self._parse_template(template_text) if self._parse_errors: raise FormatError('errors while parsing template:\n{}'. format('\n'.join(self._parse_errors))) self._template_path = template_path self._packages = dict() self._atom_parser = None # Предупреждения. self._warnings: list = [] def execute_format(self, target_path: str, chroot_path='/') -> dict: '''Метод для запуска работы формата.''' self._package = dict() self._atom_parser = PackageAtomParser(chroot_path=chroot_path) for command in self._commands: target_package = self._get_package(command['target'], chroot_path) if 'source' not in command: source_package = None else: source_package = self._get_package(command['source'], chroot_path) self._command_methods[command['action']](target_package, command['path'], command['lineno'], source=source_package) self._save_changes() return {} def _add_command(self, target, glob_path, lineno, source=None): # Если файл уже есть в пакете -- ничего не делаем. glob_path = join_paths(target.chroot_path, glob_path) for file_path in iglob(glob_path): # Если файл уже в пакете -- ничего не делаем. if file_path in target: continue # Проверяем существование файла. if not os.path.exists(file_path): raise FormatError(f"Format processing error:{lineno}: file " f"'{file_path}' does not exist.") # Проверяем, не принадлежит ли файл какому-нибудь другому пакету. try: file_package = self._atom_parser.get_file_package(file_path) except PackageNotFound: file_package = None if file_package is not None: raise FormatError(f"Format processing error:{lineno}: can not" f" add file '{file_path}' belonging to the" f" package {file_package} into the package" f" '{target}'.") target.add_file(file_path) def _remove_command(self, target, glob_path, lineno, source=None): paths_to_remove = [] if glob_path.endswith('/'): glob_path = glob_path[0:-1] for file_path in target.contents_dictionary.keys(): if fnmatch(file_path, glob_path): paths_to_remove.append(file_path) for file_path in paths_to_remove: target.remove_file(file_path) def _move_command(self, target, glob_path, lineno, source=None): if source is None: return paths_to_move = [] if glob_path.endswith('/'): glob_path = glob_path[0:-1] for file_path in source.contents_dictionary.keys(): if fnmatch(file_path, glob_path): paths_to_move.append(file_path) for file_path in paths_to_move: removed = source.remove_file(file_path) for file_path, file_info in removed.items(): if file_info['type'] == 'dir': target.add_dir(file_path) elif file_info['type'] == 'obj': target.add_obj(file_path, file_md5=file_info['md5'], mtime=file_info['mtime']) elif file_info['type'] == 'sym': target.add_sym(file_path, target_path=file_info['target'], mtime=file_info['mtime']) target.sort_contents_dictionary() def _save_changes(self): for atom_name, package in self._packages.items(): package.write_contents_file() self._packages = {} def _get_package(self, atom_name: str, chroot_path: str) -> Package: atom = self._atom_parser.parse_package_parameter(atom_name) if atom not in self._packages: package = Package(atom, chroot_path=chroot_path) self._packages.update({atom: package}) else: package = self._packages[atom] return package def _parse_template(self, template_text: str) -> list: for tokens, start, end in self.template_parser.scanString( template_text): parse_result = tokens[0] if parse_result is None: continue elif 'error' in parse_result: error = (f'Parse Error:{parse_result["lineno"]}:' f' {parse_result["error"]}') self._parse_errors.append(error) continue yield parse_result @classmethod def _initialize_parser(cls): atom = Regex(PackageAtomParser.atom_regex) remove_symbol = Literal('!') path = Regex(r'\S+') add_parser = atom('target') + path('path') + LineEnd().suppress() add_parser.setParseAction(cls._parse_add) move_parser = (atom('source') + Literal(',').suppress() + atom('target') + path('path') + LineEnd().suppress()) move_parser.setParseAction(cls._parse_move) remove_parser = (remove_symbol.suppress() + atom('target') + path('path') + LineEnd().suppress()) remove_parser.setParseAction(cls._parse_remove) empty_line = LineStart().suppress() + LineEnd().suppress() empty_line.setParseAction(cls._parse_skip) comment = Literal('#') + SkipTo(LineEnd(), include=True) comment.setParseAction(cls._parse_skip) unexpected = SkipTo(LineEnd(), include=True) unexpected.setParseAction(cls._parse_unexpected) cls.template_parser = (move_parser | remove_parser | add_parser | empty_line | comment | unexpected) @classmethod def _parse_add(cls, string, location, parse_result): target = parse_result.asDict()['target'] path = parse_result.asDict()['path'] return {'action': ADD, 'target': target, 'path': path, 'lineno': lineno(location, string)} @classmethod def _parse_remove(cls, string, location, parse_result): target = parse_result.asDict()['target'] path = parse_result.asDict()['path'] return {'action': REMOVE, 'target': target, 'path': path, 'lineno': lineno(location, string)} @classmethod def _parse_move(cls, string, location, parse_result): source = parse_result.asDict()['source'] target = parse_result.asDict()['target'] path = parse_result.asDict()['path'] return {'action': MOVE, 'source': source, 'target': target, 'path': path, 'lineno': lineno(location, string)} @classmethod def _parse_skip(cls, parse_result): return [None] @classmethod def _parse_unexpected(cls, string, location, parse_result): result = parse_result[0] if not result: return [None] return {'error': result, 'lineno': lineno(location, string)} @property def warnings(self): return self._warnings