# vim: fileencoding=utf-8 # from pprint import pprint from ..utils.package import PackageAtomParser, Package, PackageNotFound from ..utils.files import join_paths, write_file, read_file_lines, FilesError,\ check_directory_link, read_link, Process,\ get_target_from_link from .template_engine import TemplateEngine, Variables, ConditionFailed,\ ParametersProcessor, DIR, FILE,\ ParametersContainer from ..utils.io_module import IOModule from collections import OrderedDict, abc from ..utils.mount import Mounts import hashlib import fnmatch import shutil import stat import glob import copy import re import os class TemplateExecutorError(Exception): pass class TemplateTypeConflict(Exception): pass class TemplateCollisionError(Exception): pass class CalculateConfigFile: '''Класс для работы с файлом /var/lib/calculate/config.''' def __init__(self, cl_config_path='/var/lib/calculate/config', cl_chroot_path='/'): self.chroot_path = cl_chroot_path self.cl_config_path = cl_config_path self._config_dictionary = self._get_cl_config_dictionary() self._unsaved_changes = False def __contains__(self, file_path): file_path = self._remove_chroot(file_path) return file_path in self._config_dictionary def _get_cl_config_dictionary(self): '''Метод для загрузки словаря файла /var/lib/calculate/config.''' config_dictionary = OrderedDict() if os.path.exists(self.cl_config_path): if os.path.isdir(self.cl_config_path): raise TemplateExecutorError( "directory instead calculate config file in: {}". format(self.cl_config_path)) else: write_file(self.cl_config_path).close() return config_dictionary try: config_file_lines = read_file_lines(self.cl_config_path) except FilesError as error: raise TemplateExecutorError( "cannot read calculate config file in: {0}. Reason: {1}". format(self.cl_config_path, str(error))) # Продумать проверку корректности найденного файла. for file_line in config_file_lines: filename, md5_sum = file_line.split(' ') config_dictionary.update({filename: md5_sum}) self._unsaved_changes = False return config_dictionary def set_files_md5(self, file_path, file_md5): '''Метод для установки в config соответствия файла некоторой контрольной сумме.''' file_path = self._remove_chroot(file_path) self._config_dictionary[file_path] = file_md5 self._unsaved_changes = True def remove_file(self, file_path): '''Метод для удаления файла из config.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: self._config_dictionary.pop(file_path) self._unsaved_changes = True def compare_md5(self, file_path, file_md5): '''Метод для сравнения хэш-суммы из config и некоторой заданной.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: return self._config_dictionary[file_path] == file_md5 else: return False def save_changes(self): '''Метод для записи изменений, внессенных в файл config.''' if not self._unsaved_changes: return config_file = write_file(self.cl_config_path) for file_name, file_md5 in self._config_dictionary.items(): config_file.write('{} {}\n'.format(file_name, file_md5)) config_file.close() self._unsaved_changes = False def _remove_chroot(self, file_path): '''Метод для удаления корневого пути из указанного пути.''' if self.chroot_path != '/' and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return file_path class TemplateWrapper: '''Класс связывающий шаблон с целевым файлом и определяющий параметры наложения шаблона, обусловленные состоянием целевого файла.''' type_checks = {DIR: os.path.isdir, FILE: os.path.isfile} _protected_is_set = False _protected_set = set() _unprotected_set = set() def __new__(cls, *args, **kwargs): if not cls._protected_is_set: # Устанавливаем значения PROTECTED, если не заданы. if 'chroot_path' in kwargs: chroot_path = kwargs['chroot_path'] else: chroot_path = '/' cls._set_protected(chroot_path) return super().__new__(cls) def __init__(self, target_file_path, parameters, template_type, template_text='', chroot_path='/', config_archive_path='/var/lib/calculate/config-archive'): self.target_path = target_file_path self.chroot_path = chroot_path self.config_archive_path = config_archive_path self.target_package_name = None self.package_atom_parser = PackageAtomParser( chroot_path=self.chroot_path) # Вспомогательный флаг, включается, если по целевому пути лежит файл, # для которого не определился никакой пакет. self.target_without_package = False self.parameters = parameters self.output_path = self.target_path self.input_path = self.parameters.source self.template_type = template_type self.template_text = template_text # Флаг, указывающий, что нужно удалить файл из target_path перед # применением шаблона. self.remove_original = False # Флаг, указывающий, что целевой путь был изменен. self.target_path_is_changed = False # Флаг, указывающий, что файл по целевому пути является ссылкой. self.target_is_link = False # Пакет, к которому относится файл. self.target_package = None # Флаг, указывающий, что файл является PROTECTED. self.protected = False # Временный флаг для определения того, является ли шаблон userspace. self.is_userspace = False self.format_class = None if self.parameters.run or self.parameters.exec: # Если есть параметр run или exec, то кроме текста шаблона ничего # не нужно. return if self.parameters.append in {'join', 'before', 'after'}: # Получаем класс соответствующего формата файла. if self.parameters.format: self.format_class = ParametersProcessor.\ available_formats[self.parameters.format] else: # Здесь будет детектор форматов. pass # Если по этому пути что-то есть -- проверяем конфликты. if os.path.exists(target_file_path): for file_type, checker in self.type_checks.items(): if checker(target_file_path): self.target_type = file_type break self.target_is_link = os.path.islink(target_file_path) # Если установлен параметр mirror и есть параметр source, # содержащий несуществующий путь -- удаляем целевой файл. if self.parameters.source is True and self.parameters.mirror: self.remove_original = True else: if self.parameters.mirror: raise TemplateExecutorError("target file does not exist, while" " 'mirror' parameter is set") self.target_type = None if self.format_class is not None and self.format_class.EXECUTABLE: # Если формат исполняемый -- проверяем, существует ли директория, # из которой будет выполняться шаблон. if not os.path.exists(self.target_path): # Если не существует -- создаем ее. os.makedirs(self.target_path) elif os.path.isfile(self.target_path): # Если вместо директории файл -- определяем по файлу # директорию. self.target_path = os.path.dirname(self.target_path) # Если есть параметр package, определяем по нему пакет. if self.parameters.package: self.target_package_name = self.parameters.package self.target_package = Package(self.parameters.package, chroot_path=self.chroot_path) return self.check_conflicts() self.check_package_collision() # Если целью является файл -- проверяем наличие ._cfg0000_filename # файлов. if self.target_type is FILE: self._cfg_pattern = os.path.join( os.path.dirname(self.target_path), "._cfg????_{}".format( os.path.basename(self.target_path))) self.cfg_list = glob.glob(self._cfg_pattern) self.check_user_changes() def check_conflicts(self): '''Проверка конфликтов типов.''' if self.parameters.append == 'link': if self.parameters.force: self.remove_original = True elif self.target_is_link: if self.template_type != self.target_type: raise TemplateTypeConflict( "the target is a link to {} while the template" "is {} and has append = 'link'". format('directory' if self.template_type == DIR else 'file', 'file' if self.template_type == DIR else 'directory')) else: self.remove_original = True elif self.target_type == DIR: raise TemplateTypeConflict("the target is a directory while " "the template has append = 'link'") else: raise TemplateTypeConflict("the target is a file while the" " template has append = 'link'") elif self.template_type == DIR: if self.target_type == FILE: if self.parameters.force: self.remove_original = True else: raise TemplateTypeConflict("the target is a file while the" " template is a directory") elif self.target_is_link: if self.parameters.force: self.remove_original = True else: try: link_source = check_directory_link( self.target_path, chroot_path=self.chroot_path) self.target_path = link_source self.target_path_is_changed = True except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.template_type == FILE: if self.parameters.force: if self.target_type == DIR: self.remove_original = True elif self.target_is_link and self.target_type == FILE: try: link_source = read_link(self.target_path) self.target_path = get_target_from_link( self.target_path, link_source, chroot_path=self.chroot_path) self.target_path_is_changed = True except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.target_is_link: if self.target_type == DIR: raise TemplateTypeConflict("the target file is a link to a" " directory while the template" " is a file") else: raise TemplateTypeConflict("the target file is a link to" " a file while the template" " is a file") elif self.target_type == DIR: raise TemplateTypeConflict("the target file is a directory" " while the template is a file") def check_package_collision(self): '''Проверка на предмет коллизии, то есть конфликта пакета шаблона и целевого файла.''' if self.parameters.package: parameter_package = self.parameters.package else: parameter_package = None if self.target_type is not None: try: file_package = self.package_atom_parser.get_file_package( self.target_path) except PackageNotFound: file_package = None self.target_without_package = True else: file_package = None # Если для шаблона и целевого файла никаким образом не удается # определить пакет и есть параметр append -- шаблон пропускаем. if parameter_package is None and file_package is None: if self.parameters.append: raise TemplateCollisionError( "'package' parameter is not defined for" " template with 'append' parameter.") elif parameter_package is None: self.target_package_name = file_package elif file_package is None: self.target_package_name = parameter_package elif file_package != parameter_package: raise TemplateCollisionError(( "The template package is {0} while target" " file package is {1}").format( parameter_package.atom, file_package.atom )) else: self.target_package_name = parameter_package self.target_package = Package(self.target_package_name, chroot_path=self.chroot_path) def check_user_changes(self): '''Метод для проверки наличия пользовательских изменений в конфигурационных файлах.''' # Эта проверка только для файлов. if self.template_type != FILE: return # Проверим, является ли файл защищенным. # Сначала проверяем по переменной CONFIG_PROTECT. for protected_path in self._protected_set: if self.target_path.startswith(protected_path): self.protected = True break # Затем по переменной CONFIG_PROTECT_MASK. for unprotected_path in self._unprotected_set: if self.target_path.startswith(unprotected_path): self.protected = False break # Собираем список имеющихся ._cfg файлов. cfg_pattern = os.path.join(os.path.dirname(self.target_path), "._cfg????_{}".format( os.path.basename(self.target_path))) self.cfg_list = glob.glob(cfg_pattern) # Путь к архивной версии файла. self.archive_path = self._get_archive_path(self.target_path) if not self.protected: self.md5_matching = True elif self.parameters.unbound: # Если присутствует unbound, то просто модифицируем файл и # удаляем его из CONTENTS. self.md5_matching = True elif self.target_type is None: # Если целевой файл отсутствует. if self.target_path in self.target_package: # Проверка -- был ли файл удален. self.md5_matching = False else: self.md5_matching = True elif self.target_without_package: # Если файл по целевому пути не относится к какому-либо пакету. self.md5_matching = False else: # Если файл есть и он относится к текущему пакету. target_md5 = self.target_package.get_md5(self.target_path) self.md5_matching = self.target_package.is_md5_equal( self.target_path, file_md5=target_md5) # Если по целевому пути файл не относящийся к какому-либо пакету и # присутствует параметр autoupdate -- удаляем этот файл. if self.target_without_package and self.parameters.autoupdate: self.remove_original = True self.md5_matching = self.md5_matching or self.parameters.autoupdate # Определяем пути входных и выходных файлов. if self.md5_matching: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source else: self.input_path = self.target_path self.output_path = self.target_path else: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source else: self.input_path = self.archive_path self.output_path = self._get_cfg_path(self.target_path) def _get_archive_path(self, file_path): '''Метод для получения пути к архивной версии указанного файла.''' if self.chroot_path != "/" and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return join_paths(self.config_archive_path, file_path) def _get_cfg_path(self, file_path): '''Метод для получения пути для создания нового ._cfg????_ файла.''' if self.cfg_list: last_cfg_name = os.path.basename(self.cfg_list[-1]) slice_value = len('._cfg') cfg_number = int(last_cfg_name[slice_value: slice_value + 4]) else: cfg_number = 0 cfg_number = str(cfg_number + 1) if len(cfg_number) < 4: cfg_number = '0' * (4 - len(cfg_number)) + cfg_number new_cfg_name = "._cfg{}_{}".format(cfg_number, os.path.basename(file_path)) new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name) return new_cfg_path def remove_from_contents(self): '''Метод для удаления целевого файла из CONTENTS.''' if self.template_type == DIR: self.target_package.remove_dir(self.target_path) elif self.template_type == FILE: self.target_package.remove_obj(self.target_path) def clear_dir_contents(self): '''Метод для удаления из CONTENTS всего содержимого директории после применения append = "clear".''' if self.template_type == DIR: self.target_package.clear_dir(self.target_path) def add_to_contents(self, file_md5=None): '''Метод для добавления целевого файла в CONTENTS.''' if self.parameters.append == 'link': self.target_package.add_sym(self.target_path, self.parameters.source) elif self.template_type == DIR: self.target_package.add_dir(self.target_path) elif self.template_type == FILE: self.target_package.add_obj(self.target_path, file_md5=file_md5) def update_contents_from_list(self, changed_list: dict): '''Метод для изменения CONTENTS по списку измененных файлов.''' for file_path, mode in changed_list.items(): if mode == "modify": if os.path.islink(file_path): self.target_package.add_sym(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) elif os.path.isfile(file_path): self.target_package.add_obj(file_path) elif mode == "remove": if os.path.islink(file_path) or os.path.isfile(file_path): self.target_package.remove_obj(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) @classmethod def _set_protected(cls, chroot_path): '''Метод для получения множества защищенных директорий.''' if cls._protected_is_set: return cls._protected_set = set() cls._protected_set.add(join_paths(chroot_path, '/etc')) config_protect_env = os.environ.get('CONFIG_PROTECT', False) if config_protect_env: for protected_path in config_protect_env.split(): protected_path = join_paths(chroot_path, protected_path.strip()) cls._protected_set.add(protected_path) config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False) if config_protect_mask_env: for unprotected_path in config_protect_mask_env.split(): unprotected_path = join_paths(chroot_path, unprotected_path.strip()) cls._unprotected_set.add(unprotected_path) cls._protected_is_set = True def save_changes(self): '''Метод для сохранения изменений внесенных в CONTENTS.''' if self.target_package: self.target_package.remove_empty_directories() self.target_package.write_contents_file() class TemplateExecutor: '''Класс исполнительного модуля.''' def __init__(self, datavars_module=Variables(), chroot_path='/', cl_config_archive='/var/lib/calculate/config-archive', cl_config_path='/var/lib/calculate/config', execute_archive_path='/var/lib/calculate/.execute/'): self.datavars_module = datavars_module self.chroot_path = chroot_path # Директория для хранения полученных при обработке exec скриптов. self.execute_archive_path = execute_archive_path self.directory_default_parameters =\ ParametersProcessor.directory_default_parameters self.file_default_parameters =\ ParametersProcessor.file_default_parameters # Отображение имен действий для директорий на методы их реализующие. self.directory_appends = {'join': self._append_join_directory, 'remove': self._append_remove_directory, 'skip': self._append_skip_directory, 'clear': self._append_clear_directory, 'link': self._append_link_directory, 'replace': self._append_replace_directory} # Отображение имен действий для файлов на методы их реализующие. self.file_appends = {'join': self._append_join_file, 'after': self._append_after_file, 'before': self._append_before_file, 'replace': self._append_replace_file, 'remove': self._append_remove_file, 'skip': self._append_skip_file, 'clear': self._append_clear_file, 'link': self._append_link_file} self.formats_classes = ParametersProcessor.available_formats self.calculate_config_file = CalculateConfigFile( cl_config_path=cl_config_path, cl_chroot_path=chroot_path) self.cl_config_archive_path = cl_config_archive @property def available_appends(self): '''Метод для получения множества возможных значений append.''' appends_set = set(self.directory_appends.keys()).union( set(self.file_appends.keys())) return appends_set def execute_template(self, target_path, parameters, template_type, template_text=''): '''Метод для запуска выполнения шаблонов.''' # Словарь с данными о результате работы исполнительного метода. self.executor_output = {'target_path': None, 'stdout': None, 'stderr': None, 'exec_file': None} try: template_object = TemplateWrapper( target_path, parameters, template_type, template_text=template_text, chroot_path=self.chroot_path, config_archive_path=self.cl_config_archive_path) except TemplateTypeConflict as error: raise TemplateExecutorError("type conflict: {}".format(str(error))) except TemplateCollisionError as error: raise TemplateExecutorError("collision: {}".format(str(error))) # Удаляем оригинал, если это необходимо из-за наличия force или по # другим причинам. if template_object.remove_original: if template_object.target_type == DIR: self._remove_directory(template_object.target_path) else: self._remove_file(template_object.target_path) template_object.remove_from_contents() # Если был включен mirror, то после удаления файла завершаем # выполнение шаблона. if template_object.parameters.mirror: template_object.save_changes() return template_object.target_type = None if template_object.parameters.run: # Если есть параметр run -- запускаем текст шаблона. self._run_template(template_object) elif template_object.parameters.exec: # Если есть параметр exec -- запускаем текст шаблона после # обработки всех шаблонов. self._exec_template(template_object) elif template_object.parameters.append: print('Using append: {}'.format(template_object.parameters.append)) print('input path: {}'.format(template_object.input_path)) print('output path: {}'.format(template_object.output_path)) if template_object.template_type == DIR: self.directory_appends[template_object.parameters.append]( template_object) elif template_object.template_type == FILE: self.file_appends[template_object.parameters.append]( template_object) # Сохраняем изменения в CONTENTS внесенные согласно шаблону. template_object.save_changes() # Возвращаем целевой путь, если он был изменен, или # None если не был изменен. if template_object.target_path_is_changed: self.executor_output['target_path'] =\ template_object.target_path if self.executor_output['stdout']: print(self.executor_output['stdout']) if self.executor_output['stderr']: print(self.executor_output['stderr']) return self.executor_output def save_changes(self): '''Метод для сохранения чего-нибудь после выполнения всех шаблонов.''' # Пока сохраняем только получившееся содержимое config-файла. self.calculate_config_file.save_changes() def _append_join_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "join", если шаблон -- директория. Создает директорию, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) template_object.add_to_contents() def _append_remove_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "remove", если шаблон -- директория. Удаляет директорию со всем содержимым, если она есть.''' if template_object.target_type is not None: self._remove_directory(template_object.target_path) template_object.remove_from_contents() def _append_skip_directory(self, template_object: TemplateWrapper): pass def _append_clear_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "clear", если шаблон -- директория. Удаляет все содержимое директории, если она есть.''' if template_object.target_type is not None: self._clear_directory(template_object.target_path) # Меняем права и владельца очищенной директории, если это # необходимо. if template_object.parameters.chmod: self._chmod_directory(template_object.target_path, template_object.parameters.chmod) if template_object.parameters.chown: self._chown_directory(template_object.target_path, template_object.parameters.chown) template_object.clear_dir_contents() def _append_link_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "link", если шаблон -- директория. Создает ссылку на директорию, если она есть.''' self._link_directory(template_object.parameters.source, template_object.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_directory(template_object.parameters.source, template_object.parameters.chmod) if template_object.parameters.chown: self._chmod_directory(template_object.parameters.source, template_object.parameters.chown) template_object.add_to_contents() def _append_replace_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "replace", если шаблон -- директория. Очищает директорию или создает, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) template_object.add_to_contents() else: self._clear_directory(template_object.target_path) template_object.clear_dir_contents() def _append_join_file(self, template_object: TemplateWrapper, join_before=False, replace=False): '''Метод описывающий действия при append = "join", если шаблон -- файл. Объединяет шаблон с целевым файлом.''' input_path = template_object.input_path output_path = template_object.output_path template_format = template_object.format_class # Задаемся значениями chmod и chown в зависимости от наличия или # отсутствия файла, принадлежности его пакету и наличию значений # параметров по умолчанию. try: chmod = template_object.parameters.chmod if not chmod: if (template_object.target_type is not None and not template_object.target_without_package): chmod = self._get_file_mode(template_object.target_path) else: chmod = self.file_default_parameters.get('chmod', False) chown = template_object.parameters.chown if not chown: if (template_object.target_type is not None and not template_object.target_without_package): chown = self._get_file_owner(template_object.target_path) else: chown = self.file_default_parameters.get('chown', False) except OSError: raise TemplateExecutorError('No access to the directory: {}'. format(template_object.target_path)) if template_object.format_class.EXECUTABLE or\ template_object.md5_matching: # Действия при совпадении md5 из CONTENTS и md5 целевого файла. # А также если шаблон просто исполнительный. output_paths = [output_path] # Если целевой файл защищен, а шаблон не userspace. if template_object.protected and not template_object.is_userspace: # Тогда также обновляем архив. output_paths.append(template_object.archive_path) if template_object.target_type is not None and not replace: # Если целевой файл есть и нет параметра replace -- используем # текст целевого файла. with open(input_path, 'r') as input_file: input_text = input_file.read() else: input_text = '' parsed_template = template_format(template_object.template_text, join_before=join_before) if not template_object.format_class.EXECUTABLE: parsed_input = template_format(input_text, join_before=join_before) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() for save_path in output_paths: if not os.path.exists(os.path.dirname(save_path)): self._create_directory( template_object, path_to_create=os.path.dirname(save_path)) with open(save_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца всех сохраняемых файлов, # если это необходимо. if chown: self._chown_file(save_path, chown, check_existation=False) if chmod: self._chmod_file(save_path, chmod, check_existation=False) # Убираем все ._cfg файлы. if template_object.cfg_list: for cfg_file_path in template_object.cfg_list: self._remove_file(cfg_file_path) # Убираем целевой файл из CL. self.calculate_config_file.remove_file( template_object.target_path) # Обновляем CONTENTS. if template_object.protected: if template_object.parameters.unbound: template_object.remove_from_contents() else: template_object.add_to_contents( file_md5=output_text_md5) else: changed_files = parsed_template.execute_format( input_text=input_text, target_path=template_object.target_path) # Удаляем форматный объект входного файла. del(parsed_template) # Если исполняемый формат выдал список измененных файлов для # изменения CONTENTS и при этом задан пакет -- обновляем # CONTENTS. if changed_files and template_object.target_package: template_object.update_contents_from_list(changed_files) else: if template_object.target_type is not None and not replace: with open(input_path, 'r') as input_file: input_text = input_file.read() else: input_text = '' parsed_input = template_format(input_text) parsed_template = template_format(template_object.template_text) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() if not self.calculate_config_file.compare_md5( template_object.target_path, output_text_md5): if not os.path.exists(os.path.dirname(output_path)): self._create_directory( template_object, path_to_create=os.path.dirname(output_path)) with open(output_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца ._cfg????_ файлов, если # это необходимо. if chown: self._chown_file(output_path, chown, check_existation=False) if chmod: self._chmod_file(output_file, chmod, check_existation=False) # Обновляем CL. self.calculate_config_file.set_files_md5( template_object.target_path, output_text_md5) else: # Действия если CL совпало. Пока ничего не делаем. pass # Обновляем CONTENTS. template_object.add_to_contents(file_md5=output_text_md5) def _append_after_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в конец файла и в конец каждой секции файла.''' self._append_join_file(template_object, join_before=False) def _append_before_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в начало файла и в начало каждой секции файла.''' self._append_join_file(template_object, join_before=True) def _append_skip_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "skip". Пока никаких действий.''' pass def _append_replace_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "replace", если шаблон -- файл. Очищает файл и затем накладывает на него шаблон.''' self._append_join_file(template_object, replace=True) def _append_remove_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "remove", если шаблон -- файл. Удаляет файл.''' if template_object.target_type is not None: self._remove_file(template_object.target_path) template_object.remove_from_contents() def _append_clear_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "clear", если шаблон -- файл. Очищает файл.''' if template_object.target_type is not None: self._clear_file(template_object.target_path) # Меняем владельца и права доступа к очищенному файлу, если нужно. if template_object.chown: self._chown_file(template_object.target_path, template_object.parameters.chown, check_existation=False) if template_object.chmod: self._chmod_file(template_object.target_path, template_object.parameters.chmod, check_existation=False) template_object.add_to_contents() def _append_link_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "link", если шаблон -- файл. Создает ссылку на файл, указанный в параметре source.''' self._link_file(template_object.parameters.source, template_object.parameters.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_file(template_object.parameters.source, template_object.parameters.chmod, check_existation=False) if template_object.parameters.chown: self._chown_file(template_object.parameters.source, template_object.parameters.chown, check_existation=False) template_object.add_to_contents() def _create_directory(self, template_object: TemplateWrapper, path_to_create=None): '''Метод для создания директории и, при необходимости, изменения владельца и доступа все директорий на пути к целевой.''' if path_to_create is None: target_path = template_object.target_path else: target_path = path_to_create template_parameters = template_object.parameters # Если файл есть, но указан chmod или chown -- используем их. if os.access(target_path, os.F_OK): if template_parameters.chmod: self._chmod_directory(target_path, template_parameters.chmod) if template_parameters.chown: self._chown_directory(target_path, template_parameters.chown) return directories_to_create = [target_path] directory_path = os.path.dirname(target_path) # Составляем список путей к директориям, которые нужно создать. while not os.access(directory_path, os.F_OK) and directory_path: directories_to_create.append(directory_path) directory_path = os.path.dirname(directory_path) # получаем информацию о владельце и правах доступа ближайшей # существующей директории. try: chmod = template_parameters.chmod if not chmod: chmod = self._get_file_mode(directory_path) else: chmod = self.directory_default_parameters.get('chmod', False) chown = template_parameters.chown if not chown: chown = self._get_file_owner(directory_path) else: chown = self.directory_default_parameters.get('chown', False) except OSError: raise TemplateExecutorError('No access to the directory: {}'. format(directory_path)) directories_to_create.reverse() # создаем директории. for create_path in directories_to_create: try: os.mkdir(create_path) # Для каждой созданной директории меняем права и владельца # если это необходимо. if chmod: self._chmod_directory(create_path, chmod) if chown: self._chown_directory(create_path, chown) except OSError as error: raise TemplateExecutorError( 'Failed to create directory: {}, reason: {}'. format(create_path, str(error))) def _remove_directory(self, target_path): '''Метод для удаления директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): try: if os.path.islink(target_path): os.unlink(target_path) else: shutil.rmtree(target_path) except Exception as error: raise TemplateExecutorError( ("Failed to delete the directory: {0}," " reason: {1}").format(target_path, str(error))) else: error_message = "target file is not directory" else: error_message = "target file does not exist" raise TemplateExecutorError(("Failed to delete the directory: {0}," "reason: {1}").format(target_path, error_message)) def _clear_directory(self, target_path): '''Метод для очистки содержимого целевой директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): # Удаляем все содержимое директории. for node in os.scandir(target_path): if node.is_dir(): self._remove_directory(node.path) else: self._remove_file(node.path) return else: error_message = "target file is not directory" else: error_message = "target directory does not exist" raise TemplateExecutorError(("failed to delete directory: {}," " reason: {}").format(target_path, error_message)) def _link_directory(self, source, target_path): '''Метод для создания по целевому пути ссылки на директорию расположенную на пути, указанному в source.''' try: os.symlink(source, target_path, target_is_directory=True) except OSError: raise TemplateExecutorError("Failed to create symlink: {0} -> {1}". format(target_path, self.source)) def _remove_file(self, target_path): '''Метод для удаления файлов.''' if os.path.islink(target_path): try: os.unlink(target_path) except OSError: raise TemplateExecutorError('failed to delete the link: {}'. format(target_path)) if os.path.isfile(target_path): try: os.remove(target_path) except OSError: raise TemplateExecutorError('failed to delete the file: {}'. format(target_path)) def _clear_file(self, target_path): '''Метод для очистки файлов.''' try: with open(target_path, 'w') as f: f.truncate(0) except IOError: raise TemplateExecutorError("failed to clear the file: {}". format(target_path)) def _link_file(self, source, target_path): '''Метод для создания по целевому пути ссылки на файл расположенный на пути, указанному в source.''' try: os.symlink(source, target_path) except OSError: raise TemplateExecutorError( "Failed to create symlink to the file: {0} -> {1}". format(target_path, self.source)) def _run_template(self, template_object: TemplateWrapper): '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в run прямо во время обработки шаблонов.''' text_to_run = template_object.template_text interpreter = template_object.parameters.run try: run_process = Process(interpreter, cwd=self.chroot_path) run_process.write(text_to_run) if run_process.readable: stdout = run_process.read() if stdout: self.executor_output['stdout'] = stdout if run_process.readable_errors: stderr = run_process.read_error() if stderr: self.executor_output['stderr'] = stderr except FilesError as error: raise TemplateExecutorError(("can not run template using the" " interpreter '{}', reason: {}"). format(interpreter, str(error))) def _exec_template(self, template_object: TemplateWrapper): '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в exec после выполнения всех прочих шаблонов. ''' text_to_run = template_object.template_text interpreter = template_object.parameters.exec # Получаем путь к директории для хранения файлов .execute. if (self.chroot_path != '/' and not self.execute_archive_path.startswith(self.chroot_path)): self.execute_archive_path = join_paths( self.chroot_path, self.execute_archive_path) # Если директория уже существует получаем номер очередного файла для # exec по номеру последнего. exec_number = 0 if os.path.exists(self.execute_archive_path): exec_files_list = os.listdir(self.execute_archive_path) if exec_files_list: exec_number = int(exec_files_list[-1][-4:]) exec_number = str(exec_number + 1) # Получаем название нового exec_???? файла. if len(exec_number) < 4: exec_number = '0' * (4 - len(exec_number)) + exec_number exec_file_name = 'exec_{}'.format(exec_number) exec_file_path = join_paths(self.execute_archive_path, exec_file_name) exec_file = write_file(exec_file_path) exec_file.write(text_to_run) exec_file.close() # Добавляем новый файл в словарь файлов для дальнейшего исполнения. if self.executor_output is None: self.executor_output = dict() self.executor_output['exec_file'] = {interpreter: exec_file_name} def _chown_directory(self, target_path, chown_value): """Метод для смены владельца директории.""" try: os.chown(target_path, chown_value['uid'], chown_value['gid']) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( target_path, chown_value['uid'], chown_value['gid']), str(error))) def _chmod_directory(self, target_path, chmod_value): '''Метод для смены прав доступа к директории.''' try: os.chmod(target_path, chmod_value) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): self.output.set_error( 'Can not chmod directory: {0}, reason: {1}'. format(target_path, str(error))) def _chown_file(self, target_path, chown_value, check_existation=True): '''Метод для смены владельца файла.''' try: if check_existation and not os.path.exists(target_path): open(target_path, 'w').close() os.lchown(target_path, chown_value['uid'], chown_value['gid']) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( target_path, chown_value['uid'], chown_value['gid']), str(error))) def _chmod_file(self, target_path, chmod_value, check_existation=True): '''Метод для смены прав доступа к директории.''' try: if check_existation and not os.path.exists(target_path): open(target_path, 'w').close() os.chmod(target_path, chmod_value) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chmod file: {0}, reason: {1}'. format(target_path, str(error))) def _get_file_mode(self, file_path): '''Метод для получения прав доступа для указанного файла.''' file_stat = os.stat(file_path) return stat.S_IMODE(file_stat.st_mode) def _get_file_owner(self, file_path): '''Метод для получения uid и gid значений для владельца указанного файла.''' file_stat = os.stat(file_path) return {'uid': file_stat.st_uid, 'gid': file_stat.st_gid} def _translate_uid_gid(self, target_path, uid, gid): '''Метод для получения из uid и gid имен пользователя и группы при, необходимых для выдачи сообщения об ошибке при попытке chown.''' import pwd import grp try: if self.chroot_path == '/': user_name = pwd.getpwuid(uid).pw_name else: user_name = str(uid) except (TypeError, KeyError): user_name = str(uid) try: if self.chroot_path == '/': group_name = grp.getgrgid(gid).gr_name else: group_name = str(gid) except (TypeError, KeyError): group_name = str(gid) return '{0}:{1}'.format(user_name, group_name) def _check_os_error(self, error, path_to_check): '''Метод для проверки причины, по которой не удалось изменить владельца или права доступа файла.''' if hasattr(error, 'errno') and error.errno == os.errno.EPERM: if self.is_vfat(path_to_check): return True return hasattr(error, 'errno') and error.errno == os.errno.EACCES and\ 'var/calculate/remote' in path_to_check def _is_vfat(self, path_to_check): '''Метод, проверяющий является ли файловая система vfat. Нужно для того, чтобы знать о возможности применения chown, chmod и т.д.''' # Инициализируем объект для проверки примонтированных файловых систем. if self.mounts is None: self.mounts = Mounts() # Проверяем файловую систему на пути. return self.mounts.get_from_fstab(what=self.mounts.TYPE, where=self.mounts.DIR, is_in=path_to_check) in {'vfat', 'ntfs-3g', 'ntfs'} class DirectoryTree: '''Класс реализующий дерево каталогов для пакета.''' def __init__(self, base_directory): self.base_directory = base_directory self._tree = {} def update_tree(self, tree): self._update(self._tree, tree) def _update(self, original_tree, tree): for parent, child in tree.items(): if isinstance(child, abc.Mapping): original_tree[parent] = self._update(original_tree.get(parent, dict()), child) else: original_tree[parent] = child return original_tree def show_tree(self): pprint(self._tree) def get_directory_tree(self, directory): directory_tree = DirectoryTree(os.path.join(self.base_directory, directory)) directory_tree._tree = self._tree[directory] return directory_tree def __getitem__(self, name): if name in self._tree: return self._tree[name] else: return None def __setitem__(self, name, value): self._tree[name] = value def __iter__(self): if self._tree is not None: return iter(self._tree.keys()) else: return iter([]) def __repr__(self): return ''.format(self._tree) def __bool__(self): return bool(self._tree) class DirectoryProcessor: chmod_regex = re.compile(r'\d{3}') def __init__(self, action, datavars_module=Variables(), package='', output_module=IOModule(), without_execution=False, debug_mode=False): self.action = action self.debug_mode = debug_mode self.without_execution = without_execution self.output = output_module self.datavars_module = datavars_module # Корневая директория. if 'cl_chroot_path' in datavars_module.main: self.cl_chroot_path = datavars_module.main.cl_chroot_path else: self.cl_chroot_path = '/' self.cl_ignore_files = self.get_cl_ignore_files() # Путь к файлу config с хэш-суммами файлов, для которых уже # предлагались изменения. if 'cl_config_path' in datavars_module.main: self.cl_config_path = self._add_chroot_path( self.datavars_module.main.cl_config_path) else: self.cl_config_path = self._add_chroot_path( '/var/lib/calculate/config') # Путь к директории config-archive для хранения оригинальной ветки # конфигурационных файлов. if 'cl_config_archive' in datavars_module.main: self.cl_config_archive = self._add_chroot_path( self.datavars_module.main.cl_config_archive) else: self.cl_config_archive = self._add_chroot_path( '/var/lib/calculate/config-archive') if 'cl_exec_dir_path' in datavars_module.main: self.cl_exec_dir_path = self._add_chroot_path( self.datavars_module.main.cl_exec_dir_path) else: self.cl_exec_dir_path = self._add_chroot_path( '/var/lib/calculate/.execute/') # Инициализируем исполнительный модуль. self.template_executor = TemplateExecutor( datavars_module=self.datavars_module, chroot_path=self.cl_chroot_path, cl_config_archive=self.cl_config_archive, cl_config_path=self.cl_config_path, exec_dir_path=self.cl_exec_dir_path ) # Инициализируем шаблонизатор. self.template_engine = TemplateEngine( datavars_module=datavars_module, chroot_path=self.cl_chroot_path, appends_set=self.template_executor.available_appends) if package: try: self.for_package = self.template_engine.parameters_processor.\ check_package_parameter(package) except ConditionFailed as error: self.output.set_error(str(error)) return else: self.for_package = False self.template_paths = (self.datavars_module. main.cl_template_path.split(',')) self.inheritable_parameters = set( ParametersProcessor.inheritable_parameters) self.processed_packages = [] self.packages_to_merge = [] self.packages_file_trees = {} # Список файлов сохраненных в .execute для выполнения после всех # шаблонов. self.exec_files = dict() def get_cl_ignore_files(self): if 'cl_ignore_files' in self.datavars_module.main: cl_ignore_files = self.datavars_module.main.cl_ignore_files cl_ignore_files_list = [] for pattern in cl_ignore_files.split(','): cl_ignore_files_list.append(pattern.strip()) return cl_ignore_files_list else: return [] def _add_chroot_path(self, path_to_add: str): if (self.cl_chroot_path != '/' and not path_to_add.startswith(self.cl_chroot_path)): return join_paths(self.cl_chroot_path, path_to_add) else: return path_to_add def process_template_directories(self): # Проходим каталоги из main.cl_template.path # Режим заполнения очередей директорий пакетов, необходимых для более # быстрой обработки параметра merge. if self.for_package: self.fill_trees = True else: self.fill_trees = False for directory_path in self.template_paths: self.base_directory = directory_path.strip() entries = os.scandir(self.base_directory) for node in entries: self.directory_tree = {} self.walk_directory_tree(node.path, self.cl_chroot_path, ParametersContainer(), directory_tree=self.directory_tree) # Теперь когда дерево заполнено, можно выключить этот режим. self.fill_trees = False if self.for_package: self.output.set_info('Processing packages from merge parameter...') self.processed_packages.append(self.for_package) self.merge_packages() def merge_packages(self): not_merged_packages = [] while self.packages_to_merge: self.for_package = self.packages_to_merge.pop() if self.for_package not in self.packages_file_trees: self.output.set_error( "Error: package '{0}' not found for action '{1}'.". format(self.for_package, self.action) ) not_merged_packages.append(self.for_package) continue for directory_name in self.packages_file_trees[self.for_package]: directory_tree = self.packages_file_trees[self.for_package].\ get_directory_tree(directory_name) self.walk_directory_tree(directory_tree.base_directory, self.cl_chroot_path, ParametersContainer(), directory_tree=directory_tree) self.processed_packages.append(self.for_package) if not_merged_packages: self.output.set_error('Packages {} is not merged.'. format(','.join(self.packages_to_merge))) else: self.output.set_success('All packages are merged...') def get_directories_queue(self, path): '''Уже не актуальный метод для построение очередей из путей к шаблонам.''' directories_queue = [] for base_dir in self.template_paths: base_dir = base_dir.strip() if path.startswith(base_dir): base_directory = base_dir break while path != base_directory: path, dir_name = os.path.split(path) directories_queue.append(dir_name) return base_directory, directories_queue def check_file_name(self, filename: str): '''Метод для проверки соответствия имени файла содержимому переменной main.cl_ignore_files.''' for pattern in self.cl_ignore_files: if not fnmatch.fnmatch(filename, pattern): return False return True def walk_directory_tree(self, current_directory_path, target_path, directory_parameters, directory_tree={}): template_files = [] template_directories = [] directory_name = os.path.basename(current_directory_path) if self.fill_trees: directory_tree[directory_name] = {} current_target_path = target_path entries = os.scandir(current_directory_path) self.template_engine.change_directory(current_directory_path) for node in entries: if self.check_file_name(node.name): continue if node.is_symlink(): self.output.set_warning('symlink: {} is ignored'. format(node.path)) continue elif node.is_dir(): template_directories.append(node.path) elif node.is_file(): template_files.append(node.name) # обрабатываем в первую очередь шаблон директории. if '.calculate_directory' in template_files: template_files.remove('.calculate_directory') try: self.template_engine.process_template( '.calculate_directory', template_type=DIR, parameters=directory_parameters) except ConditionFailed as error: self.output.set_warning('{}.\nTemplate: {}'. format(str(error), current_directory_path)) directory_tree = {} return except Exception as error: self.output.set_error('Template error: {0}\nTemplate: {1}'. format(str(error), current_directory_path)) directory_tree = {} return # directory_parameters.print_parameters_for_debug() # Если есть параметр name -- меняем имя текущего каталога. if directory_parameters.name: directory_name = directory_parameters.name # Если есть параметр path -- меняем текущий путь к целевому # каталогу. if directory_parameters.path: current_target_path = join_paths(self.cl_chroot_path, directory_parameters.path) if self.fill_trees and self.check_package_and_action( directory_parameters, current_directory_path, directory_tree=directory_tree): current_target_path = os.path.join(current_target_path, directory_name) elif self.fill_trees and self.check_package_and_action( directory_parameters, current_directory_path): current_target_path = os.path.join(current_target_path, directory_name) else: # Обновляем дерево директорий для данного пакета. if (self.fill_trees and directory_tree[directory_name] is None): package_name = directory_parameters.package if package_name in self.packages_file_trees: self.packages_file_trees[package_name].update_tree( copy.deepcopy(self.directory_tree) ) else: directory_tree = DirectoryTree(self.base_directory) directory_tree.update_tree( copy.deepcopy(self.directory_tree)) self.packages_file_trees[package_name] = directory_tree directory_tree = {} return if self.for_package and directory_parameters.merge: self.packages_to_merge.extend(directory_parameters.merge) else: # Если .calculate_directory отсутствует -- создаем директорию # используя унаследованные параметры и имя самой директории. if not self.check_package_and_action( directory_parameters, current_directory_path, directory_tree=directory_tree): # Обновляем дерево директорий для данного пакета. if (directory_tree[directory_name] is None and self.fill_trees): package_name = directory_parameters.package if package_name in self.packages_file_trees: self.packages_file_trees[package_name].update_tree( copy.deepcopy(self.directory_tree) ) else: directory_tree = DirectoryTree(self.base_directory) directory_tree.update_tree( copy.deepcopy(self.directory_tree)) self.packages_file_trees[package_name] = directory_tree directory_tree = {} return directory_parameters.set_parameter({'append': 'join'}) current_target_path = os.path.join(current_target_path, directory_name) # Выполняем действие с директорией. if not self.without_execution: self.output.set_success('Processing directory: {}'. format(current_directory_path)) try: output = self.template_executor.execute_template( current_target_path, directory_parameters, DIR) # Если во время выполнения шаблона был изменен целевой путь, # например, из-за ссылки на директорию в source -- обновляем # целевой путь. if output['target_type'] is not None: current_target_path = output['target_path'] # Если есть вывод от параметра run -- выводим как info. if output['stdout'] is not None: self.output.set_info("stdout from template: {}\n{}".format( current_directory_path, output['stdout'] )) # Если есть ошибки от параметра run -- выводим их как error if output['stderr'] is not None: self.output.set_error("stderr from template: {}\n{}". format(current_directory_path, output['stderr'])) # Если run выполнен с ошибками -- пропускаем директорию. return if output['exec_file'] is not None: self.exec_files.update(output['exec_file']) except TemplateTypeConflict as error: self.output.set_error('Type conflict: {}\nTemplate: {}'. format(str(error), current_directory_path)) return else: self.output.set_success('Processing directory: {}'. format(current_directory_path)) # Если режим заполнения очередей выключен и дерево, которое обходим в # данный момент еще не пусто -- используем имеющееся дерево для # получения списков обрабатываемых файлов и директорий. if not self.fill_trees and directory_tree: tree_files = [] tree_directories = [] for template in directory_tree: if template in template_files: tree_files.append(template) else: next_directory_tree =\ directory_tree.get_directory_tree(template) tree_directories.append(next_directory_tree) template_files = tree_files template_directories = tree_directories # обрабатываем файлы шаблонов хранящихся в директории. for template_name in template_files: # Удаляем все параметры, которые не наследуются и используем # полученный контейнер для сбора параметров файлов шаблонов. directory_parameters.remove_not_inheritable() template_path = os.path.join(current_directory_path, template_name) # Применяем к файлу шаблона шаблонизатор. try: self.template_engine.process_template( template_name, FILE, parameters=directory_parameters) except ConditionFailed as error: self.output.set_warning('{0}.\nTemplate: {1}'. format(str(error), current_directory_path)) continue except Exception as error: self.output.set_error('Template error: {0} \nTemplate: {1}'. format(str(error), current_directory_path)) continue # directory_parameters.print_parameters_for_debug() template_text = self.template_engine.template_text if self.fill_trees and not self.check_package_and_action( directory_parameters, template_path, directory_tree=directory_tree[directory_name]): # Если находимся на стадии заполнения дерева директорий -- # проверяем параметры package и action с заполнением дерева. continue elif not self.fill_trees and not self.check_package_and_action( directory_parameters, template_path): # В противном случае проверяем без заполнения. continue # Если есть параметр merge добавляем в список if self.for_package and directory_parameters.merge: self.packages_to_merge.extend(directory_parameters.merge) # Если для шаблона задано имя -- меняем его if directory_parameters.name: template_name = directory_parameters.name # Если для шаблона задан путь -- меняем его if directory_parameters.path: target_file_path = join_paths(self.cl_chroot_path, directory_parameters.path, template_name) else: target_file_path = join_paths(current_target_path, template_name) # Выполняем действия, указанные в шаблоне. if not self.without_execution: output = self.template_executor.execute_template( target_file_path, directory_parameters, FILE, template_text=template_text) # Если во время выполнения шаблона был изменен целевой путь, # например, из-за ссылки на директорию в source -- обновляем # целевой путь. if output['target_type'] is not None: current_target_path = output['target_path'] # Если есть вывод от параметра run -- выводим как info. if output['stdout'] is not None: self.output.set_info("stdout from template: {}\n{}".format( current_directory_path, output['stdout'] )) # Если есть ошибки от параметра run -- выводим их как error if output['stderr'] is not None: self.output.set_error("stderr from template: {}\n{}". format(current_directory_path, output['stderr'])) # Если run выполнен с ошибками -- пропускаем директорию. return if output['exec_file'] is not None: # Обновляем список путей к файлам для выполнения после # обработки всех шаблонов. self.exec_files.update(output['exec_file']) self.output.set_success('Processed template: {}...'. format(template_path)) else: self.output.set_success('Processed template: {}...'. format(template_path)) # Обновляем дерево директорий для данного пакета, если происходит # его заполнение. if (self.fill_trees and directory_tree[directory_name]): package_name = directory_parameters.package if package_name in self.packages_file_trees: # Если для данного пакета дерево уже есть -- обновляем его. self.packages_file_trees[package_name].update_tree( copy.deepcopy(self.directory_tree)) else: # Если нет создаем новое. directory_tree = DirectoryTree(self.base_directory) directory_tree.update_tree(copy.deepcopy(self.directory_tree)) self.packages_file_trees[package_name] = directory_tree directory_tree[directory_name] = {} # проходимся далее по директориям. for directory in template_directories: if self.fill_trees: self.walk_directory_tree( directory, current_target_path, directory_parameters.get_inheritables(), directory_tree=directory_tree[directory_name]) directory_tree[directory_name] = {} else: if isinstance(directory, DirectoryTree): # Если директории берем из дерева -- путь к директории # соответствует корню текущего дерева. directory_path = directory.base_directory else: directory_path = directory self.walk_directory_tree( directory_path, current_target_path, directory_parameters.get_inheritables()) if self.fill_trees: directory_tree = {} return def check_package_and_action(self, parameters, template_path, directory_tree=None): if parameters.append != 'skip': if not parameters.action: self.output.set_warning( ("Action parameter value '{0}' not found." "\nTemplate: {1}").format(parameters.action, template_path)) return False elif parameters.action != self.action: self.output.set_warning( ("Action parameter value '{0}' does not match its" " current value '{1}'.\nTemplate: {2}").format( parameters.action, template_path)) return False if self.for_package: if not parameters.package: self.output.set_warning( "'package' parameter is not defined.\nTemplate: {}". format(template_path)) # считаем наличие параметра package необязательным. # return False elif parameters.package != self.for_package: if directory_tree is not None: template_name = os.path.basename(template_path) directory_tree[template_name] = None self.output.set_warning( ("'package' parameter value '{0}' does not " "match its current target package.\nTemplate: {1}"). format(parameters.package, template_path) ) return False return True