# vim: fileencoding=utf-8 # from pprint import pprint from ..utils.package import PackageAtomParser, Package, PackageNotFound,\ PackageAtomName, Version, NonePackage from ..utils.files import join_paths, write_file, read_file_lines, FilesError,\ check_directory_link, read_link, Process,\ get_target_from_link from .template_engine import TemplateEngine, Variables, ConditionFailed,\ ParametersProcessor, DIR, FILE,\ ParametersContainer from calculate.variables.datavars import StringType, ListType, NamespaceNode from calculate.variables.loader import Datavars from .format.base_format import Format from ..utils.io_module import IOModule from collections import OrderedDict, abc from ..utils.mount import Mounts import hashlib import fnmatch import shutil import errno import stat import glob import copy import os # Наверное временно. CALCULATE_VERSION = Version('4.0') class TemplateExecutorError(Exception): pass class TemplateTypeConflict(Exception): pass class TemplateCollisionError(Exception): pass class CalculateConfigFile: '''Класс для работы с файлом /var/lib/calculate/config.''' def __init__(self, cl_config_path='/var/lib/calculate/config', cl_chroot_path='/'): self.chroot_path = cl_chroot_path self.cl_config_path = cl_config_path self._config_dictionary = self._get_cl_config_dictionary() self._unsaved_changes = False def __contains__(self, file_path: str) -> bool: file_path = self._remove_chroot(file_path) return file_path in self._config_dictionary def _get_cl_config_dictionary(self) -> OrderedDict: '''Метод для загрузки словаря файла /var/lib/calculate/config.''' config_dictionary = OrderedDict() if os.path.exists(self.cl_config_path): if os.path.isdir(self.cl_config_path): raise TemplateExecutorError( "directory instead calculate config file in: {}". format(self.cl_config_path)) else: write_file(self.cl_config_path).close() return config_dictionary try: config_file_lines = read_file_lines(self.cl_config_path) except FilesError as error: raise TemplateExecutorError( "cannot read calculate config file in: {0}. Reason: {1}". format(self.cl_config_path, str(error))) # TODO Продумать проверку корректности найденного файла. for file_line in config_file_lines: filename, md5_sum = file_line.split(' ') config_dictionary.update({filename: md5_sum}) self._unsaved_changes = False return config_dictionary def set_files_md5(self, file_path: str, file_md5: str) -> None: '''Метод для установки в config соответствия файла некоторой контрольной сумме.''' file_path = self._remove_chroot(file_path) self._config_dictionary[file_path] = file_md5 self._unsaved_changes = True def remove_file(self, file_path: str) -> None: '''Метод для удаления файла из config.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: self._config_dictionary.pop(file_path) self._unsaved_changes = True def compare_md5(self, file_path: str, file_md5: str) -> None: '''Метод для сравнения хэш-суммы из config и некоторой заданной.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: return self._config_dictionary[file_path] == file_md5 else: return False def save_changes(self) -> None: '''Метод для записи изменений, внессенных в файл config.''' if not self._unsaved_changes: return config_file = write_file(self.cl_config_path) for file_name, file_md5 in self._config_dictionary.items(): config_file.write('{} {}\n'.format(file_name, file_md5)) config_file.close() self._unsaved_changes = False def _remove_chroot(self, file_path: str) -> str: '''Метод для удаления корневого пути из указанного пути.''' if self.chroot_path != '/' and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return file_path class TemplateWrapper: '''Класс связывающий шаблон с целевым файлом и определяющий параметры наложения шаблона, обусловленные состоянием целевого файла.''' type_checks = {DIR: os.path.isdir, FILE: os.path.isfile} _protected_is_set = False _protected_set = set() _unprotected_set = set() def __new__(cls, *args, **kwargs): if not cls._protected_is_set: # Устанавливаем значения PROTECTED, если не заданы. if 'chroot_path' in kwargs: chroot_path = kwargs['chroot_path'] else: chroot_path = '/' cls._set_protected(chroot_path) return super().__new__(cls) def __init__(self, target_file_path, parameters, template_type, template_path, template_text='', target_package=None, chroot_path='/', config_archive_path='/var/lib/calculate/config-archive', dbpkg=True): self.target_path = target_file_path self.template_path = template_path self.chroot_path = chroot_path self.config_archive_path = config_archive_path self.target_package_name = None self.package_atom_parser = PackageAtomParser( chroot_path=self.chroot_path) # Вспомогательный флаг, включается, если по целевому пути лежит файл, # для которого не определился никакой пакет. self.target_without_package = False self.parameters = parameters self.output_path = self.target_path self.input_path = None self.template_type = template_type self.template_text = template_text # Флаг, указывающий, что нужно удалить файл из target_path перед # применением шаблона. self.remove_original = False # Флаг, указывающий, что целевой путь был изменен. self.target_path_is_changed = False # Флаг, указывающий, что файл по целевому пути является ссылкой. self.target_is_link = False # Пакет, к которому относится файл. self.target_package = target_package # Флаг, разрешающий работу с CONTENTS. Если False, то выключает # protected для всех файлов блокирует все операции с CONTENTS и ._cfg. self.dbpkg = dbpkg # Флаг, указывающий, что файл является PROTECTED. self.protected = False # Временный флаг для определения того, является ли шаблон userspace. self.is_userspace = False self.format_class = None if self.parameters.run or self.parameters.exec: # Если есть параметр run или exec, то кроме текста шаблона ничего # не нужно. return if self.parameters.append in {'join', 'before', 'after', 'replace'}: # Получаем класс соответствующего формата файла. if self.parameters.format: self.format_class = ParametersProcessor.\ available_formats[self.parameters.format] elif self.template_type is FILE: # TODO Здесь будет детектор форматов. Когда-нибудь. raise TemplateExecutorError("'format' parameter is not set" " file template.") # Если по этому пути что-то есть -- проверяем конфликты. if os.path.exists(target_file_path): for file_type, checker in self.type_checks.items(): if checker(target_file_path): self.target_type = file_type break self.target_is_link = os.path.islink(target_file_path) # Если установлен параметр mirror и есть параметр source, # содержащий несуществующий путь -- удаляем целевой файл. if self.parameters.source is True and self.parameters.mirror: self.remove_original = True else: if self.parameters.mirror: raise TemplateExecutorError("target file does not exist, while" " 'mirror' parameter is set") self.target_type = None if self.format_class is not None and self.format_class.EXECUTABLE: # Если формат исполняемый -- проверяем, существует ли директория, # из которой будет выполняться шаблон. if not os.path.exists(self.target_path): # Если не существует -- создаем ее. os.makedirs(self.target_path) elif os.path.isfile(self.target_path): # Если вместо директории файл -- определяем по файлу # директорию. self.target_path = os.path.dirname(self.target_path) # Если есть параметр package, определяем по нему пакет. if self.parameters.package: self.target_package_name = self.parameters.package if (self.target_package is None or self.target_package.package_name != self.target_package_name): self.target_package = Package(self.parameters.package, chroot_path=self.chroot_path) return self._check_type_conflicts() self._check_package_collision() self._check_user_changes() def _check_type_conflicts(self) -> None: '''Метод для проверки конфликтов типов.''' if self.parameters.append == 'link': if self.parameters.force: self.remove_original = True elif self.target_is_link: if self.template_type != self.target_type: raise TemplateTypeConflict( "the target is a link to {} while the template" "is {} and has append = 'link'". format('directory' if self.template_type == DIR else 'file', 'file' if self.template_type == DIR else 'directory')) else: self.remove_original = True elif self.target_type == DIR: raise TemplateTypeConflict("the target is a directory while " "the template has append = 'link'") elif self.target_type == FILE: raise TemplateTypeConflict("the target is a file while the" " template has append = 'link'") elif self.template_type == DIR: if self.target_type == FILE: if self.parameters.force: self.remove_original = True else: raise TemplateTypeConflict("the target is a file while the" " template is a directory") elif self.target_is_link: if self.parameters.force: self.remove_original = True else: try: link_source = check_directory_link( self.target_path, chroot_path=self.chroot_path) self.target_path = link_source self.target_path_is_changed = True except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.template_type == FILE: if self.parameters.force: if self.target_type == DIR: self.remove_original = True elif self.target_is_link and self.target_type == FILE: try: link_source = read_link(self.target_path) self.target_path = get_target_from_link( self.target_path, link_source, chroot_path=self.chroot_path) self.target_path_is_changed = True except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.target_is_link: if self.target_type == DIR: raise TemplateTypeConflict("the target file is a link to a" " directory while the template" " is a file") else: raise TemplateTypeConflict("the target file is a link to" " a file while the template" " is a file") elif self.target_type == DIR: raise TemplateTypeConflict("the target file is a directory" " while the template is a file") def _check_package_collision(self) -> None: '''Метод для проверки на предмет коллизии, то есть конфликта пакета шаблона и целевого файла.''' if self.parameters.package: parameter_package = self.parameters.package else: parameter_package = None if self.target_type is not None: try: file_package = self.package_atom_parser.get_file_package( self.target_path) except PackageNotFound: file_package = None self.target_without_package = True else: file_package = None # Если для шаблона и целевого файла никаким образом не удается # определить пакет и есть параметр append -- шаблон пропускаем. if parameter_package is None and file_package is None: if self.parameters.append and self.parameters.append != 'skip': raise TemplateCollisionError( "'package' parameter is not defined for" " template with 'append' parameter.") else: return elif parameter_package is None: self.target_package_name = file_package elif file_package is None: self.target_package_name = parameter_package elif file_package != parameter_package and self.template_type != DIR: raise TemplateCollisionError(( "The template package is {0} while target" " file package is {1}").format( parameter_package.atom, file_package.atom )) else: self.target_package_name = parameter_package if (self.target_package is None or self.target_package_name != self.target_package.package_name): self.target_package = Package(self.target_package_name, chroot_path=self.chroot_path) def _check_user_changes(self) -> None: '''Метод для проверки наличия пользовательских изменений в конфигурационных файлах.''' # Эта проверка только для файлов. if self.template_type != FILE: return # Проверим, является ли файл защищенным. # Сначала проверяем по переменной CONFIG_PROTECT. if self.dbpkg: for protected_path in self._protected_set: if self.target_path.startswith(protected_path): self.protected = True break # Затем по переменной CONFIG_PROTECT_MASK. for unprotected_path in self._unprotected_set: if self.target_path.startswith(unprotected_path): self.protected = False break else: self.protected = False # Собираем список имеющихся ._cfg файлов. cfg_pattern = os.path.join(os.path.dirname(self.target_path), "._cfg????_{}".format( os.path.basename(self.target_path))) self.cfg_list = glob.glob(cfg_pattern) self.cfg_list.sort() # Путь к архивной версии файла. self.archive_path = self._get_archive_path(self.target_path) self.md5_matching = (self.parameters.autoupdate or self.parameters.force) if not self.protected: self.md5_matching = True elif self.parameters.unbound: # Если присутствует unbound, то просто модифицируем файл и # удаляем его из CONTENTS. self.md5_matching = True elif self.target_type is None: # Если целевой файл отсутствует. if self.target_path in self.target_package: # Проверка -- был ли файл удален. # self.md5_matching = self.md5_matching pass else: self.md5_matching = True elif self.target_without_package: # Если файл по целевому пути не относится к какому-либо пакету. # self.md5_matching = False pass elif not self.md5_matching: # Если файл есть и он относится к текущему пакету. # Если по каким-то причинам уже нужно считать, что хэш-суммы # совпадают -- в дальнейшей проверке нет необходимости. target_md5 = self.target_package.get_md5(self.target_path) self.md5_matching = self.target_package.is_md5_equal( self.target_path, file_md5=target_md5) # Если по целевому пути файл не относящийся к какому-либо пакету и # присутствует параметр autoupdate -- удаляем этот файл. if (self.target_without_package and (self.parameters.autoupdate or self.parameters.force)): self.remove_original = True # Определяем пути входных и выходных файлов. if self.md5_matching: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source elif self.cfg_list and not self.parameters.unbound: self.input_path = self.archive_path else: self.input_path = self.target_path self.output_path = self.target_path else: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source else: self.input_path = self.archive_path self.output_path = self._get_cfg_path(self.target_path) def _get_archive_path(self, file_path: str) -> str: '''Метод для получения пути к архивной версии указанного файла.''' if self.chroot_path != "/" and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return join_paths(self.config_archive_path, file_path) def _get_cfg_path(self, file_path: str) -> str: '''Метод для получения пути для создания нового ._cfg????_ файла.''' if self.cfg_list: last_cfg_name = os.path.basename(self.cfg_list[-1]) slice_value = len('._cfg') cfg_number = int(last_cfg_name[slice_value: slice_value + 4]) else: cfg_number = 0 cfg_number = str(cfg_number + 1) if len(cfg_number) < 4: cfg_number = '0' * (4 - len(cfg_number)) + cfg_number new_cfg_name = "._cfg{}_{}".format(cfg_number, os.path.basename(file_path)) new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name) return new_cfg_path def remove_from_contents(self) -> None: '''Метод для удаления целевого файла из CONTENTS.''' if self.target_package is None: return if self.template_type == DIR: self.target_package.remove_dir(self.target_path) elif self.template_type == FILE: self.target_package.remove_obj(self.target_path) def clear_dir_contents(self) -> None: '''Метод для удаления из CONTENTS всего содержимого директории после применения append = "clear".''' if self.template_type == DIR and self.target_package is not None: self.target_package.clear_dir(self.target_path) def add_to_contents(self, file_md5=None) -> None: '''Метод для добавления целевого файла в CONTENTS.''' if self.target_package is None: return # В подавляющем большинстве случаев берем хэш-сумму из выходного файла, # но если по какой-то причине выходного файла нет -- пытаемся # по целевому. Такое поведение маловероятно, но, наверное, стоит # учесть возможность такой ситуации. if os.path.exists(self.output_path): source_path = self.output_path else: source_path = self.target_path if self.parameters.append == 'link': self.target_package.add_sym(source_path, self.parameters.source) elif self.template_type == DIR: self.target_package.add_dir(source_path) elif self.template_type == FILE: self.target_package.add_obj(source_path, file_md5=file_md5) def update_contents_from_list(self, changed_list: dict) -> None: '''Метод для изменения CONTENTS по списку измененных файлов.''' if self.target_package is None: return for file_path, mode in changed_list.items(): if mode == "modify": if os.path.islink(file_path): self.target_package.add_sym(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) elif os.path.isfile(file_path): self.target_package.add_obj(file_path) elif mode == "remove": if os.path.islink(file_path) or os.path.isfile(file_path): self.target_package.remove_obj(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) @classmethod def _set_protected(cls, chroot_path: str) -> None: '''Метод для получения множества защищенных директорий.''' cls._protected_set = set() cls._unprotected_set = set() cls._protected_set.add(join_paths(chroot_path, '/etc')) config_protect_env = os.environ.get('CONFIG_PROTECT', False) if config_protect_env: for protected_path in config_protect_env.split(): protected_path = join_paths(chroot_path, protected_path.strip()) cls._protected_set.add(protected_path) config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False) if config_protect_mask_env: for unprotected_path in config_protect_mask_env.split(): unprotected_path = join_paths(chroot_path, unprotected_path.strip()) cls._unprotected_set.add(unprotected_path) cls._protected_is_set = True def save_changes(self) -> None: '''Метод для сохранения изменений внесенных в CONTENTS.''' if self.target_package: self.target_package.remove_empty_directories() self.target_package.write_contents_file() class TemplateExecutor: '''Класс исполнительного модуля.''' def __init__(self, datavars_module=Variables(), chroot_path='/', cl_config_archive='/var/lib/calculate/config-archive', cl_config_path='/var/lib/calculate/config', execute_archive_path='/var/lib/calculate/.execute/', dbpkg=True): # TODO добавить список измененных файлов. self.datavars_module = datavars_module self.chroot_path = chroot_path # Объект для проверки файловых систем. Пока не инициализируем. self.mounts = None # Директория для хранения полученных при обработке exec скриптов. self.execute_archive_path = execute_archive_path self.execute_files = OrderedDict() self.dbpkg = dbpkg # Список целевых путей измененных файлов. Нужен для корректиного # формирования calculate-заголовка. self.processed_targets = [] self.directory_default_parameters =\ ParametersProcessor.directory_default_parameters self.file_default_parameters =\ ParametersProcessor.file_default_parameters # Отображение имен действий для директорий на методы их реализующие. self.directory_appends = {'join': self._append_join_directory, 'remove': self._append_remove_directory, 'skip': self._append_skip_directory, 'clear': self._append_clear_directory, 'link': self._append_link_directory, 'replace': self._append_replace_directory} # Отображение имен действий для файлов на методы их реализующие. self.file_appends = {'join': self._append_join_file, 'after': self._append_after_file, 'before': self._append_before_file, 'replace': self._append_replace_file, 'remove': self._append_remove_file, 'skip': self._append_skip_file, 'clear': self._append_clear_file, 'link': self._append_link_file} self.formats_classes = ParametersProcessor.available_formats self.calculate_config_file = CalculateConfigFile( cl_config_path=cl_config_path, cl_chroot_path=chroot_path) self.cl_config_archive_path = cl_config_archive Format.CALCULATE_VERSION = CALCULATE_VERSION @property def available_appends(self) -> set: '''Метод для получения множества возможных значений append.''' appends_set = set(self.directory_appends.keys()).union( set(self.file_appends.keys())) return appends_set def execute_template(self, target_path: str, parameters: ParametersContainer, template_type: int, template_path: str, template_text='', save_changes=True, target_package=None) -> dict: '''Метод для запуска выполнения шаблонов.''' # Словарь с данными о результате работы исполнительного метода. self.executor_output = {'target_path': None, 'stdout': None, 'stderr': None} if parameters.append == 'skip': return self.executor_output try: template_object = TemplateWrapper( target_path, parameters, template_type, template_path, template_text=template_text, target_package=target_package, chroot_path=self.chroot_path, config_archive_path=self.cl_config_archive_path, dbpkg=self.dbpkg) except TemplateTypeConflict as error: raise TemplateExecutorError("type conflict: {}".format(str(error))) except TemplateCollisionError as error: raise TemplateExecutorError("collision: {}".format(str(error))) # Удаляем оригинал, если это необходимо из-за наличия force или по # другим причинам. if template_object.remove_original: if template_object.target_type == DIR: self._remove_directory(template_object.target_path) else: self._remove_file(template_object.target_path) if self.dbpkg: template_object.remove_from_contents() template_object.target_type = None # Если был включен mirror, то после удаления файла завершаем # выполнение шаблона. if template_object.parameters.mirror: if save_changes: template_object.save_changes() return template_object.target_type = None if template_object.parameters.run: # Если есть параметр run -- запускаем текст шаблона. self._run_template(template_object) elif template_object.parameters.exec: # Если есть параметр exec -- запускаем текст шаблона после # обработки всех шаблонов. self._exec_template(template_object) elif template_object.parameters.append: if template_object.template_type == DIR: self.directory_appends[template_object.parameters.append]( template_object) elif template_object.template_type == FILE: self.file_appends[template_object.parameters.append]( template_object) # Сохраняем изменения в CONTENTS внесенные согласно шаблону. if save_changes: template_object.save_changes() # Возвращаем целевой путь, если он был изменен, или # None если не был изменен. if template_object.target_path_is_changed: self.executor_output['target_path'] =\ template_object.target_path return self.executor_output def save_changes(self): '''Метод для сохранения чего-нибудь после выполнения всех шаблонов.''' # Пока сохраняем только получившееся содержимое config-файла. self.calculate_config_file.save_changes() def _append_join_directory(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия для append = "join", если шаблон -- директория. Создает директорию, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) if self.dbpkg: template_object.add_to_contents() def _append_remove_directory(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия для append = "remove", если шаблон -- директория. Удаляет директорию со всем содержимым, если она есть.''' if template_object.target_type is not None: self._remove_directory(template_object.target_path) if self.dbpkg: template_object.remove_from_contents() def _append_skip_directory(self, template_object: TemplateWrapper) -> None: pass def _append_clear_directory(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия для append = "clear", если шаблон -- директория. Удаляет все содержимое директории, если она есть.''' if template_object.target_type is not None: self._clear_directory(template_object.target_path) # Меняем права и владельца очищенной директории, если это # необходимо. if template_object.parameters.chmod: self._chmod_directory(template_object.target_path, template_object.parameters.chmod) if template_object.parameters.chown: self._chown_directory(template_object.target_path, template_object.parameters.chown) if self.dbpkg: template_object.clear_dir_contents() def _append_link_directory(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия для append = "link", если шаблон -- директория. Создает ссылку на директорию, если она есть.''' self._link_directory(template_object.parameters.source, template_object.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_directory(template_object.parameters.source, template_object.parameters.chmod) if template_object.parameters.chown: self._chown_directory(template_object.parameters.source, template_object.parameters.chown) if self.dbpkg: template_object.add_to_contents() def _append_replace_directory(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия для append = "replace", если шаблон -- директория. Очищает директорию или создает, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) if self.dbpkg: template_object.add_to_contents() else: self._clear_directory(template_object.target_path) if self.dbpkg: template_object.clear_dir_contents() def _append_join_file(self, template_object: TemplateWrapper, join_before=False, replace=False) -> None: '''Метод описывающий действия при append = "join", если шаблон -- файл. Объединяет шаблон с целевым файлом.''' input_path = template_object.input_path output_path = template_object.output_path template_format = template_object.format_class # Задаемся значениями chmod и chown в зависимости от наличия или # отсутствия файла, принадлежности его пакету и наличия значений # параметров по умолчанию. chmod = template_object.parameters.chmod if not chmod: if (template_object.target_type is not None and not template_object.target_without_package): chmod = self._get_file_mode(template_object.target_path) else: chmod = self.file_default_parameters.get('chmod', False) chown = template_object.parameters.chown if not chown: if (template_object.target_type is not None and not template_object.target_without_package): chown = self._get_file_owner(template_object.target_path) else: chown = self.file_default_parameters.get('chown', False) if template_format.EXECUTABLE or template_object.md5_matching: # Действия при совпадении md5 из CONTENTS и md5 целевого файла. # А также если шаблон просто исполнительный. output_paths = [output_path] # Если целевой файл защищен, а шаблон не userspace. if template_object.protected and not template_object.is_userspace: # Тогда также обновляем архив. output_paths.append(template_object.archive_path) if template_object.target_type is not None and not replace: # Если целевой файл есть и нет параметра replace -- используем # текст целевого файла. if (not input_path.startswith(self.cl_config_archive_path) or os.path.exists(input_path)): # Если входной файл просто не из архива, или из архива и # при этом существует -- используем его with open(input_path, 'r') as input_file: input_text = input_file.read() else: # В противном случае используем пустой файл. (!) input_text = '' else: input_text = '' parsed_template = template_format(template_object.template_text, template_object.template_path, ignore_comments=True) if not template_object.format_class.EXECUTABLE: # Если шаблон не исполнительный разбираем входной текст. parsed_input = template_format( input_text, template_object.template_path, add_header=True, join_before=join_before, already_changed=(template_object.target_path in self.processed_targets), parameters=template_object.parameters) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() for save_path in output_paths: if not os.path.exists(os.path.dirname(save_path)): self._create_directory( template_object, path_to_create=os.path.dirname(save_path)) with open(save_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца всех сохраняемых файлов, # если это необходимо. if chown: self._chown_file(save_path, chown) if chmod: self._chmod_file(save_path, chmod) if self.dbpkg: # Убираем все ._cfg файлы. if template_object.cfg_list: for cfg_file_path in template_object.cfg_list: self._remove_file(cfg_file_path) # Убираем целевой файл из CL. self.calculate_config_file.remove_file( template_object.target_path) # Обновляем CONTENTS. if template_object.protected: if template_object.parameters.unbound: template_object.remove_from_contents() else: template_object.add_to_contents( file_md5=output_text_md5) else: changed_files = parsed_template.execute_format( input_text=input_text, target_path=template_object.target_path) # Удаляем форматный объект входного файла. del(parsed_template) # Если исполняемый формат выдал список измененных файлов для # изменения CONTENTS и при этом задан пакет -- обновляем # CONTENTS. if (self.dbpkg and changed_files and template_object.target_package): template_object.update_contents_from_list(changed_files) else: if template_object.target_type is not None and not replace: if (not input_path.startswith(self.cl_config_archive_path) or os.path.exists(input_path)): with open(input_path, 'r') as input_file: input_text = input_file.read() else: input_text = '' else: input_text = '' parsed_input = template_format( input_text, template_object.template_path, add_header=True, join_before=join_before, already_changed=False, parameters=template_object.parameters) parsed_template = template_format(template_object.template_text, template_object.template_path, ignore_comments=True) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() if not self.calculate_config_file.compare_md5( template_object.target_path, output_text_md5): if not os.path.exists(os.path.dirname(output_path)): self._create_directory( template_object, path_to_create=os.path.dirname(output_path)) with open(output_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца ._cfg????_ файлов, если # это необходимо. if chown: self._chown_file(output_path, chown) if chmod: self._chmod_file(output_path, chmod) # Обновляем CL. self.calculate_config_file.set_files_md5( template_object.target_path, output_text_md5) # Обновляем CONTENTS. template_object.add_to_contents(file_md5=output_text_md5) else: # Действия если CL совпало. Пока ничего не делаем. pass def _append_after_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в конец файла и в конец каждой секции файла.''' self._append_join_file(template_object, join_before=False) def _append_before_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в начало файла и в начало каждой секции файла.''' self._append_join_file(template_object, join_before=True) def _append_skip_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "skip". Пока никаких действий.''' pass def _append_replace_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "replace", если шаблон -- файл. Очищает файл и затем накладывает на него шаблон.''' self._append_join_file(template_object, replace=True) def _append_remove_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "remove", если шаблон -- файл. Удаляет файл.''' if template_object.target_type is not None: self._remove_file(template_object.target_path) if self.dbpkg: template_object.remove_from_contents() def _append_clear_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "clear", если шаблон -- файл. Очищает файл.''' if template_object.target_type is not None: self._clear_file(template_object.target_path) # Меняем владельца и права доступа к очищенному файлу, если нужно. if template_object.parameters.chown: self._chown_file(template_object.target_path, template_object.parameters.chown) if template_object.parameters.chmod: self._chmod_file(template_object.target_path, template_object.parameters.chmod) if self.dbpkg: template_object.add_to_contents() def _append_link_file(self, template_object: TemplateWrapper) -> None: '''Метод описывающий действия при append = "link", если шаблон -- файл. Создает ссылку на файл, указанный в параметре source.''' self._link_file(template_object.parameters.source, template_object.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_file(template_object.parameters.source, template_object.parameters.chmod) if template_object.parameters.chown: self._chown_file(template_object.parameters.source, template_object.parameters.chown) if self.dbpkg: template_object.add_to_contents() def _create_directory(self, template_object: TemplateWrapper, path_to_create=None) -> None: '''Метод для создания директории и, при необходимости, изменения владельца и доступа все директорий на пути к целевой.''' if path_to_create is None: target_path = template_object.target_path else: target_path = path_to_create template_parameters = template_object.parameters # Если файл есть, но указан chmod или chown -- используем их. if os.access(target_path, os.F_OK): if template_parameters.chmod: self._chmod_directory(target_path, template_parameters.chmod) if template_parameters.chown: self._chown_directory(target_path, template_parameters.chown) return directories_to_create = [target_path] directory_path = os.path.dirname(target_path) # Составляем список путей к директориям, которые нужно создать. while not os.access(directory_path, os.F_OK) and directory_path: directories_to_create.append(directory_path) directory_path = os.path.dirname(directory_path) # получаем информацию о владельце и правах доступа ближайшей # существующей директории. chmod = template_parameters.chmod if not chmod: chmod = self._get_file_mode(directory_path) chown = template_parameters.chown if not chown: chown = self._get_file_owner(directory_path) directories_to_create.reverse() # создаем директории. for create_path in directories_to_create: try: os.mkdir(create_path) # Для каждой созданной директории меняем права и владельца # если это необходимо. if chmod: self._chmod_directory(create_path, chmod) if chown: self._chown_directory(create_path, chown) except OSError as error: raise TemplateExecutorError( 'Failed to create directory: {}, reason: {}'. format(create_path, str(error))) def _remove_directory(self, target_path: str) -> None: '''Метод для удаления директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): try: if os.path.islink(target_path): os.unlink(target_path) else: shutil.rmtree(target_path) return except Exception as error: raise TemplateExecutorError( ("Failed to delete the directory: {0}," " reason: {1}").format(target_path, str(error))) else: error_message = "target file is not directory" else: error_message = "target file does not exist" raise TemplateExecutorError(("Failed to delete the directory: {0}," "reason: {1}").format(target_path, error_message)) def _clear_directory(self, target_path: str) -> None: '''Метод для очистки содержимого целевой директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): # Удаляем все содержимое директории. for node in os.scandir(target_path): if node.is_dir(): self._remove_directory(node.path) else: self._remove_file(node.path) return else: error_message = "target file is not directory" else: error_message = "target directory does not exist" raise TemplateExecutorError(("failed to clear directory: {}," " reason: {}").format(target_path, error_message)) def _link_directory(self, source: str, target_path: str) -> None: '''Метод для создания по целевому пути ссылки на директорию расположенную на пути, указанному в source.''' try: os.symlink(source, target_path, target_is_directory=True) except OSError as error: raise TemplateExecutorError( "failed to create symlink: {0} -> {1}, reason: {2}". format(target_path, source, str(error))) def _remove_file(self, target_path: str) -> None: '''Метод для удаления файлов.''' if os.path.exists(target_path): if os.path.isfile(target_path): if os.path.islink(target_path): try: os.unlink(target_path) return except OSError as error: error_message = str(error) try: os.remove(target_path) return except OSError as error: error_message = str(error) else: error_message = 'target is not a file' elif os.path.islink(target_path): try: os.unlink(target_path) return except OSError as error: error_message = str(error) else: error_message = 'target file does not exist' raise TemplateExecutorError(("failed to remove the file: {0}," "reason: {1}").format(target_path, error_message)) def _clear_file(self, target_path: str) -> None: '''Метод для очистки файлов.''' if os.path.exists(target_path): if os.path.isfile(target_path): try: with open(target_path, 'w') as f: f.truncate(0) return except IOError as error: error_message = str(error) else: error_message = 'target is not a file' else: error_message = 'target file does not exist' raise TemplateExecutorError(("failed to clear the file: {0}," "reason: {1}").format(target_path, error_message)) def _link_file(self, source: str, target_path: str) -> None: '''Метод для создания по целевому пути ссылки на файл расположенный на пути, указанному в source.''' try: os.symlink(source, target_path) except OSError as error: raise TemplateExecutorError( "failed to create symlink to the file: {0} -> {1}, reason: {2}". format(target_path, source, str(error))) def _run_template(self, template_object: TemplateWrapper) -> None: '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в run прямо во время обработки шаблонов.''' text_to_run = template_object.template_text interpreter = template_object.parameters.run if template_object.template_type == FILE: cwd_path = os.path.dirname(template_object.target_path) else: cwd_path = template_object.target_path if not os.path.exists(cwd_path): raise TemplateExecutorError(("can not run template, directory from" " target path does not exist: {}"). format(template_object.target_path)) elif not os.path.isdir(cwd_path): raise TemplateExecutorError(("can not exec template, {} is not a" " directory.").format(cwd_path)) try: run_process = Process(interpreter, cwd=cwd_path) run_process.write(text_to_run) if run_process.readable: stdout = run_process.read() if stdout: self.executor_output['stdout'] = stdout if run_process.readable_errors: stderr = run_process.read_error() if stderr: self.executor_output['stderr'] = stderr except FilesError as error: raise TemplateExecutorError(("can not run template using the" " interpreter '{}', reason: {}"). format(interpreter, str(error))) def _exec_template(self, template_object: TemplateWrapper) -> None: '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в exec после выполнения всех прочих шаблонов. ''' text_to_run = template_object.template_text interpreter = template_object.parameters.exec if template_object.template_type == FILE: cwd_path = os.path.dirname(template_object.target_path) else: cwd_path = template_object.target_path if not os.path.exists(cwd_path): raise TemplateExecutorError( ("can not exec template, directory from" " target path does not exist: {}"). format(cwd_path)) elif not os.path.isdir(cwd_path): raise TemplateExecutorError(("can not exec template, {} is not a" " directory.").format(cwd_path)) # Получаем путь к директории для хранения файлов .execute. if (self.chroot_path != '/' and not self.execute_archive_path.startswith(self.chroot_path)): self.execute_archive_path = join_paths( self.chroot_path, self.execute_archive_path) # Если директория уже существует получаем номер очередного файла для # exec по номеру последнего. exec_number = 0 if not self.execute_files: if os.path.exists(self.execute_archive_path): exec_files_list = os.listdir(self.execute_archive_path) if exec_files_list: exec_number = int(exec_files_list[-1][-4:]) exec_number = str(exec_number + 1) else: exec_number = str(len(self.execute_files) + 1) # Получаем название нового exec_???? файла. if len(exec_number) < 4: exec_number = '0' * (4 - len(exec_number)) + exec_number exec_file_name = 'exec_{}'.format(exec_number) exec_file_path = join_paths(self.execute_archive_path, exec_file_name) exec_file = write_file(exec_file_path) exec_file.write(text_to_run) exec_file.close() self.execute_files[exec_file_path] = {'interpreter': interpreter, 'cwd_path': cwd_path, 'template_path': template_object.template_path} def execute_file(self, interpreter: str, exec_path: str, cwd_path: str) -> dict: """Метод для выполнения скриптов сохраненных в результате обработки шаблонов с параметром exec. Скрипт всегда удаляется вне зависимости от успешности его выполнения.""" exec_output = {'stdout': None, 'stderr': None} with open(exec_path, 'r') as exec_file: script_text = exec_file.read() os.remove(exec_path) try: run_process = Process(interpreter, cwd=cwd_path) run_process.write(script_text) if run_process.readable: stdout = run_process.read() if stdout: exec_output['stdout'] = stdout if run_process.readable_errors: stderr = run_process.read_error() if stderr: exec_output['stderr'] = stderr return exec_output except FilesError as error: raise TemplateExecutorError(("can not run template using the" " interpreter '{}', reason: {}"). format(interpreter, str(error))) def _chown_directory(self, target_path: str, chown_value: dict) -> None: """Метод для смены владельца директории.""" try: if os.path.exists(target_path): os.chown(target_path, chown_value['uid'], chown_value['gid']) else: raise TemplateExecutorError( 'The target directory does not exist: {0}'. format(target_path)) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( chown_value['uid'], chown_value['gid']), str(error))) def _chmod_directory(self, target_path: str, chmod_value: int) -> None: '''Метод для смены прав доступа к директории.''' try: if os.path.exists(target_path): os.chmod(target_path, chmod_value) else: raise TemplateExecutorError( 'The target directory does not exist: {0}'. format(target_path)) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chmod directory: {0}, reason: {1}'. format(target_path, str(error))) def _chown_file(self, target_path: str, chown_value: dict) -> None: '''Метод для смены владельца файла.''' try: if os.path.exists(target_path): os.lchown(target_path, chown_value['uid'], chown_value['gid']) else: raise TemplateExecutorError( 'The target file does not exist: {0}'. format(target_path)) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( chown_value['uid'], chown_value['gid']), str(error))) def _chmod_file(self, target_path: str, chmod_value: int) -> None: '''Метод для смены прав доступа к директории.''' try: if not os.path.exists(target_path): raise TemplateExecutorError( 'The target file does not exist: {0}'. format(target_path)) os.chmod(target_path, chmod_value) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chmod file: {0}, reason: {1}'. format(target_path, str(error))) def _get_file_mode(self, file_path: str) -> int: '''Метод для получения прав доступа для указанного файла.''' if not os.path.exists(file_path): raise TemplateExecutorError( 'The file to get mode does not exist: {0}'. format(file_path)) file_stat = os.stat(file_path) return stat.S_IMODE(file_stat.st_mode) def _get_file_owner(self, file_path: str) -> dict: '''Метод для получения uid и gid значений для владельца указанного файла.''' if not os.path.exists(file_path): raise TemplateExecutorError( 'The file to get owner does not exist: {0}'. format(file_path)) file_stat = os.stat(file_path) return {'uid': file_stat.st_uid, 'gid': file_stat.st_gid} def _translate_uid_gid(self, uid: int, gid: int) -> str: '''Метод для получения из uid и gid имен пользователя и группы при, необходимых для выдачи сообщения об ошибке при попытке chown.''' import pwd import grp try: if self.chroot_path == '/': user_name = pwd.getpwuid(uid).pw_name else: user_name = self._get_user_name_from_uid(uid) except (TypeError, KeyError): user_name = str(uid) try: if self.chroot_path == '/': group_name = grp.getgrgid(gid).gr_name else: group_name = self._get_group_name_form_gid(gid) except (TypeError, KeyError): group_name = str(gid) return '{0}:{1}'.format(user_name, group_name) def _get_user_name_from_uid(self, uid: int) -> str: '''Метод для получения имени пользователя по его uid.''' passwd_file_path = os.path.join(self.chroot_path, 'etc/passwd') passwd_dictionary = dict() if os.path.exists(passwd_file_path): with open(passwd_file_path, 'r') as passwd_file: for line in passwd_file: if line.startswith('#'): continue passwd_line = line.split(':') line_uid = int(passwd_line[2]) line_username = passwd_line[0] if line_uid and line_username: passwd_dictionary[line_uid] = line_username if uid in passwd_dictionary: return passwd_dictionary[uid] else: return str(uid) def _get_group_name_form_gid(self, gid: int) -> str: '''Метод для получения названия группы по его gid.''' group_file_path = os.path.join(self.chroot_path, 'etc/group') group_dictionary = dict() if os.path.exists(group_file_path): with open(group_file_path, 'r') as group_file: for line in group_file: if line.startswith('#'): continue group_line = line.split(':') line_gid = int(group_line[2]) line_group = group_line[0] if line_gid and line_group: group_dictionary[line_gid] = line_group if gid in group_dictionary: return group_dictionary[gid] else: return str(gid) def _check_os_error(self, error: Exception, path_to_check: str) -> True: '''Метод для проверки причины, по которой не удалось изменить владельца или права доступа файла.''' if hasattr(error, 'errno') and error.errno == errno.EPERM: if self._is_vfat(path_to_check): return True return hasattr(error, 'errno') and error.errno == errno.EACCES and\ 'var/calculate/remote' in path_to_check def _is_vfat(self, path_to_check): '''Метод, проверяющий является ли файловая система vfat. Нужно для того, чтобы знать о возможности применения chown, chmod и т.д.''' # Инициализируем объект для проверки примонтированных файловых систем. if self.mounts is None: self.mounts = Mounts() # Проверяем файловую систему на пути. fstab_info = self.mounts.get_from_fstab(what=self.mounts.TYPE, where=self.mounts.DIR, is_in=path_to_check)[0] return fstab_info in {'vfat', 'ntfs-3g', 'ntfs'} class DirectoryTree: '''Класс реализующий дерево каталогов для пакета.''' def __init__(self, base_directory): self.base_directory = base_directory self._tree = {} def update_tree(self, tree: dict) -> None: '''Метод, инициирующий наложение заданного дерева каталогов на данный экземпляр дерева.''' self._update(self._tree, tree) def _update(self, original_tree: dict, tree: dict) -> dict: '''Метод для рекурсивного наложения одного дерева на другое.''' for parent, child in tree.items(): if isinstance(child, abc.Mapping): original_tree[parent] = self._update(original_tree.get(parent, dict()), child) else: original_tree[parent] = child return original_tree def show_tree(self) -> None: pprint(self._tree) def get_directory_tree(self, directory: str): '''Метод для получения нового дерева из ветви данного дерева, соответствующей некоторому каталогу, содержащемуся в корне данного дерева.''' directory_tree = DirectoryTree(os.path.join(self.base_directory, directory)) if directory in self._tree: directory_tree._tree = self._tree[directory] return directory_tree def __getitem__(self, name: str) -> dict: if name in self._tree: return self._tree[name] else: return None def __setitem__(self, name: str, value): self._tree[name] = value def __iter__(self): if self._tree is not None: return iter(self._tree.keys()) else: return iter([]) def __repr__(self) -> str: return ''.format(self._tree) def __bool__(self) -> bool: return bool(self._tree) class DirectoryProcessor: '''Класс обработчика директорий шаблонов.''' def __init__(self, action: str, datavars_module=Variables(), package='', output_module=IOModule(), dbpkg=True): if isinstance(action, list): self.action = action else: self.action = [action] self.output = output_module self.datavars_module = datavars_module # Корневая директория. if 'cl_chroot_path' in datavars_module.main: self.cl_chroot_path = datavars_module.main.cl_chroot_path else: self.cl_chroot_path = '/' self.cl_ignore_files = self._get_cl_ignore_files() # Путь к файлу config с хэш-суммами файлов, для которых уже # предлагались изменения. if 'cl_config_path' in datavars_module.main: self.cl_config_path = self._add_chroot_path( self.datavars_module.main.cl_config_path) else: self.cl_config_path = self._add_chroot_path( '/var/lib/calculate/config') # Путь к директории config-archive для хранения оригинальной ветки # конфигурационных файлов. if 'cl_config_archive' in datavars_module.main: self.cl_config_archive = self._add_chroot_path( self.datavars_module.main.cl_config_archive) else: self.cl_config_archive = self._add_chroot_path( '/var/lib/calculate/config-archive') # Путь к директории .execute для хранения хранения файлов скриптов, # полученных из шаблонов с параметром exec. if 'cl_exec_dir_path' in datavars_module.main: self.cl_exec_dir_path = self._add_chroot_path( self.datavars_module.main.cl_exec_dir_path) else: self.cl_exec_dir_path = self._add_chroot_path( '/var/lib/calculate/.execute/') # Инициализируем исполнительный модуль. self.template_executor = TemplateExecutor( datavars_module=self.datavars_module, chroot_path=self.cl_chroot_path, cl_config_archive=self.cl_config_archive, cl_config_path=self.cl_config_path, execute_archive_path=self.cl_exec_dir_path, dbpkg=dbpkg) # Инициализируем шаблонизатор. self.template_engine = TemplateEngine( datavars_module=self.datavars_module, chroot_path=self.cl_chroot_path, appends_set=self.template_executor.available_appends) # Разбираем atom имя пакета, для которого накладываем шаблоны. self.for_package = False if package: if isinstance(package, PackageAtomName): self.for_package = package elif isinstance(package, str): try: self.for_package = self.template_engine.\ parameters_processor.check_package_parameter(package) except ConditionFailed as error: # ConfitionFailed потому что для проверки значения пакета, # используется тот же метод, что проверяет параметр package # в шаблонах, а в них этот параметр играет роль условия. self.output.set_error(str(error)) return # Получаем список директорий шаблонов. # TODO переменная список. if isinstance(self.datavars_module, (Datavars, NamespaceNode)): var_type = self.datavars_module.main[ 'cl_template_path'].variable_type else: var_type = StringType if var_type is StringType: self.template_paths = (self.datavars_module. main.cl_template_path.split(',')) elif var_type is ListType: self.template_paths = self.datavars_module.main.cl_template_path # Список обработанных пакетов. self.processed_packages = [] # Список пакетов, взятый из значений параметра merge. self.packages_to_merge = [] # Словарь для хранения деревьев директорий для различных пакетов. self.packages_file_trees = {} # Список обработчиков. self.handlers = OrderedDict() def _get_cl_ignore_files(self) -> list: '''Метод для получения из соответствующей переменной списка паттернов для обнаружения игнорируемых в ходе обработки шаблонов файлов.''' if 'cl_ignore_files' in self.datavars_module.main: if isinstance(self.datavars_module, (Datavars, NamespaceNode)): var_type = self.datavars_module.main[ 'cl_ignore_files'].variable_type else: var_type = StringType cl_ignore_files = self.datavars_module.main.cl_ignore_files cl_ignore_files_list = [] if var_type is StringType: for pattern in cl_ignore_files.split(','): cl_ignore_files_list.append(pattern.strip()) elif var_type is ListType: cl_ignore_files_list = cl_ignore_files return cl_ignore_files_list else: return [] def _add_chroot_path(self, path_to_add: str) -> str: '''Метод для добавления корневого пути к заданному пути, если таковой задан и отсутствует в заданном пути.''' if (self.cl_chroot_path != '/' and not path_to_add.startswith(self.cl_chroot_path)): return join_paths(self.cl_chroot_path, path_to_add) else: return path_to_add def process_template_directories(self) -> None: '''Метод для обхода шаблонов, содержащихся в каталогах из main.cl_template.path.''' # Режим заполнения очередей директорий пакетов, необходимых для более # быстрой обработки параметра merge. self.fill_trees = bool(self.for_package) if self.for_package: if self.for_package is NonePackage: package = self.for_package else: package = Package(self.for_package, chroot_path=self.cl_chroot_path) else: package = None for directory_path in self.template_paths: self.base_directory = directory_path.strip() entries = os.scandir(self.base_directory) for node in entries: self.directory_tree = {} self._walk_directory_tree(node.path, self.cl_chroot_path, ParametersContainer(), directory_tree=self.directory_tree, package=package) # Теперь когда дерево заполнено, можно выключить этот режим. self.fill_trees = False if self.for_package: self.output.set_info('Processing packages from merge parameter...') self.processed_packages.append(self.for_package) self._merge_packages() if self.template_executor.execute_files: self._run_exec_files() self.template_executor.save_changes() def _merge_packages(self): '''Метод для выполнения шаблонов относящихся к пакетам, указанным во всех встреченных значениях параметра merge.''' not_merged_packages = [] while self.packages_to_merge: self.for_package = self.packages_to_merge.pop() if self.for_package not in self.packages_file_trees: self.output.set_error( "Error: package '{0}' not found for action{1} '{2}'.". format(self.for_package, 's' if len(self.action) > 1 else '', ', '.join(self.action))) not_merged_packages.append(self.for_package) continue package = Package(self.for_package, chroot_path=self.cl_chroot_path) for directory_name in self.packages_file_trees[self.for_package]: directory_tree = self.packages_file_trees[self.for_package].\ get_directory_tree(directory_name) self._walk_directory_tree(directory_tree.base_directory, self.cl_chroot_path, ParametersContainer(), directory_tree=directory_tree, package=package) self.processed_packages.append(self.for_package) if not_merged_packages: self.output.set_error('Packages {} is not merged.'. format(','.join(self.packages_to_merge))) else: self.output.set_success('All packages are merged.') def _run_exec_files(self): '''Метод для выполнения скриптов, полученных в результате обработки шаблонов с параметром exec.''' for exec_file_path, exec_info in\ self.template_executor.execute_files.items(): try: output = self.template_executor.execute_file( exec_info['interpreter'], exec_file_path, exec_info['cwd_path']) if output['stdout'] is not None: self.output.set_info("stdout from template: {}:\n{}\n". format(exec_info['template_path'], output['stdout'])) if output['stderr'] is not None: self.output.set_error("stderr from template: {}:\n{}\n". format(exec_info['template_path'], output['stderr'])) except TemplateExecutorError as error: self.output.set_error(str(error)) def _get_directories_queue(self, path: str) -> tuple: '''Уже не актуальный метод для построение очередей из путей к шаблонам. Хотя возможно еще пригодится.''' directories_queue = [] for base_dir in self.template_paths: base_dir = base_dir.strip() if path.startswith(base_dir): base_directory = base_dir break while path != base_directory: path, dir_name = os.path.split(path) directories_queue.append(dir_name) return base_directory, directories_queue def _walk_directory_tree(self, current_directory_path: str, current_target_path: str, directory_parameters: ParametersContainer(), directory_tree={}, package=None) -> None: '''Метод для рекурсивного обхода директорий с шаблонами, а также, при необходимости, заполнения деревьев директорий шаблонов, с помощью которых далее выполняются шаблоны пакетов из merge.''' directory_name = os.path.basename(current_directory_path) # Если включено заполнение дерева создаем пустой словарь для сбора # содержимого текущей директории. if self.fill_trees: directory_tree[directory_name] = {} self.template_engine.change_directory(current_directory_path) template_directories, template_files = self._scan_directory( current_directory_path) # обрабатываем в первую очередь шаблон директории. if '.calculate_directory' in template_files: template_files.remove('.calculate_directory') template_text = self._parse_template(directory_parameters, '.calculate_directory', DIR, current_directory_path) if template_text is False: directory_tree = {} return # directory_parameters.print_parameters_for_debug() # Корректируем путь к целевой директории. current_target_path = self._make_target_path(current_target_path, directory_name, directory_parameters) # Если нужно заполнять дерево директорий, отправляем в метод для # проверки параметров package и action текущее дерево. if not self._check_package_and_action( directory_parameters, current_directory_path, directory_tree=(directory_tree if self.fill_trees else None)): # Если проверка не пройдена и включено заполнение дерева, то, # используя нынешнее состояние дерева директорий, обновляем # дерево пакета текущего шаблона директории. if self.fill_trees: self._update_package_tree(directory_parameters.package, directory_tree[directory_name]) # Перед выходом из директории очищаем текущий уровень # дерева. directory_tree = {} return # Если есть параметр merge -- сохраняем присутствующие в нем пакеты # для последующей обработки. if self.for_package and directory_parameters.merge: self.packages_to_merge.extend(directory_parameters.merge) # Если присутствует параметр package -- проверяем, изменился ли он # и был ли задан до этого. Если не был задан или изменился, меняем # текущий пакет ветки шаблонов на этот. if directory_parameters.package: if (package is None or package.package_name != directory_parameters.package): package = Package(directory_parameters.package, chroot_path=self.cl_chroot_path) else: # Если .calculate_directory отсутствует -- создаем директорию, # используя унаследованные параметры и имя самой директории. if not self._check_package_and_action( directory_parameters, current_directory_path, directory_tree=(directory_tree if self.fill_trees else None)): # Обновляем дерево директорий для данного пакета. if self.fill_trees: self._update_package_tree(directory_parameters.package, directory_tree[directory_name]) # Перед выходом из директории очищаем текущий уровень # дерева. directory_tree = {} return # Для того чтобы директория была создана, просто добавляем параметр # append = join. directory_parameters.set_parameter({'append': 'join'}) template_text = '' current_target_path = os.path.join(current_target_path, directory_name) # Выполняем наложение шаблона. current_target_path = self._execute_template( current_target_path, directory_parameters, DIR, current_directory_path, template_text=template_text, package=package) if not current_target_path: directory_tree = {} return # Далее обрабатываем файлы шаблонов хранящихся в директории. # Если в данный момент обходим дерево -- берем список файлов и # директорий из него. if not self.fill_trees and directory_tree: template_directories, template_files =\ self._get_files_and_dirs_from_tree(template_files, template_directories, directory_tree) # Просто псевдоним, чтобы меньше путаницы было далее. template_parameters = directory_parameters # Обрабатываем файлы шаблонов. for template_name in template_files: # Удаляем все параметры, которые не наследуются и используем # полученный контейнер для сбора параметров файлов шаблонов. template_parameters.remove_not_inheritable() template_path = os.path.join(current_directory_path, template_name) # Применяем к файлу шаблона шаблонизатор. template_text = self._parse_template(template_parameters, template_name, FILE, current_directory_path) if template_text is False: continue # template_parameters.print_parameters_for_debug() # Если находимся на стадии заполнения дерева директорий -- # проверяем параметры package и action с заполнением дерева. if not self._check_package_and_action( template_parameters, template_path, directory_tree=(directory_tree[directory_name] if self.fill_trees else None)): continue # Если есть параметр merge добавляем его содержимое в список # пакетов для последующей обработки. if self.for_package and template_parameters.merge: self.packages_to_merge.extend(template_parameters.merge) # Корректируем путь к целевому файлу. target_file_path = self._make_target_path(current_target_path, template_name, template_parameters) # Создаем объект пакета для файлов шаблонов этой директории. template_package = package if template_parameters.package: if (template_package is None or package.package_name != directory_parameters.package): template_package = Package(directory_parameters.package, chroot_path=self.cl_chroot_path) # Выполняем действия, указанные в шаблоне. target_file_path = self._execute_template( target_file_path, template_parameters, FILE, template_path, template_text=template_text, package=template_package) if not target_file_path: continue # * * * ПРИДУМАТЬ ОПТИМИЗАЦИЮ * * * # TODO Потому что накладывать дерево каждый раз, когда обнаружены # файлы какого-то пакета не рационально. # Обновляем дерево директорий для данного пакета, если происходит # его заполнение. if self.fill_trees: self._update_package_tree(template_parameters.package, directory_tree[directory_name]) directory_tree[directory_name] = {} # Проходимся далее по директориям. for directory in template_directories: if self.fill_trees: self._walk_directory_tree( directory, current_target_path, directory_parameters.get_inheritables(), directory_tree=directory_tree[directory_name], package=package) directory_tree[directory_name] = {} else: if isinstance(directory, DirectoryTree): # Если директории взяты из дерева -- путь к директории # соответствует корню каждой взятой ветви дерева. directory_path = directory.base_directory else: directory_path = directory self._walk_directory_tree( directory_path, current_target_path, directory_parameters.get_inheritables(), package=package) if self.fill_trees: directory_tree = {} return def _scan_directory(self, directory_path: str) -> tuple: '''Метод для получения и фильтрования списка файлов и директорий, содержащихся в директории шаблонов.''' template_files = [] template_directories = [] entries = os.scandir(directory_path) for node in entries: if not self._check_file_name(node.name): continue if node.is_symlink(): self.output.set_warning( 'symlink: {0} is ignored in the template directory: {1}'. format(node.path, directory_path)) continue elif node.is_dir(): template_directories.append(node.path) elif node.is_file(): template_files.append(node.name) return template_directories, template_files def _check_file_name(self, filename: str) -> bool: '''Метод для проверки соответствия имени файла содержимому переменной main.cl_ignore_files.''' for pattern in self.cl_ignore_files: if fnmatch.fnmatch(filename, pattern): return False return True def _get_files_and_dirs_from_tree(self, template_files, template_directories, directory_tree: DirectoryTree): '''Метод для получения списков файлов и директорий из дерева директорий.''' tree_files = [] tree_directories = [] for template in directory_tree: if template in template_files: tree_files.append(template) else: next_directory_tree =\ directory_tree.get_directory_tree(template) tree_directories.append(next_directory_tree) return tree_directories, tree_files def _make_target_path(self, target_path, template_name, parameters): '''Метод для получения пути к целевому файлу с учетом наличия параметров name, path и append = skip.''' # Если есть параметр name -- меняем имя шаблона. if parameters.name: template_name = parameters.name # Если для шаблона задан путь -- меняем путь к директории шаблона. if parameters.path: target_path = join_paths(self.cl_chroot_path, parameters.path) # Если параметр append не равен skip -- добавляем имя шаблона к # целевому пути. if not parameters.append == 'skip': target_path = os.path.join(target_path, template_name) return target_path def _parse_template(self, parameters, template_name, template_type, template_directory): '''Метод для разбора шаблонов, получения значений их параметров и их текста после отработки шаблонизитора.''' if template_type == DIR: template_path = template_directory else: template_path = join_paths(template_directory, template_name) try: self.template_engine.process_template(template_name, template_type, parameters=parameters) return self.template_engine.template_text except ConditionFailed as error: self.output.set_warning('{0}. Template: {1}'. format(str(error), template_path)) return False except Exception as error: self.output.set_error('Template error: {0} Template: {1}'. format(str(error), template_path)) return False def _execute_template(self, target_path, parameters, template_type, template_path, template_text='', package=None): '''Метод для наложения шаблонов и обработки информации полученной после наложения.''' try: output = self.template_executor.execute_template( target_path, parameters, template_type, template_path, template_text=template_text, target_package=package) # Если во время выполнения шаблона был изменен целевой путь, # например, из-за ссылки на директорию в source -- обновляем # целевой путь. if output['target_path'] is not None: target_path = output['target_path'] # Если есть вывод от параметра run -- выводим как info. if output['stdout'] is not None: self.output.set_info("stdout from template: {}:\n{}\n".format( template_path, output['stdout'])) # Если есть ошибки от параметра run -- выводим их как error. if output['stderr'] is not None: self.output.set_error("stderr from template: {}:\n{}\n". format(template_path, output['stderr'])) # Если run выполнен с ошибками -- пропускаем директорию. return False except TemplateExecutorError as error: self.output.set_error('Template execution error: {} Template: {}'. format(str(error), template_path)) return False if template_type == DIR: self.output.set_success('Processed directory: {}'. format(template_path)) else: self.output.set_success('Processed template: {}'. format(template_path)) return target_path def _update_package_tree(self, package, current_level_tree): '''Метод для обновления деревьев директорий пакетов, необходимых для обработки шаблонов пакетов из значения параметра merge.''' # Если текущему уровню соответствует заглушка None или он содержит # файлы, то есть не пустой -- тогда есть смысл обновлять. if current_level_tree is None or current_level_tree: if package in self.packages_file_trees: # Если для данного пакета уже есть дерево -- # накладываем на него текущее. self.packages_file_trees[package].update_tree( copy.deepcopy(self.directory_tree) ) else: # Если для данного пакета еще нет дерева -- # копируем для него текущее. directory_tree = DirectoryTree(self.base_directory) directory_tree.update_tree( copy.deepcopy(self.directory_tree)) self.packages_file_trees[package] = directory_tree def _check_package_and_action(self, parameters, template_path, directory_tree=None): '''Метод для проверки параметров action и package во время обработки каталогов с шаблонами. Если среди аргументов указано также дерево каталогов, то в случае несовпадения значений package для файла или директории, им в дереве присваивается значение None.''' if parameters.append != 'skip' or parameters.action: if not parameters.action: self.output.set_warning( ("Action parameter is not set for template:" " {0}").format(template_path)) return False elif parameters.action not in self.action: self.output.set_warning( ("Action parameter value '{0}' does not match its" " current value{1} '{2}'. Template: {3}").format( parameters.action, 's' if len(self.action) > 1 else '', ', '.join(self.action), template_path)) return False if parameters.handler: self.handlers.update({parameters.handler: template_path}) return False if self.for_package: if not parameters.package: if self.for_package is not NonePackage: self.output.set_warning( "'package' parameter is not defined. Template: {}". format(template_path)) elif parameters.package != self.for_package: if directory_tree is not None: template_name = os.path.basename(template_path) directory_tree[template_name] = None self.output.set_warning( ("'package' parameter value '{0}' does not " "match its current target package '{1}'. " "Template: {2}"). format(parameters.package.atom, self.for_package.atom, template_path) ) return False return True