from calculate.templates.template_engine import TemplateEngine, Variables,\ FILE, DIR, ParametersProcessor from calculate.utils.package import PackageAtomParser, Package, PackageNotFound from calculate.utils.files import write_file, read_link, read_file_lines,\ FilesError, join_paths,\ check_directory_link, Process from calculate.utils.mount import Mounts from collections import OrderedDict import hashlib import stat import glob import shutil import os template_text = '''{% calculate append = 'before', source = '/etc/file' -%} {% calculate package = 'test-category/test-package', format = 'samba', unbound -%} [section one] parameter_1 = {{ vars_1.value_1 }} !parameter_2 ''' template_to_run = '''{% calculate run = "python" -%} with open('etc/dir/file.conf', 'r') as input_file: print(input_file.read()) ''' backup_template_text = '''{% calculate append = 'join', format = 'samba', autoupdate, package = 'test-category/test-package' -%} [section one] parameter_1 = value parameter_2 = value_2 [section two] other_parameter = other_value [!section_name] ''' other_template_text = '''{% calculate append = 'join', format = 'bind', autoupdate, package = 'test-category/other-package' -%} options { parameter_1 yes; response-policy { mood "almost.blue"; }; } ''' another_template_text = '''{% calculate append = 'join', format = 'bind', autoupdate, package = 'test-category/other-package' -%} options { parameter no; parameter_1 yes; }; awful_section { parameter_2 12; inner_section { parameter_4 "grief"; } } ''' one_more_template_text = '''{% calculate append = 'join', format = 'json', autoupdate, package = 'test-category/test-package' -%} { "languageserver": { "clangd": { "command": "clangd", "rootPatterns": ["compile_flags.txt", "compile_commands.json"], "filetypes": ["c", "cpp", "objc", "objcpp"] } } } ''' class TemplateExecutorError(Exception): pass class TemplateTypeConflict(Exception): pass class TemplateCollisionError(Exception): pass class CalculateConfigFile: '''Класс для работы с файлом /var/lib/calculate/config.''' def __init__(self, cl_config_path='/var/lib/calculate/config', cl_chroot_path='/'): self.chroot_path = cl_chroot_path self.cl_config_path = cl_config_path self._config_dictionary = self._get_cl_config_dictionary() self._unsaved_changes = False def __contains__(self, file_path): file_path = self._remove_chroot(file_path) return file_path in self._config_dictionary def _get_cl_config_dictionary(self): '''Метод для загрузки словаря файла /var/lib/calculate/config.''' config_dictionary = OrderedDict() if os.path.exists(self.cl_config_path): if os.path.isdir(self.cl_config_path): raise TemplateExecutorError( "directory instead calculate config file in: {}". format(self.cl_config_path)) else: write_file(self.cl_config_path).close() return config_dictionary try: config_file_lines = read_file_lines(self.cl_config_path) except FilesError as error: raise TemplateExecutorError( "cannot read calculate config file in: {0}. Reason: {1}". format(self.cl_config_path, str(error))) # Продумать проверку корректности найденного файла. for file_line in config_file_lines: filename, md5_sum = file_line.split(' ') config_dictionary.update({filename: md5_sum}) self._unsaved_changes = False return config_dictionary def set_files_md5(self, file_path, file_md5): '''Метод для установки в config соответствия файла некоторой контрольной сумме.''' file_path = self._remove_chroot(file_path) self._config_dictionary[file_path] = file_md5 self._unsaved_changes = True def remove_file(self, file_path): '''Метод для удаления файла из config.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: self._config_dictionary.pop(file_path) self._unsaved_changes = True def compare_md5(self, file_path, file_md5): '''Метод для сравнения хэш-суммы из config и некоторой заданной.''' file_path = self._remove_chroot(file_path) if file_path in self._config_dictionary: return self._config_dictionary[file_path] == file_md5 else: return False def save_changes(self): '''Метод для записи изменений, внессенных в файл config.''' if not self._unsaved_changes: return config_file = write_file(self.cl_config_path) for file_name, file_md5 in self._config_dictionary.items(): config_file.write('{} {}\n'.format(file_name, file_md5)) config_file.close() self._unsaved_changes = False def _remove_chroot(self, file_path): '''Метод для удаления корневого пути из указанного пути.''' if self.chroot_path != '/' and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return file_path class TemplateWrapper: '''Класс связывающий шаблон с целевым файлом и определяющий параметры наложения шаблона, обусловленные состоянием целевого файла.''' type_checks = {DIR: os.path.isdir, FILE: os.path.isfile} chroot_path = '/' config_archive_path = '/var/lib/calculate/config-archive' _protected_is_set = False _protected_set = set() _unprotected_set = set() def __new__(cls, *args, **kwargs): if not cls._protected_is_set: # Устанавливаем значения PROTECTED, если не заданы. cls._set_protected() return super().__new__(cls) def __init__(self, target_file_path, parameters, template_type, template_text=''): self.target_path = target_file_path self.target_package_name = None self.package_atom_parser = PackageAtomParser( chroot_path=self.chroot_path) # Вспомогательный флаг, включается, если по целевому пути лежит файл, # для которого не определился никакой пакет. self.target_without_package = False self.parameters = parameters self.output_path = self.target_path self.input_path = self.parameters.source self.template_type = template_type self.template_text = template_text # Флаг, указывающий, что нужно удалить файл из target_path перед # применением шаблона. self.remove_original = False # Флаг, указывающий, что целевой путь был изменен. self.target_path_is_changed = False # Флаг, указывающий, что файл является PROTECTED. self.protected = False # Временный флаг для определения того, является ли шаблон userspace. self.is_userspace = False if self.parameters.run or self.parameters.exec: # Если есть параметр run или exec, то кроме текста шаблона ничего # не нужно. return if self.parameters.append in {'join', 'before', 'after'}: # Получаем класс соответствующего формата файла. if self.parameters.format: self.format_class = ParametersProcessor.\ available_formats[self.parameters.format] else: # Здесь будет детектор форматов. pass # Если по этому пути что-то есть -- проверяем конфликты. if os.path.exists(target_file_path): for file_type, checker in self.type_checks.items(): if checker(target_file_path): self.target_type = file_type break self.target_is_link = os.path.islink(target_file_path) # Если установлен параметр mirror и есть параметр source, # содержащий несуществующий путь -- удаляем целевой файл. if self.parameters.source is True and self.parameters.mirror: self.remove_original = True else: if self.parameters.mirror: raise TemplateExecutorError("target file does not exist, while" " 'mirror' parameter is set") self.target_type = None if self.format_class.EXECUTABLE: # Если формат исполняемый, но проверяем, существует ли директория, # из которой будет выполняться шаблон. if not os.path.exists(self.target_path): if self.template_type == DIR: os.makedirs(self.target_path) else: self.target_path = os.path.dirname(self.target_path) os.makedirs(self.target_path) return self.check_conflicts() self.check_package_collision() # Если целью является файл -- проверяем наличие ._cfg0000_filename # файлов. if self.target_type is FILE: self._cfg_pattern = os.path.join( os.path.dirname(self.target_path), "._cfg????_{}".format( os.path.basename(self.target_path))) self.cfg_list = glob.glob(self._cfg_pattern) self.check_user_changes() def check_conflicts(self): '''Проверка конфликтов типов.''' if self.parameters.append == 'link': if self.parameters.force: self.remove_original = True elif self.target_type == DIR: raise TemplateTypeConflict("the target is a directory while " "the template has append = 'link'") else: raise TemplateTypeConflict("the target is a file while the" " template has append = 'link'") elif self.template_type == DIR: if self.target_type == FILE: if self.parameters.force: self.remove_original = True else: raise TemplateTypeConflict("the target is a file while the" " template is a directory") elif self.target_is_link: if self.parameters.force: self.remove_original = True else: try: self.target_path = check_directory_link( self.target_path) except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.template_type == FILE: if self.parameters.force: if self.target_type == DIR: self.remove_original = True elif self.target_is_link and self.target_type == FILE: try: self.target_path = read_link(self.target_path) except FilesError as error: raise TemplateExecutorError("files error: {}". format(str(error))) elif self.target_type == DIR: raise TemplateTypeConflict("the target file is a directory" " while the template is a file") def check_package_collision(self): '''Проверка на предмет коллизии, то есть конфликта пакета шаблона и целевого файла.''' if self.parameters.package: parameter_package = self.parameters.package else: parameter_package = None if self.target_type is not None: try: file_package = self.package_atom_parser.get_file_package( self.target_path) except PackageNotFound: file_package = None self.target_without_package = True else: file_package = None # Если для шаблона и целевого файла никаким образом не удается # определить пакет и есть параметр append -- шаблон пропускаем. if parameter_package is None and file_package is None: if self.parameters.append: raise TemplateCollisionError( "'package' parameter is not defined for" " template with 'append' parameter.") elif parameter_package is None: self.target_package_name = file_package elif file_package is None: self.target_package_name = parameter_package elif file_package != parameter_package: raise TemplateCollisionError( "The template package is {0} while target" " file package is {1}").format( self.target_package_name.atom, file_package.atom ) else: self.target_package_name = parameter_package self.target_package = Package(self.target_package_name, chroot_path=self.chroot_path) def check_user_changes(self): '''Метод для проверки наличия пользовательских изменений в конфигурационных файлах.''' # Эта проверка только для файлов. if self.template_type != FILE: return # Проверим, является ли файл защищенным. # Сначала проверяем по переменной CONFIG_PROTECT. for protected_path in self._protected_set: if self.target_path.startswith(protected_path): self.protected = True break # Затем по переменной CONFIG_PROTECT_MASK. for unprotected_path in self._unprotected_set: if self.target_path.startswith(unprotected_path): self.protected = False break # Собираем список имеющихся ._cfg файлов. cfg_pattern = os.path.join(os.path.dirname(self.target_path), "._cfg????_{}".format( os.path.basename(self.target_path))) self.cfg_list = glob.glob(cfg_pattern) # Путь к архивной версии файла. self.archive_path = self._get_archive_path(self.target_path) if not self.protected: self.md5_matching = True elif self.parameters.unbound: # Если присутствует unbound, то просто модифицируем файл и # удаляем его из CONTENTS. self.md5_matching = True elif self.target_type is None: # Если целевой файл отсутствует. if self.target_path in self.target_package: # Проверка -- был ли файл удален. self.md5_matching = False else: self.md5_matching = True elif self.target_without_package: # Если файл по целевому пути не относится к какому-либо пакету. if self.parameters.unbound: self.md5_matching = True else: self.md5_matching = False else: # Если файл есть и он относится к текущему пакету. target_md5 = self.target_package.get_md5(self.target_path) self.md5_matching = self.target_package.is_md5_equal( self.target_path, file_md5=target_md5) # Если по целевому пути файл не относящийся к какому-либо пакету и # присутствует параметр autoupdate -- удаляем этот файл. if self.target_without_package and self.parameters.autoupdate: self.remove_original = True self.md5_matching = self.md5_matching or self.parameters.autoupdate # Определяем путей входных и выходных файлов. if self.md5_matching: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source else: self.input_path = self.target_path self.output_path = self.target_path else: # Приоритет отдаем пути из параметра source. if self.parameters.source: self.input_path = self.parameters.source else: self.input_path = self.archive_path self.output_path = self._get_cfg_path(self.target_path) def _get_archive_path(self, file_path): '''Метод для получения пути к архивной версии указанного файла.''' if self.chroot_path != "/" and file_path.startswith(self.chroot_path): file_path = file_path[len(self.chroot_path):] return join_paths(self.config_archive_path, file_path) def _get_cfg_path(self, file_path): '''Метод для получения пути для создания нового ._cfg????_ файла.''' if self.cfg_list: last_cfg_name = os.path.basename(self.cfg_list[-1]) slice_value = len('._cfg') cfg_number = int(last_cfg_name[slice_value: slice_value + 4]) else: cfg_number = 0 cfg_number = str(cfg_number + 1) if len(cfg_number) < 4: cfg_number = '0' * (4 - len(cfg_number)) + cfg_number new_cfg_name = "._cfg{}_{}".format(cfg_number, os.path.basename(file_path)) new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name) return new_cfg_path def remove_from_contents(self): '''Метод для удаления целевого файла из CONTENTS.''' if self.template_type == DIR: self.target_package.remove_dir(self.target_path) elif self.template_type == FILE: self.target_package.remove_obj(self.target_path) def clear_dir_contents(self): '''Метод для удаления из CONTENTS всего содержимого директории после применения append = "clear".''' if self.template_type == DIR: self.target_package.clear_dir(self.target_path) def add_to_contents(self, file_md5=None, file_path=None): '''Метод для добавления целевого файла в CONTENTS.''' if self.parameters.append == 'link': self.target_package.add_sym(self.target_path, self.parameters.source) elif self.template_type == DIR: self.target_package.add_dir(self.target_path) elif self.template_type == FILE: self.target_package.add_obj(self.target_path, file_md5) def update_contents_from_list(self, changed_list: dict): for file_path, mode in changed_list.items(): if mode == "modify": if os.path.islink(file_path): self.target_package.add_sym(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) elif os.path.isfile(file_path): self.target_package.add_obj(file_path) elif mode == "remove": if os.path.islink(file_path) or os.path.isfile(file_path): self.target_package.remove_obj(file_path) elif os.path.isdir(file_path): self.target_package.add_dir(file_path) @classmethod def _set_protected(cls): '''Метод для получения множества защищенных директорий.''' if cls._protected_is_set: return cls._protected_set = set() cls._protected_set.add(join_paths(cls.chroot_path, '/etc')) config_protect_env = os.environ.get('CONFIG_PROTECT', False) if config_protect_env: for protected_path in config_protect_env.split(): protected_path = join_paths(cls.chroot_path, protected_path.strip()) cls._protected_set.add(protected_path) config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False) if config_protect_mask_env: for unprotected_path in config_protect_mask_env.split(): unprotected_path = join_paths(cls.chroot_path, unprotected_path.strip()) cls._unprotected_set.add(unprotected_path) cls._protected_is_set = True def save_changes(self): '''Метод для сохранения изменений внесенных в CONTENTS.''' if self.target_package: self.target_package.remove_empty_directories() self.target_package.write_contents_file() class TemplateExecutor: def __init__(self, datavars_module=Variables(), chroot_path='/', cl_config_archive='/var/lib/calculate/config-archive', cl_config_path='/var/lib/calculate/config', exec_dir_path='/var/lib/calculate/.execute/'): self.datavars_module = datavars_module self.chroot_path = chroot_path self.exec_files_directory = '/var/lib/calculate/.execute/' self.directory_default_parameters =\ ParametersProcessor.directory_default_parameters self.file_default_parameters =\ ParametersProcessor.file_default_parameters self.directory_appends = {'join': self._append_join_directory, 'remove': self._append_remove_directory, 'skip': self._append_skip_directory, 'clear': self._append_clear_directory, 'link': self._append_link_directory, 'replace': self._append_replace_directory} self.file_appends = {'join': self._append_join_file, 'after': self._append_after_file, 'before': self._append_before_file, 'replace': self._append_replace_file, 'remove': self._append_remove_file, 'skip': self._append_skip_file, 'clear': self._append_clear_file, 'link': self._append_link_file} self.formats_classes = ParametersProcessor.available_formats self.calculate_config_file = CalculateConfigFile( cl_config_path=cl_config_path, cl_chroot_path=chroot_path) TemplateWrapper.chroot_path = self.chroot_path TemplateWrapper.config_archive_path = cl_config_archive @property def available_appends(self): '''Метод для получения множества возможных значений append.''' appends_set = set(self.directory_appends.keys()).union( set(self.file_appends.keys())) return appends_set def execute_template(self, target_path, parameters, template_type, template_text=''): '''Метод для запуска выполнения шаблонов.''' self.executor_output = {'target_path': None, 'stdout': None, 'stderr': None, 'exec_file': None} try: template_object = TemplateWrapper(target_path, parameters, template_type, template_text=template_text) except TemplateTypeConflict as error: raise TemplateExecutorError("type conflict: {}".format(str(error))) except TemplateCollisionError as error: raise TemplateExecutorError("collision: {}".format(str(error))) # Удаляем оригинал, если это необходимо из-за наличия force или по # другим причинам. if template_object.remove_original: if template_object.target_type == DIR: self._remove_directory(template_object.target_path) else: self._remove_file(template_object.target_path) template_object.remove_from_contents() # Если был включен mirror, то после удаления файла завершаем # выполнение шаблона. if template_object.parameters.mirror: template_object.save_changes() return template_object.target_type = None if template_object.parameters.run: # Если есть параметр run -- запускаем текст шаблона. self._run_template(template_object) elif template_object.parameters.exec: # Если есть параметр exec -- запускаем текст шаблона после # обработки всех шаблонов. self._exec_template(template_object) elif template_object.parameters.append: print('Using append: {}'.format(template_object.parameters.append)) print('input path: {}'.format(template_object.input_path)) print('output path: {}'.format(template_object.output_path)) if template_object.template_type == DIR: self.directory_appends[template_object.parameters.append]( template_object) elif template_object.template_type == FILE: self.file_appends[template_object.parameters.append]( template_object) # Сохраняем изменения в CONTENTS внесенные согласно шаблону. template_object.save_changes() # Возвращаем целевой путь, если он был изменен, или # None если не был изменен. if template_object.target_path_is_changed: self.executor_output['target_path'] =\ template_object.target_path if self.executor_output['stdout']: print(self.executor_output['stdout']) if self.executor_output['stderr']: print(self.executor_output['stderr']) return self.executor_output def save_changes(self): '''Метод для сохранения чего-нибудь после выполнения всех шаблонов.''' # Пока сохраняем только получившееся содержимое config-файла. self.calculate_config_file.save_changes() def _append_join_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "join", если шаблон -- директория. Создает директорию, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) template_object.add_to_contents() def _append_remove_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "remove", если шаблон -- директория. Удаляет директорию со всем содержимым, если она есть.''' if template_object.target_type is not None: self._remove_directory(template_object.target_path) template_object.remove_from_contents() def _append_skip_directory(self, template_object: TemplateWrapper): pass def _append_clear_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "clear", если шаблон -- директория. Удаляет все содержимое директории, если она есть.''' if template_object.target_type is not None: self._clear_directory(template_object.target_path) # Меняем права и владельца очищенной директории, если это # необходимо. if template_object.parameters.chmod: self._chmod_directory(target_path, template_object.parameters.chmod) if template_object.parameters.chown: self._chown_directory(target_path, template_object.parameters.chown) template_object.clear_dir_contents() def _append_link_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "link", если шаблон -- директория. Создает ссылку на директорию, если она есть.''' self._link_directory(template_object.parameters.source, template_object.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_directory(template_object.parameters.source, template_object.parameters.chmod) if template_object.parameters.chown: self._chmod_directory(template_object.parameters.source, template_object.parameters.chown) template_object.add_to_contents() def _append_replace_directory(self, template_object: TemplateWrapper): '''Метод описывающий действия для append = "replace", если шаблон -- директория. Очищает директорию или создает, если ее нет.''' if template_object.target_type is None: self._create_directory(template_object) template_object.add_to_contents() else: self._clear_directory(template_object.target_path) template_object.clear_dir_contents() def _append_join_file(self, template_object: TemplateWrapper, join_before=False, replace=False): '''Метод описывающий действия при append = "join", если шаблон -- файл. Объединяет шаблон с целевым файлом.''' input_path = template_object.input_path output_path = template_object.output_path template_format = template_object.format_class # Задаемся значениями chmod и chown в зависимости от наличия или # отсутствия файла, принадлежности его пакету и наличию дефолтных # значений параметров. try: chmod = template_object.parameters.chmod if not chmod: if (template_object.target_type is not None and not template_object.target_without_package): chmod = self._get_file_mode(template_object.target_path) else: chmod = self.file_default_parameters.get('chmod', False) chown = template_object.parameters.chown if not chown: if (template_object.target_type is not None and not template_object.target_without_package): chown = self._get_file_owner(template_object.target_path) else: chown = self.file_default_parameters.get('chown', False) except OSError: raise TemplateExecutorError('No access to the directory: {}'. format(template_object.target_path)) if template_object.format_class.EXECUTABLE or\ template_object.md5_matching: # Действия при совпадении md5 из CONTENTS и md5 целевого файла. output_paths = [output_path] # Если целевой файл защищен, а шаблон не userspace. if template_object.protected and not template_object.is_userspace: output_paths.append(template_object.archive_path) if template_object.target_type is not None and not replace: # Если целевой файл есть и нет параметра replace -- используем # текст целевого файла. with open(input_path, 'r') as input_file: input_text = input_file.read() else: input_text = '' parsed_template = template_format(template_object.template_text, join_before=join_before) if not template_object.format_class.EXECUTABLE: parsed_input = template_format(input_text, join_before=join_before) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() for save_path in output_paths: if not os.path.exists(os.path.dirname(save_path)): self._create_directory( template_object, path_to_create=os.path.dirname(save_path)) with open(save_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца всех сохраняемых файлов, # если это необходимо. if chown: self._chown_file(save_path, chown, check_existation=False) if chmod: self._chmod_file(save_path, chmod, check_existation=False) # Убираем все ._cfg файлы. if template_object.cfg_list: for cfg_file_path in template_object.cfg_list: self._remove_file(cfg_file_path) # Убираем целевой файл из CL. self.calculate_config_file.remove_file( template_object.target_path) # Обновляем CONTENTS. if template_object.protected: if template_object.parameters.unbound: template_object.remove_from_contents() else: template_object.add_to_contents( file_md5=output_text_md5) else: changed_files = parsed_template.execute_format( input_text=input_text, target_path=template_object.target_path) # Удаляем форматный объект входного файла. del(parsed_template) # Если исполняемый формат выдал список измененных файлов для # изменения CONTENTS -- обновляем CONTENTS. if changed_files: template_object.update_contents_from_list(changed_files) else: if template_object.target_type is not None and not replace: with open(input_path, 'r') as input_file: input_text = input_file.read() else: input_text = '' parsed_input = template_format(input_text) parsed_template = template_format(template_object.template_text) parsed_input.join_template(parsed_template) # Результат наложения шаблона. output_text = parsed_input.document_text # Удаляем форматный объект входного файла. del(parsed_input) output_text_md5 = hashlib.md5(output_text.encode()).hexdigest() if not self.calculate_config_file.compare_md5(target_path, output_text_md5): if not os.path.exists(os.path.dirname(output_path)): self._create_directory( template_object, path_to_create=os.path.dirname(output_path)) with open(output_path, 'w') as output_file: output_file.write(output_text) # Меняем права доступа и владельца ._cfg????_ файлов, если # это необходимо. if chown: self._chown_file(output_path, chown, check_existation=False) if chmod: self._chmod_file(output_path, chmod, check_existation=False) # Обновляем CL. self.calculate_config_file.set_files_md5( template_object.target_path, output_text_md5) else: # Действия если CL совпало. Пока ничего не делаем. pass # Обновляем CONTENTS. template_object.add_to_contents(file_md5=output_text_md5) def _append_after_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в конец файла и в конец каждой секции файла.''' self._append_join_file(template_object, join_before=False) def _append_before_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "after", если шаблон -- файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся в начало файла и в начало каждой секции файла.''' self._append_join_file(template_object, join_before=True) def _append_skip_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "skip". Пока никаких действий.''' pass def _append_replace_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "replace", если шаблон -- файл. Очищает файл и затем накладывает на него шаблон.''' self._append_join_file(template_object, replace=True) def _append_remove_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "remove", если шаблон -- файл. Удаляет файл.''' if template_object.target_type is not None: self._remove_file(template_object.target_path) template_object.remove_from_contents() def _append_clear_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "clear", если шаблон -- файл. Очищает файл.''' if template_object.target_type is not None: self._clear_file(template_object.target_path) # Меняем владельца и права доступа к очищенному файлу, если нужно. if template_object.chown: self._chown_file(template_object.target_path, template_object.parameters.chown, check_existation=False) if template_object.chmod: self._chmod_file(template_object.target_path, template_object.parameters.chmod, check_existation=False) template_object.add_to_contents() def _append_link_file(self, template_object: TemplateWrapper): '''Метод описывающий действия при append = "link", если шаблон -- файл. Создает ссылку на файл, указанный в параметре source.''' self._link_file(template_object.parameters.source, template_object.parameters.target_path) # Меняем права и владельца файла, на который указывает ссылка. if template_object.parameters.chmod: self._chmod_file(template_object.parameters.source, template_object.parameters.chmod, check_existation=False) if template_object.parameters.chown: self._chown_file(template_object.parameters.source, template_object.parameters.chown, check_existation=False) template_object.add_to_contents() def _create_directory(self, template_object: TemplateWrapper, path_to_create=None): '''Метод для создания директории и, при необходимости, изменения владельца и доступа все директорий на пути к целевой.''' if path_to_create is None: target_path = template_object.target_path else: target_path = path_to_create template_parameters = template_object.parameters # Если файл есть, но указан chmod или chown -- используем их. if os.access(target_path, os.F_OK): if template_parameters.chmod: self._chmod_directory(target_path, template_parameters.chmod) if template_parameters.chown: self._chown_directory(target_path, template_parameters.chown) return directories_to_create = [target_path] directory_path = os.path.dirname(target_path) # Составляем список путей к директориям, которые нужно создать. while not os.access(directory_path, os.F_OK) and directory_path: directories_to_create.append(directory_path) directory_path = os.path.dirname(directory_path) # получаем информацию о владельце и правах доступа ближайшей # существующей директории. try: chmod = template_parameters.chmod if not chmod: chmod = self._get_file_mode(directory_path) else: chmod = self.directory_default_parameters.get('chmod', False) chown = template_parameters.chown if not chown: chown = self._get_file_owner(directory_path) else: chown = self.directory_default_parameters.get('chown', False) except OSError: raise TemplateExecutorError('No access to the directory: {}'. format(directory_path)) directories_to_create.reverse() # создаем директории. for create_path in directories_to_create: try: os.mkdir(create_path) # Для каждой созданной директории меняем права и владельца # если это необходимо. if chmod: self._chmod_directory(create_path, chmod) if chown: self._chown_directory(create_path, chown) except OSError as error: raise TemplateExecutorError( 'Failed to create directory: {}, reason: {}'. format(create_path, str(error))) def _remove_directory(self, target_path): '''Метод для удаления директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): try: if os.path.islink(target_path): os.unlink(target_path) else: shutil.rmtree(target_path) except Exception as error: raise TemplateExecutorError( ("Failed to delete the directory: {0}," " reason: {1}").format(target_path, str(error))) else: error_message = "target file is not directory" else: error_message = "target file does not exist" raise TemplateExecutorError(("Failed to delete the directory: {0}," "reason: {1}").format(target_path, error_message)) def _clear_directory(self, target_path): '''Метод для очистки содержимого целевой директории.''' if os.path.exists(target_path): if os.path.isdir(target_path): # Удаляем все содержимое директории. for node in os.scandir(target_path): if node.is_dir(): self._remove_directory(node.path) else: self._remove_file(node.path) return else: error_message = "target file is not directory" else: error_message = "target directory does not exist" raise TemplateExecutorError(("failed to delete directory: {}," " reason: {}").format(target_path, error_message)) def _link_directory(self, source, target_path): '''Метод для создания по целевому пути ссылки на директорию расположенную на пути, указанному в source.''' try: os.symlink(source, target_path, target_is_directory=True) except OSError: raise TemplateExecutorError("Failed to create symlink: {0} -> {1}". format(target_path, self.source)) def _remove_file(self, target_path): '''Метод для удаления файлов.''' if os.path.islink(target_path): try: os.unlink(target_path) except OSError: raise TemplateExecutorError('failed to delete the link: {}'. format(target_path)) if os.path.isfile(target_path): try: os.remove(target_path) except OSError: raise TemplateExecutorError('failed to delete the file: {}'. format(target_path)) def _clear_file(self, target_path): '''Метод для очистки файлов.''' try: with open(target_path, 'w') as f: f.truncate(0) except IOError: raise TemplateExecutorError("failed to clear the file: {}". format(target_path)) def _link_file(self, source, target_path): '''Метод для создания по целевому пути ссылки на файл расположенный на пути, указанному в source.''' try: os.symlink(source, target_path) except OSError: raise TemplateExecutorError( "Failed to create symlink to the file: {0} -> {1}". format(target_path, self.source)) def _run_template(self, template_object: TemplateWrapper): '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в run прямо во время обработки шаблонов.''' text_to_run = template_object.template_text interpreter = template_object.parameters.run try: run_process = Process(interpreter, cwd=self.chroot_path) run_process.write(text_to_run) if run_process.readable: stdout = run_process.read() if stdout: self.executor_output['stdout'] = stdout if run_process.readable_errors: stderr = run_process.read_error() if stderr: self.executor_output['stderr'] = stderr except FilesError as error: raise TemplateExecutorError(("can not run template using the" " interpreter '{}', reason: {}"). format(interpreter, str(error))) def _exec_template(self, template_object: TemplateWrapper): '''Метод для сохранения текста шаблонов, который должен быть исполнен интерпретатором указанным в exec после выполнения всех прочих шаблонов. ''' text_to_run = template_object.template_text interpreter = template_object.parameters.exec # Получаем путь к директории для хранения файлов .execute. if (self.chroot_path != '/' and not self.exec_files_directory.startswith(self.chroot_path)): exec_files_directory = join_paths(self.chroot_path, '/var/lib/calculate/.execute/') # Если директория уже существует получаем номер очередного файла для # exec по номеру последнего. exec_number = 0 if os.path.exists(exec_files_directory): exec_files_list = os.listdir(exec_files_directory) if exec_files_list: exec_number = int(exec_files_list[-1][-4:]) exec_number = str(exec_number + 1) # Получаем название нового exec_???? файла. if len(exec_number) < 4: exec_number = '0' * (4 - len(exec_number)) + exec_number exec_file_name = 'exec_{}'.format(exec_number) exec_file_path = join_paths(exec_files_directory, exec_file_name) exec_file = write_file(exec_file_path) exec_file.write(text_to_run) exec_file.close() # Добавляем новый файл в словарь файлов для дальнейшего исполнения. self.executor_output['exec_file'] = {interpreter: exec_file_name} def _chown_directory(self, target_path, chown_value): """Метод для смены владельца директории.""" try: os.chown(target_path, chown_value['uid'], chown_value['gid']) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( target_path, chown_value['uid'], chown_value['gid']), str(error))) def _chmod_directory(self, target_path, chmod_value): '''Метод для смены прав доступа к директории.''' try: os.chmod(target_path, chmod_value) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): self.output.set_error( 'Can not chmod directory: {0}, reason: {1}'. format(target_path, str(error))) def _chown_file(self, target_path, chown_value, check_existation=True): '''Метод для смены владельца файла.''' try: if check_existation and not os.path.exists(target_path): open(target_path, 'w').close() os.lchown(target_path, chown_value['uid'], chown_value['gid']) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chown file: {0} to {1}, reason: {2}'. format(target_path, self._translate_uid_gid( target_path, chown_value['uid'], chown_value['gid']), str(error))) def _chmod_file(self, target_path, chmod_value, check_existation=True): '''Метод для смены прав доступа к директории.''' try: if check_existation and not os.path.exists(target_path): open(target_path, 'w').close() os.chmod(target_path, chmod_value) except (OSError, Exception) as error: if not self._check_os_error(error, target_path): raise TemplateExecutorError( 'Can not chmod file: {0}, reason: {1}'. format(target_path, str(error))) def _get_file_mode(self, file_path): '''Метод для получения прав доступа для указанного файла.''' file_stat = os.stat(file_path) return stat.S_IMODE(file_stat.st_mode) def _get_file_owner(self, file_path): '''Метод для получения uid и gid значений для владельца указанного файла.''' file_stat = os.stat(file_path) return {'uid': file_stat.st_uid, 'gid': file_stat.st_gid} def _translate_uid_gid(self, target_path, uid, gid): '''Метод для получения из uid и gid имен пользователя и группы при, необходимых для выдачи сообщения об ошибке при попытке chown.''' import pwd import grp try: if self.chroot_path == '/': user_name = pwd.getpwuid(uid).pw_name else: user_name = str(uid) except (TypeError, KeyError): user_name = str(uid) try: if self.chroot_path == '/': group_name = grp.getgrgid(gid).gr_name else: group_name = str(gid) except (TypeError, KeyError): group_name = str(gid) return '{0}:{1}'.format(user_name, group_name) def _check_os_error(self, error, path_to_check): '''Метод для проверки причины, по которой не удалось изменить владельца или права доступа файла.''' if hasattr(error, 'errno') and error.errno == os.errno.EPERM: if self.is_vfat(path_to_check): return True return hasattr(error, 'errno') and error.errno == os.errno.EACCES and\ 'var/calculate/remote' in path_to_check def _is_vfat(self, path_to_check): '''Метод, проверяющий является ли файловая система vfat. Нужно для того, чтобы знать о возможности применения chown, chmod и т.д.''' # Инициализируем объект для проверки примонтированных файловых систем. if self.mounts is None: self.mounts = Mounts() # Проверяем файловую систему на пути. return self.mounts.get_from_fstab(what=self.mounts.TYPE, where=self.mounts.DIR, is_in=path_to_check) in {'vfat', 'ntfs-3g', 'ntfs'} vars_1 = Variables({'value_1': 'value_1', 'value_2': 'value_to_print', 'value_3': 5}) DATAVARS_MODULE = Variables({'vars_1': vars_1}) CHROOT_PATH = os.path.join(os.getcwd(), 'tests/templates/testfiles/test_wrapper_root') CL_CONFIG_PATH = os.path.join(CHROOT_PATH, 'var/lib/calculate/config') CL_CONFIG_ARCHIVE_PATH = os.path.join(CHROOT_PATH, 'var/lib/calculate/config-archive') template_executor_obj = TemplateExecutor( datavars_module=DATAVARS_MODULE, chroot_path=CHROOT_PATH, cl_config_archive=CL_CONFIG_ARCHIVE_PATH, cl_config_path=CL_CONFIG_PATH) APPENDS_SET = template_executor_obj.available_appends template_engine = TemplateEngine(datavars_module=DATAVARS_MODULE, appends_set=APPENDS_SET, chroot_path=CHROOT_PATH) target_path = os.path.join(CHROOT_PATH, 'etc/dir/file.conf') other_target_path = os.path.join(CHROOT_PATH, 'etc/other_file.conf') another_target_path = os.path.join(CHROOT_PATH, 'not_protected/file.conf') one_more_target_path = os.path.join(CHROOT_PATH, 'etc/dir/subdir/config.json') run_target_path = os.path.join(CHROOT_PATH, 'file_to_run.py') # Применение основного шаблона: # template_engine.process_template_from_string(template_text, FILE) # template_parameters = template_engine.parameters # template_text = template_engine.template_text # template_executor_obj.execute_template(target_path, # template_parameters, # FILE, template_text=template_text) # template_executor_obj.save_changes() # input() # template_engine.process_template_from_string(template_to_run, FILE) # template_parameters = template_engine.parameters # template_text = template_engine.template_text # template_executor_obj.execute_template(target_path, # template_parameters, # FILE, template_text=template_text) # template_executor_obj.save_changes() # input() # template_engine.process_template_from_string(backup_template_text, FILE) # template_parameters = template_engine.parameters # template_text = template_engine.template_text # template_executor_obj.execute_template(target_path, # template_parameters, # FILE, template_text=template_text) # template_executor_obj.save_changes() # template_engine.process_template_from_string(other_template_text, FILE) # template_parameters = template_engine.parameters # template_text = template_engine.template_text # template_executor_obj.execute_template(other_target_path, # template_parameters, # FILE, template_text=template_text) # template_executor_obj.save_changes() template_engine.process_template_from_string(one_more_template_text, FILE) template_parameters = template_engine.parameters template_text = template_engine.template_text template_executor_obj.execute_template(one_more_target_path, template_parameters, FILE, template_text=template_text) template_executor_obj.save_changes()