You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/calculate/templates/template_processor.py

2255 lines
110 KiB

# vim: fileencoding=utf-8
#
from pprint import pprint
from ..utils.package import PackageAtomParser, Package, PackageNotFound,\
PackageAtomName, Version, NonePackage
from ..utils.files import join_paths, write_file, read_file_lines, FilesError,\
check_directory_link, read_link, Process,\
get_target_from_link
from .template_engine import TemplateEngine, Variables, ConditionFailed,\
ParametersProcessor, DIR, FILE,\
ParametersContainer
from calculate.variables.datavars import StringType, ListType, NamespaceNode
from calculate.variables.loader import Datavars
from .format.base_format import Format
from ..utils.io_module import IOModule
from collections import OrderedDict, abc
from ..utils.mount import Mounts
import hashlib
import fnmatch
import shutil
import errno
import stat
import glob
import copy
import os
# Наверное временно.
CALCULATE_VERSION = Version('4.0')
class TemplateExecutorError(Exception):
pass
class TemplateTypeConflict(Exception):
pass
class TemplateCollisionError(Exception):
pass
class CalculateConfigFile:
'''Класс для работы с файлом /var/lib/calculate/config.'''
def __init__(self, cl_config_path='/var/lib/calculate/config',
cl_chroot_path='/'):
self.chroot_path = cl_chroot_path
self.cl_config_path = cl_config_path
self._config_dictionary = self._get_cl_config_dictionary()
self._unsaved_changes = False
def __contains__(self, file_path: str) -> bool:
file_path = self._remove_chroot(file_path)
return file_path in self._config_dictionary
def _get_cl_config_dictionary(self) -> OrderedDict:
'''Метод для загрузки словаря файла /var/lib/calculate/config.'''
config_dictionary = OrderedDict()
if os.path.exists(self.cl_config_path):
if os.path.isdir(self.cl_config_path):
raise TemplateExecutorError(
"directory instead calculate config file in: {}".
format(self.cl_config_path))
else:
write_file(self.cl_config_path).close()
return config_dictionary
try:
config_file_lines = read_file_lines(self.cl_config_path)
except FilesError as error:
raise TemplateExecutorError(
"cannot read calculate config file in: {0}. Reason: {1}".
format(self.cl_config_path, str(error)))
# TODO Продумать проверку корректности найденного файла.
for file_line in config_file_lines:
filename, md5_sum = file_line.split(' ')
config_dictionary.update({filename: md5_sum})
self._unsaved_changes = False
return config_dictionary
def set_files_md5(self, file_path: str, file_md5: str) -> None:
'''Метод для установки в config соответствия файла некоторой
контрольной сумме.'''
file_path = self._remove_chroot(file_path)
self._config_dictionary[file_path] = file_md5
self._unsaved_changes = True
def remove_file(self, file_path: str) -> None:
'''Метод для удаления файла из config.'''
file_path = self._remove_chroot(file_path)
if file_path in self._config_dictionary:
self._config_dictionary.pop(file_path)
self._unsaved_changes = True
def compare_md5(self, file_path: str, file_md5: str) -> None:
'''Метод для сравнения хэш-суммы из config и некоторой заданной.'''
file_path = self._remove_chroot(file_path)
if file_path in self._config_dictionary:
return self._config_dictionary[file_path] == file_md5
else:
return False
def save_changes(self) -> None:
'''Метод для записи изменений, внессенных в файл config.'''
if not self._unsaved_changes:
return
config_file = write_file(self.cl_config_path)
for file_name, file_md5 in self._config_dictionary.items():
config_file.write('{} {}\n'.format(file_name, file_md5))
config_file.close()
self._unsaved_changes = False
def _remove_chroot(self, file_path: str) -> str:
'''Метод для удаления корневого пути из указанного пути.'''
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return file_path
class TemplateWrapper:
'''Класс связывающий шаблон с целевым файлом и определяющий параметры
наложения шаблона, обусловленные состоянием целевого файла.'''
type_checks = {DIR: os.path.isdir,
FILE: os.path.isfile}
_protected_is_set = False
_protected_set = set()
_unprotected_set = set()
def __new__(cls, *args, **kwargs):
if not cls._protected_is_set:
# Устанавливаем значения PROTECTED, если не заданы.
if 'chroot_path' in kwargs:
chroot_path = kwargs['chroot_path']
else:
chroot_path = '/'
cls._set_protected(chroot_path)
return super().__new__(cls)
def __init__(self, target_file_path,
parameters,
template_type,
template_path,
template_text='',
target_package=None,
chroot_path='/',
config_archive_path='/var/lib/calculate/config-archive',
dbpkg=True):
self.target_path = target_file_path
self.template_path = template_path
self.chroot_path = chroot_path
self.config_archive_path = config_archive_path
self.target_package_name = None
self.package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
# Вспомогательный флаг, включается, если по целевому пути лежит файл,
# для которого не определился никакой пакет.
self.target_without_package = False
self.parameters = parameters
self.output_path = self.target_path
self.input_path = None
self.template_type = template_type
self.template_text = template_text
# Флаг, указывающий, что нужно удалить файл из target_path перед
# применением шаблона.
self.remove_original = False
# Флаг, указывающий, что целевой путь был изменен.
self.target_path_is_changed = False
# Флаг, указывающий, что файл по целевому пути является ссылкой.
self.target_is_link = False
# Пакет, к которому относится файл.
self.target_package = target_package
# Флаг, разрешающий работу с CONTENTS. Если False, то выключает
# protected для всех файлов блокирует все операции с CONTENTS и ._cfg.
self.dbpkg = dbpkg
# Флаг, указывающий, что файл является PROTECTED.
self.protected = False
# Временный флаг для определения того, является ли шаблон userspace.
self.is_userspace = False
self.format_class = None
if self.parameters.run or self.parameters.exec:
# Если есть параметр run или exec, то кроме текста шаблона ничего
# не нужно.
return
if self.parameters.append in {'join', 'before', 'after', 'replace'}:
# Получаем класс соответствующего формата файла.
if self.parameters.format:
self.format_class = ParametersProcessor.\
available_formats[self.parameters.format]
else:
# TODO Здесь будет детектор форматов. Когда-нибудь.
pass
# Если по этому пути что-то есть -- проверяем конфликты.
if os.path.exists(target_file_path):
for file_type, checker in self.type_checks.items():
if checker(target_file_path):
self.target_type = file_type
break
self.target_is_link = os.path.islink(target_file_path)
# Если установлен параметр mirror и есть параметр source,
# содержащий несуществующий путь -- удаляем целевой файл.
if self.parameters.source is True and self.parameters.mirror:
self.remove_original = True
else:
if self.parameters.mirror:
raise TemplateExecutorError("target file does not exist, while"
" 'mirror' parameter is set")
self.target_type = None
if self.format_class is not None and self.format_class.EXECUTABLE:
# Если формат исполняемый -- проверяем, существует ли директория,
# из которой будет выполняться шаблон.
if not os.path.exists(self.target_path):
# Если не существует -- создаем ее.
os.makedirs(self.target_path)
elif os.path.isfile(self.target_path):
# Если вместо директории файл -- определяем по файлу
# директорию.
self.target_path = os.path.dirname(self.target_path)
# Если есть параметр package, определяем по нему пакет.
if self.parameters.package:
self.target_package_name = self.parameters.package
if (self.target_package is None or
self.target_package.package_name !=
self.target_package_name):
self.target_package = Package(self.parameters.package,
chroot_path=self.chroot_path)
return
self._check_type_conflicts()
self._check_package_collision()
self._check_user_changes()
def _check_type_conflicts(self) -> None:
'''Метод для проверки конфликтов типов.'''
if self.parameters.append == 'link':
if self.parameters.force:
self.remove_original = True
elif self.target_is_link:
if self.template_type != self.target_type:
raise TemplateTypeConflict(
"the target is a link to {} while the template"
"is {} and has append = 'link'".
format('directory' if self.template_type ==
DIR else 'file',
'file' if self.template_type ==
DIR else 'directory'))
else:
self.remove_original = True
elif self.target_type == DIR:
raise TemplateTypeConflict("the target is a directory while "
"the template has append = 'link'")
elif self.target_type == FILE:
raise TemplateTypeConflict("the target is a file while the"
" template has append = 'link'")
elif self.template_type == DIR:
if self.target_type == FILE:
if self.parameters.force:
self.remove_original = True
else:
raise TemplateTypeConflict("the target is a file while the"
" template is a directory")
elif self.target_is_link:
if self.parameters.force:
self.remove_original = True
else:
try:
link_source = check_directory_link(
self.target_path,
chroot_path=self.chroot_path)
self.target_path = link_source
self.target_path_is_changed = True
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
elif self.template_type == FILE:
if self.parameters.force:
if self.target_type == DIR:
self.remove_original = True
elif self.target_is_link and self.target_type == FILE:
try:
link_source = read_link(self.target_path)
self.target_path = get_target_from_link(
self.target_path,
link_source,
chroot_path=self.chroot_path)
self.target_path_is_changed = True
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
elif self.target_is_link:
if self.target_type == DIR:
raise TemplateTypeConflict("the target file is a link to a"
" directory while the template"
" is a file")
else:
raise TemplateTypeConflict("the target file is a link to"
" a file while the template"
" is a file")
elif self.target_type == DIR:
raise TemplateTypeConflict("the target file is a directory"
" while the template is a file")
def _check_package_collision(self) -> None:
'''Метод для проверки на предмет коллизии, то есть конфликта пакета
шаблона и целевого файла.'''
if self.parameters.package:
parameter_package = self.parameters.package
else:
parameter_package = None
if self.target_type is not None:
try:
file_package = self.package_atom_parser.get_file_package(
self.target_path)
except PackageNotFound:
file_package = None
self.target_without_package = True
else:
file_package = None
# Если для шаблона и целевого файла никаким образом не удается
# определить пакет и есть параметр append -- шаблон пропускаем.
if parameter_package is None and file_package is None:
if self.parameters.append and self.parameters.append != 'skip':
raise TemplateCollisionError(
"'package' parameter is not defined for"
" template with 'append' parameter.")
else:
return
elif parameter_package is None:
self.target_package_name = file_package
elif file_package is None:
self.target_package_name = parameter_package
elif file_package != parameter_package and self.template_type != DIR:
raise TemplateCollisionError((
"The template package is {0} while target"
" file package is {1}").format(
parameter_package.atom,
file_package.atom
))
else:
self.target_package_name = parameter_package
if (self.target_package is None or
self.target_package_name != self.target_package.package_name):
self.target_package = Package(self.target_package_name,
chroot_path=self.chroot_path)
def _check_user_changes(self) -> None:
'''Метод для проверки наличия пользовательских изменений в
конфигурационных файлах.'''
# Эта проверка только для файлов.
if self.template_type != FILE:
return
# Проверим, является ли файл защищенным.
# Сначала проверяем по переменной CONFIG_PROTECT.
if self.dbpkg:
for protected_path in self._protected_set:
if self.target_path.startswith(protected_path):
self.protected = True
break
# Затем по переменной CONFIG_PROTECT_MASK.
for unprotected_path in self._unprotected_set:
if self.target_path.startswith(unprotected_path):
self.protected = False
break
else:
self.protected = False
# Собираем список имеющихся ._cfg файлов.
cfg_pattern = os.path.join(os.path.dirname(self.target_path),
"._cfg????_{}".format(
os.path.basename(self.target_path)))
self.cfg_list = glob.glob(cfg_pattern)
self.cfg_list.sort()
# Путь к архивной версии файла.
self.archive_path = self._get_archive_path(self.target_path)
self.md5_matching = (self.parameters.autoupdate
or self.parameters.force)
if not self.protected:
self.md5_matching = True
elif self.parameters.unbound:
# Если присутствует unbound, то просто модифицируем файл и
# удаляем его из CONTENTS.
self.md5_matching = True
elif self.target_type is None:
# Если целевой файл отсутствует.
if self.target_path in self.target_package:
# Проверка -- был ли файл удален.
# self.md5_matching = self.md5_matching
pass
else:
self.md5_matching = True
elif self.target_without_package:
# Если файл по целевому пути не относится к какому-либо пакету.
# self.md5_matching = False
pass
elif not self.md5_matching:
# Если файл есть и он относится к текущему пакету.
# Если по каким-то причинам уже нужно считать, что хэш-суммы
# совпадают -- в дальнейшей проверке нет необходимости.
target_md5 = self.target_package.get_md5(self.target_path)
self.md5_matching = self.target_package.is_md5_equal(
self.target_path,
file_md5=target_md5)
# Если по целевому пути файл не относящийся к какому-либо пакету и
# присутствует параметр autoupdate -- удаляем этот файл.
if (self.target_without_package and
(self.parameters.autoupdate or self.parameters.force)):
self.remove_original = True
# Определяем пути входных и выходных файлов.
if self.md5_matching:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
elif self.cfg_list and not self.parameters.unbound:
self.input_path = self.archive_path
else:
self.input_path = self.target_path
self.output_path = self.target_path
else:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self.archive_path
self.output_path = self._get_cfg_path(self.target_path)
def _get_archive_path(self, file_path: str) -> str:
'''Метод для получения пути к архивной версии указанного файла.'''
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return join_paths(self.config_archive_path, file_path)
def _get_cfg_path(self, file_path: str) -> str:
'''Метод для получения пути для создания нового ._cfg????_ файла.'''
if self.cfg_list:
last_cfg_name = os.path.basename(self.cfg_list[-1])
slice_value = len('._cfg')
cfg_number = int(last_cfg_name[slice_value: slice_value + 4])
else:
cfg_number = 0
cfg_number = str(cfg_number + 1)
if len(cfg_number) < 4:
cfg_number = '0' * (4 - len(cfg_number)) + cfg_number
new_cfg_name = "._cfg{}_{}".format(cfg_number,
os.path.basename(file_path))
new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name)
return new_cfg_path
def remove_from_contents(self) -> None:
'''Метод для удаления целевого файла из CONTENTS.'''
if self.target_package is None:
return
if self.template_type == DIR:
self.target_package.remove_dir(self.target_path)
elif self.template_type == FILE:
self.target_package.remove_obj(self.target_path)
def clear_dir_contents(self) -> None:
'''Метод для удаления из CONTENTS всего содержимого директории после
применения append = "clear".'''
if self.template_type == DIR and self.target_package is not None:
self.target_package.clear_dir(self.target_path)
def add_to_contents(self, file_md5=None) -> None:
'''Метод для добавления целевого файла в CONTENTS.'''
if self.target_package is None:
return
# В подавляющем большинстве случаев берем хэш-сумму из выходного файла,
# но если по какой-то причине выходного файла нет -- пытаемся
# по целевому. Такое поведение маловероятно, но, наверное, стоит
# учесть возможность такой ситуации.
if os.path.exists(self.output_path):
source_path = self.output_path
else:
source_path = self.target_path
if self.parameters.append == 'link':
self.target_package.add_sym(source_path,
self.parameters.source)
elif self.template_type == DIR:
self.target_package.add_dir(source_path)
elif self.template_type == FILE:
self.target_package.add_obj(source_path, file_md5=file_md5)
def update_contents_from_list(self, changed_list: dict) -> None:
'''Метод для изменения CONTENTS по списку измененных файлов.'''
if self.target_package is None:
return
for file_path, mode in changed_list.items():
if mode == "modify":
if os.path.islink(file_path):
self.target_package.add_sym(file_path)
elif os.path.isdir(file_path):
self.target_package.add_dir(file_path)
elif os.path.isfile(file_path):
self.target_package.add_obj(file_path)
elif mode == "remove":
if os.path.islink(file_path) or os.path.isfile(file_path):
self.target_package.remove_obj(file_path)
elif os.path.isdir(file_path):
self.target_package.add_dir(file_path)
@classmethod
def _set_protected(cls, chroot_path: str) -> None:
'''Метод для получения множества защищенных директорий.'''
cls._protected_set = set()
cls._unprotected_set = set()
cls._protected_set.add(join_paths(chroot_path, '/etc'))
config_protect_env = os.environ.get('CONFIG_PROTECT', False)
if config_protect_env:
for protected_path in config_protect_env.split():
protected_path = join_paths(chroot_path,
protected_path.strip())
cls._protected_set.add(protected_path)
config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False)
if config_protect_mask_env:
for unprotected_path in config_protect_mask_env.split():
unprotected_path = join_paths(chroot_path,
unprotected_path.strip())
cls._unprotected_set.add(unprotected_path)
cls._protected_is_set = True
def save_changes(self) -> None:
'''Метод для сохранения изменений внесенных в CONTENTS.'''
if self.target_package:
self.target_package.remove_empty_directories()
self.target_package.write_contents_file()
class TemplateExecutor:
'''Класс исполнительного модуля.'''
def __init__(self, datavars_module=Variables(), chroot_path='/',
cl_config_archive='/var/lib/calculate/config-archive',
cl_config_path='/var/lib/calculate/config',
execute_archive_path='/var/lib/calculate/.execute/',
dbpkg=True):
# TODO добавить список измененных файлов.
self.datavars_module = datavars_module
self.chroot_path = chroot_path
# Объект для проверки файловых систем. Пока не инициализируем.
self.mounts = None
# Директория для хранения полученных при обработке exec скриптов.
self.execute_archive_path = execute_archive_path
self.execute_files = OrderedDict()
self.dbpkg = dbpkg
# Список целевых путей измененных файлов. Нужен для корректиного
# формирования calculate-заголовка.
self.processed_targets = []
self.directory_default_parameters =\
ParametersProcessor.directory_default_parameters
self.file_default_parameters =\
ParametersProcessor.file_default_parameters
# Отображение имен действий для директорий на методы их реализующие.
self.directory_appends = {'join': self._append_join_directory,
'remove': self._append_remove_directory,
'skip': self._append_skip_directory,
'clear': self._append_clear_directory,
'link': self._append_link_directory,
'replace': self._append_replace_directory}
# Отображение имен действий для файлов на методы их реализующие.
self.file_appends = {'join': self._append_join_file,
'after': self._append_after_file,
'before': self._append_before_file,
'replace': self._append_replace_file,
'remove': self._append_remove_file,
'skip': self._append_skip_file,
'clear': self._append_clear_file,
'link': self._append_link_file}
self.formats_classes = ParametersProcessor.available_formats
self.calculate_config_file = CalculateConfigFile(
cl_config_path=cl_config_path,
cl_chroot_path=chroot_path)
self.cl_config_archive_path = cl_config_archive
Format.CALCULATE_VERSION = CALCULATE_VERSION
@property
def available_appends(self) -> set:
'''Метод для получения множества возможных значений append.'''
appends_set = set(self.directory_appends.keys()).union(
set(self.file_appends.keys()))
return appends_set
def execute_template(self, target_path: str,
parameters: ParametersContainer, template_type: int,
template_path: str, template_text='',
save_changes=True, target_package=None) -> dict:
'''Метод для запуска выполнения шаблонов.'''
# Словарь с данными о результате работы исполнительного метода.
self.executor_output = {'target_path': None,
'stdout': None,
'stderr': None}
if parameters.append == 'skip':
return self.executor_output
try:
template_object = TemplateWrapper(
target_path, parameters,
template_type,
template_path,
template_text=template_text,
target_package=target_package,
chroot_path=self.chroot_path,
config_archive_path=self.cl_config_archive_path,
dbpkg=self.dbpkg)
except TemplateTypeConflict as error:
raise TemplateExecutorError("type conflict: {}".format(str(error)))
except TemplateCollisionError as error:
raise TemplateExecutorError("collision: {}".format(str(error)))
# Удаляем оригинал, если это необходимо из-за наличия force или по
# другим причинам.
if template_object.remove_original:
if template_object.target_type == DIR:
self._remove_directory(template_object.target_path)
else:
self._remove_file(template_object.target_path)
if self.dbpkg:
template_object.remove_from_contents()
template_object.target_type = None
# Если был включен mirror, то после удаления файла завершаем
# выполнение шаблона.
if template_object.parameters.mirror:
if save_changes:
template_object.save_changes()
return
template_object.target_type = None
if template_object.parameters.run:
# Если есть параметр run -- запускаем текст шаблона.
self._run_template(template_object)
elif template_object.parameters.exec:
# Если есть параметр exec -- запускаем текст шаблона после
# обработки всех шаблонов.
self._exec_template(template_object)
elif template_object.parameters.append:
if template_object.template_type == DIR:
self.directory_appends[template_object.parameters.append](
template_object)
elif template_object.template_type == FILE:
self.file_appends[template_object.parameters.append](
template_object)
# Сохраняем изменения в CONTENTS внесенные согласно шаблону.
if save_changes:
template_object.save_changes()
# Возвращаем целевой путь, если он был изменен, или
# None если не был изменен.
if template_object.target_path_is_changed:
self.executor_output['target_path'] =\
template_object.target_path
return self.executor_output
def save_changes(self):
'''Метод для сохранения чего-нибудь после выполнения всех шаблонов.'''
# Пока сохраняем только получившееся содержимое config-файла.
self.calculate_config_file.save_changes()
def _append_join_directory(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия для append = "join", если шаблон --
директория. Создает директорию, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
if self.dbpkg:
template_object.add_to_contents()
def _append_remove_directory(self,
template_object: TemplateWrapper) -> None:
'''Метод описывающий действия для append = "remove", если шаблон --
директория. Удаляет директорию со всем содержимым, если она есть.'''
if template_object.target_type is not None:
self._remove_directory(template_object.target_path)
if self.dbpkg:
template_object.remove_from_contents()
def _append_skip_directory(self,
template_object: TemplateWrapper) -> None:
pass
def _append_clear_directory(self,
template_object: TemplateWrapper) -> None:
'''Метод описывающий действия для append = "clear", если шаблон --
директория. Удаляет все содержимое директории, если она есть.'''
if template_object.target_type is not None:
self._clear_directory(template_object.target_path)
# Меняем права и владельца очищенной директории, если это
# необходимо.
if template_object.parameters.chmod:
self._chmod_directory(template_object.target_path,
template_object.parameters.chmod)
if template_object.parameters.chown:
self._chown_directory(template_object.target_path,
template_object.parameters.chown)
if self.dbpkg:
template_object.clear_dir_contents()
def _append_link_directory(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия для append = "link", если шаблон --
директория. Создает ссылку на директорию, если она есть.'''
self._link_directory(template_object.parameters.source,
template_object.target_path)
# Меняем права и владельца файла, на который указывает ссылка.
if template_object.parameters.chmod:
self._chmod_directory(template_object.parameters.source,
template_object.parameters.chmod)
if template_object.parameters.chown:
self._chown_directory(template_object.parameters.source,
template_object.parameters.chown)
if self.dbpkg:
template_object.add_to_contents()
def _append_replace_directory(self,
template_object: TemplateWrapper) -> None:
'''Метод описывающий действия для append = "replace", если шаблон --
директория. Очищает директорию или создает, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
if self.dbpkg:
template_object.add_to_contents()
else:
self._clear_directory(template_object.target_path)
if self.dbpkg:
template_object.clear_dir_contents()
def _append_join_file(self, template_object: TemplateWrapper,
join_before=False, replace=False) -> None:
'''Метод описывающий действия при append = "join", если шаблон -- файл.
Объединяет шаблон с целевым файлом.'''
input_path = template_object.input_path
output_path = template_object.output_path
template_format = template_object.format_class
# Задаемся значениями chmod и chown в зависимости от наличия или
# отсутствия файла, принадлежности его пакету и наличия значений
# параметров по умолчанию.
chmod = template_object.parameters.chmod
if not chmod:
if (template_object.target_type is not None and
not template_object.target_without_package):
chmod = self._get_file_mode(template_object.target_path)
else:
chmod = self.file_default_parameters.get('chmod', False)
chown = template_object.parameters.chown
if not chown:
if (template_object.target_type is not None and
not template_object.target_without_package):
chown = self._get_file_owner(template_object.target_path)
else:
chown = self.file_default_parameters.get('chown', False)
if template_format.EXECUTABLE or template_object.md5_matching:
# Действия при совпадении md5 из CONTENTS и md5 целевого файла.
# А также если шаблон просто исполнительный.
output_paths = [output_path]
# Если целевой файл защищен, а шаблон не userspace.
if template_object.protected and not template_object.is_userspace:
# Тогда также обновляем архив.
output_paths.append(template_object.archive_path)
if template_object.target_type is not None and not replace:
# Если целевой файл есть и нет параметра replace -- используем
# текст целевого файла.
if (not input_path.startswith(self.cl_config_archive_path) or
os.path.exists(input_path)):
# Если входной файл просто не из архива, или из архива и
# при этом существует -- используем его
with open(input_path, 'r') as input_file:
input_text = input_file.read()
else:
# В противном случае используем пустой файл. (!)
input_text = ''
else:
input_text = ''
parsed_template = template_format(template_object.template_text,
template_object.template_path,
ignore_comments=True)
if not template_object.format_class.EXECUTABLE:
# Если шаблон не исполнительный разбираем входной текст.
parsed_input = template_format(
input_text,
template_object.template_path,
add_header=True,
join_before=join_before,
already_changed=(template_object.target_path
in self.processed_targets),
parameters=template_object.parameters)
parsed_input.join_template(parsed_template)
# Результат наложения шаблона.
output_text = parsed_input.document_text
# Удаляем форматный объект входного файла.
del(parsed_input)
output_text_md5 = hashlib.md5(output_text.encode()).hexdigest()
for save_path in output_paths:
if not os.path.exists(os.path.dirname(save_path)):
self._create_directory(
template_object,
path_to_create=os.path.dirname(save_path))
with open(save_path, 'w') as output_file:
output_file.write(output_text)
# Меняем права доступа и владельца всех сохраняемых файлов,
# если это необходимо.
if chown:
self._chown_file(save_path, chown)
if chmod:
self._chmod_file(save_path, chmod)
if self.dbpkg:
# Убираем все ._cfg файлы.
if template_object.cfg_list:
for cfg_file_path in template_object.cfg_list:
self._remove_file(cfg_file_path)
# Убираем целевой файл из CL.
self.calculate_config_file.remove_file(
template_object.target_path)
# Обновляем CONTENTS.
if template_object.protected:
if template_object.parameters.unbound:
template_object.remove_from_contents()
else:
template_object.add_to_contents(
file_md5=output_text_md5)
else:
changed_files = parsed_template.execute_format(
input_text=input_text,
target_path=template_object.target_path)
# Удаляем форматный объект входного файла.
del(parsed_template)
# Если исполняемый формат выдал список измененных файлов для
# изменения CONTENTS и при этом задан пакет -- обновляем
# CONTENTS.
if (self.dbpkg and changed_files and
template_object.target_package):
template_object.update_contents_from_list(changed_files)
else:
if template_object.target_type is not None and not replace:
if (not input_path.startswith(self.cl_config_archive_path) or
os.path.exists(input_path)):
with open(input_path, 'r') as input_file:
input_text = input_file.read()
else:
input_text = ''
else:
input_text = ''
parsed_input = template_format(
input_text,
template_object.template_path,
add_header=True,
join_before=join_before,
already_changed=False,
parameters=template_object.parameters)
parsed_template = template_format(template_object.template_text,
template_object.template_path,
ignore_comments=True)
parsed_input.join_template(parsed_template)
# Результат наложения шаблона.
output_text = parsed_input.document_text
# Удаляем форматный объект входного файла.
del(parsed_input)
output_text_md5 = hashlib.md5(output_text.encode()).hexdigest()
if not self.calculate_config_file.compare_md5(
template_object.target_path,
output_text_md5):
if not os.path.exists(os.path.dirname(output_path)):
self._create_directory(
template_object,
path_to_create=os.path.dirname(output_path))
with open(output_path, 'w') as output_file:
output_file.write(output_text)
# Меняем права доступа и владельца ._cfg????_ файлов, если
# это необходимо.
if chown:
self._chown_file(output_path, chown)
if chmod:
self._chmod_file(output_path, chmod)
# Обновляем CL.
self.calculate_config_file.set_files_md5(
template_object.target_path,
output_text_md5)
# Обновляем CONTENTS.
template_object.add_to_contents(file_md5=output_text_md5)
else:
# Действия если CL совпало. Пока ничего не делаем.
pass
def _append_after_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "after", если шаблон --
файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся
в конец файла и в конец каждой секции файла.'''
self._append_join_file(template_object, join_before=False)
def _append_before_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "after", если шаблон --
файл. Объединяет шаблон с целевым файлом так, чтобы текст добавлялся
в начало файла и в начало каждой секции файла.'''
self._append_join_file(template_object, join_before=True)
def _append_skip_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "skip". Пока никаких
действий.'''
pass
def _append_replace_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "replace", если шаблон --
файл. Очищает файл и затем накладывает на него шаблон.'''
self._append_join_file(template_object, replace=True)
def _append_remove_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "remove", если шаблон --
файл. Удаляет файл.'''
if template_object.target_type is not None:
self._remove_file(template_object.target_path)
if self.dbpkg:
template_object.remove_from_contents()
def _append_clear_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "clear", если шаблон --
файл. Очищает файл.'''
if template_object.target_type is not None:
self._clear_file(template_object.target_path)
# Меняем владельца и права доступа к очищенному файлу, если нужно.
if template_object.parameters.chown:
self._chown_file(template_object.target_path,
template_object.parameters.chown)
if template_object.parameters.chmod:
self._chmod_file(template_object.target_path,
template_object.parameters.chmod)
if self.dbpkg:
template_object.add_to_contents()
def _append_link_file(self, template_object: TemplateWrapper) -> None:
'''Метод описывающий действия при append = "link", если шаблон --
файл. Создает ссылку на файл, указанный в параметре source.'''
self._link_file(template_object.parameters.source,
template_object.target_path)
# Меняем права и владельца файла, на который указывает ссылка.
if template_object.parameters.chmod:
self._chmod_file(template_object.parameters.source,
template_object.parameters.chmod)
if template_object.parameters.chown:
self._chown_file(template_object.parameters.source,
template_object.parameters.chown)
if self.dbpkg:
template_object.add_to_contents()
def _create_directory(self, template_object: TemplateWrapper,
path_to_create=None) -> None:
'''Метод для создания директории и, при необходимости, изменения
владельца и доступа все директорий на пути к целевой.'''
if path_to_create is None:
target_path = template_object.target_path
else:
target_path = path_to_create
template_parameters = template_object.parameters
# Если файл есть, но указан chmod или chown -- используем их.
if os.access(target_path, os.F_OK):
if template_parameters.chmod:
self._chmod_directory(target_path, template_parameters.chmod)
if template_parameters.chown:
self._chown_directory(target_path, template_parameters.chown)
return
directories_to_create = [target_path]
directory_path = os.path.dirname(target_path)
# Составляем список путей к директориям, которые нужно создать.
while not os.access(directory_path, os.F_OK) and directory_path:
directories_to_create.append(directory_path)
directory_path = os.path.dirname(directory_path)
# получаем информацию о владельце и правах доступа ближайшей
# существующей директории.
chmod = template_parameters.chmod
if not chmod:
chmod = self._get_file_mode(directory_path)
chown = template_parameters.chown
if not chown:
chown = self._get_file_owner(directory_path)
directories_to_create.reverse()
# создаем директории.
for create_path in directories_to_create:
try:
os.mkdir(create_path)
# Для каждой созданной директории меняем права и владельца
# если это необходимо.
if chmod:
self._chmod_directory(create_path, chmod)
if chown:
self._chown_directory(create_path, chown)
except OSError as error:
raise TemplateExecutorError(
'Failed to create directory: {}, reason: {}'.
format(create_path, str(error)))
def _remove_directory(self, target_path: str) -> None:
'''Метод для удаления директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
try:
if os.path.islink(target_path):
os.unlink(target_path)
else:
shutil.rmtree(target_path)
return
except Exception as error:
raise TemplateExecutorError(
("Failed to delete the directory: {0},"
" reason: {1}").format(target_path,
str(error)))
else:
error_message = "target file is not directory"
else:
error_message = "target file does not exist"
raise TemplateExecutorError(("Failed to delete the directory: {0},"
"reason: {1}").format(target_path,
error_message))
def _clear_directory(self, target_path: str) -> None:
'''Метод для очистки содержимого целевой директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
# Удаляем все содержимое директории.
for node in os.scandir(target_path):
if node.is_dir():
self._remove_directory(node.path)
else:
self._remove_file(node.path)
return
else:
error_message = "target file is not directory"
else:
error_message = "target directory does not exist"
raise TemplateExecutorError(("failed to clear directory: {},"
" reason: {}").format(target_path,
error_message))
def _link_directory(self, source: str, target_path: str) -> None:
'''Метод для создания по целевому пути ссылки на директорию
расположенную на пути, указанному в source.'''
try:
os.symlink(source, target_path, target_is_directory=True)
except OSError as error:
raise TemplateExecutorError(
"failed to create symlink: {0} -> {1}, reason: {2}".
format(target_path, source, str(error)))
def _remove_file(self, target_path: str) -> None:
'''Метод для удаления файлов.'''
if os.path.exists(target_path):
if os.path.isfile(target_path):
if os.path.islink(target_path):
try:
os.unlink(target_path)
return
except OSError as error:
error_message = str(error)
try:
os.remove(target_path)
return
except OSError as error:
error_message = str(error)
else:
error_message = 'target is not a file'
elif os.path.islink(target_path):
try:
os.unlink(target_path)
return
except OSError as error:
error_message = str(error)
else:
error_message = 'target file does not exist'
raise TemplateExecutorError(("failed to remove the file: {0},"
"reason: {1}").format(target_path,
error_message))
def _clear_file(self, target_path: str) -> None:
'''Метод для очистки файлов.'''
if os.path.exists(target_path):
if os.path.isfile(target_path):
try:
with open(target_path, 'w') as f:
f.truncate(0)
return
except IOError as error:
error_message = str(error)
else:
error_message = 'target is not a file'
else:
error_message = 'target file does not exist'
raise TemplateExecutorError(("failed to clear the file: {0},"
"reason: {1}").format(target_path,
error_message))
def _link_file(self, source: str, target_path: str) -> None:
'''Метод для создания по целевому пути ссылки на файл расположенный на
пути, указанному в source.'''
try:
os.symlink(source, target_path)
except OSError as error:
raise TemplateExecutorError(
"failed to create symlink to the file: {0} -> {1}, reason: {2}".
format(target_path, source, str(error)))
def _run_template(self, template_object: TemplateWrapper) -> None:
'''Метод для сохранения текста шаблонов, который должен быть исполнен
интерпретатором указанным в run прямо во время обработки шаблонов.'''
text_to_run = template_object.template_text
interpreter = template_object.parameters.run
if template_object.template_type == FILE:
cwd_path = os.path.dirname(template_object.target_path)
else:
cwd_path = template_object.target_path
if not os.path.exists(cwd_path):
raise TemplateExecutorError(("can not run template, directory from"
" target path does not exist: {}").
format(template_object.target_path))
elif not os.path.isdir(cwd_path):
raise TemplateExecutorError(("can not exec template, {} is not a"
" directory.").format(cwd_path))
try:
run_process = Process(interpreter, cwd=cwd_path)
run_process.write(text_to_run)
if run_process.readable:
stdout = run_process.read()
if stdout:
self.executor_output['stdout'] = stdout
if run_process.readable_errors:
stderr = run_process.read_error()
if stderr:
self.executor_output['stderr'] = stderr
except FilesError as error:
raise TemplateExecutorError(("can not run template using the"
" interpreter '{}', reason: {}").
format(interpreter, str(error)))
def _exec_template(self, template_object: TemplateWrapper) -> None:
'''Метод для сохранения текста шаблонов, который должен быть исполнен
интерпретатором указанным в exec после выполнения всех прочих шаблонов.
'''
text_to_run = template_object.template_text
interpreter = template_object.parameters.exec
if template_object.template_type == FILE:
cwd_path = os.path.dirname(template_object.target_path)
else:
cwd_path = template_object.target_path
if not os.path.exists(cwd_path):
raise TemplateExecutorError(
("can not exec template, directory from"
" target path does not exist: {}").
format(cwd_path))
elif not os.path.isdir(cwd_path):
raise TemplateExecutorError(("can not exec template, {} is not a"
" directory.").format(cwd_path))
# Получаем путь к директории для хранения файлов .execute.
if (self.chroot_path != '/' and not
self.execute_archive_path.startswith(self.chroot_path)):
self.execute_archive_path = join_paths(
self.chroot_path,
self.execute_archive_path)
# Если директория уже существует получаем номер очередного файла для
# exec по номеру последнего.
exec_number = 0
if not self.execute_files:
if os.path.exists(self.execute_archive_path):
exec_files_list = os.listdir(self.execute_archive_path)
if exec_files_list:
exec_number = int(exec_files_list[-1][-4:])
exec_number = str(exec_number + 1)
else:
exec_number = str(len(self.execute_files) + 1)
# Получаем название нового exec_???? файла.
if len(exec_number) < 4:
exec_number = '0' * (4 - len(exec_number)) + exec_number
exec_file_name = 'exec_{}'.format(exec_number)
exec_file_path = join_paths(self.execute_archive_path,
exec_file_name)
exec_file = write_file(exec_file_path)
exec_file.write(text_to_run)
exec_file.close()
self.execute_files[exec_file_path] = {'interpreter': interpreter,
'cwd_path': cwd_path,
'template_path':
template_object.template_path}
def execute_file(self, interpreter: str, exec_path: str,
cwd_path: str) -> dict:
"""Метод для выполнения скриптов сохраненных в результате обработки
шаблонов с параметром exec. Скрипт всегда удаляется вне зависимости
от успешности его выполнения."""
exec_output = {'stdout': None, 'stderr': None}
with open(exec_path, 'r') as exec_file:
script_text = exec_file.read()
os.remove(exec_path)
try:
run_process = Process(interpreter, cwd=cwd_path)
run_process.write(script_text)
if run_process.readable:
stdout = run_process.read()
if stdout:
exec_output['stdout'] = stdout
if run_process.readable_errors:
stderr = run_process.read_error()
if stderr:
exec_output['stderr'] = stderr
return exec_output
except FilesError as error:
raise TemplateExecutorError(("can not run template using the"
" interpreter '{}', reason: {}").
format(interpreter, str(error)))
def _chown_directory(self, target_path: str, chown_value: dict) -> None:
"""Метод для смены владельца директории."""
try:
if os.path.exists(target_path):
os.chown(target_path, chown_value['uid'], chown_value['gid'])
else:
raise TemplateExecutorError(
'The target directory does not exist: {0}'.
format(target_path))
except (OSError, Exception) as error:
if not self._check_os_error(error, target_path):
raise TemplateExecutorError(
'Can not chown file: {0} to {1}, reason: {2}'.
format(target_path, self._translate_uid_gid(
chown_value['uid'],
chown_value['gid']),
str(error)))
def _chmod_directory(self, target_path: str, chmod_value: int) -> None:
'''Метод для смены прав доступа к директории.'''
try:
if os.path.exists(target_path):
os.chmod(target_path, chmod_value)
else:
raise TemplateExecutorError(
'The target directory does not exist: {0}'.
format(target_path))
except (OSError, Exception) as error:
if not self._check_os_error(error, target_path):
raise TemplateExecutorError(
'Can not chmod directory: {0}, reason: {1}'.
format(target_path, str(error)))
def _chown_file(self, target_path: str, chown_value: dict) -> None:
'''Метод для смены владельца файла.'''
try:
if os.path.exists(target_path):
os.lchown(target_path, chown_value['uid'], chown_value['gid'])
else:
raise TemplateExecutorError(
'The target file does not exist: {0}'.
format(target_path))
except (OSError, Exception) as error:
if not self._check_os_error(error, target_path):
raise TemplateExecutorError(
'Can not chown file: {0} to {1}, reason: {2}'.
format(target_path, self._translate_uid_gid(
chown_value['uid'],
chown_value['gid']),
str(error)))
def _chmod_file(self, target_path: str, chmod_value: int) -> None:
'''Метод для смены прав доступа к директории.'''
try:
if not os.path.exists(target_path):
raise TemplateExecutorError(
'The target file does not exist: {0}'.
format(target_path))
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
if not self._check_os_error(error, target_path):
raise TemplateExecutorError(
'Can not chmod file: {0}, reason: {1}'.
format(target_path, str(error)))
def _get_file_mode(self, file_path: str) -> int:
'''Метод для получения прав доступа для указанного файла.'''
if not os.path.exists(file_path):
raise TemplateExecutorError(
'The file to get mode does not exist: {0}'.
format(file_path))
file_stat = os.stat(file_path)
return stat.S_IMODE(file_stat.st_mode)
def _get_file_owner(self, file_path: str) -> dict:
'''Метод для получения uid и gid значений для владельца указанного
файла.'''
if not os.path.exists(file_path):
raise TemplateExecutorError(
'The file to get owner does not exist: {0}'.
format(file_path))
file_stat = os.stat(file_path)
return {'uid': file_stat.st_uid, 'gid': file_stat.st_gid}
def _translate_uid_gid(self, uid: int, gid: int) -> str:
'''Метод для получения из uid и gid имен пользователя и группы при,
необходимых для выдачи сообщения об ошибке при попытке chown.'''
import pwd
import grp
try:
if self.chroot_path == '/':
user_name = pwd.getpwuid(uid).pw_name
else:
user_name = self._get_user_name_from_uid(uid)
except (TypeError, KeyError):
user_name = str(uid)
try:
if self.chroot_path == '/':
group_name = grp.getgrgid(gid).gr_name
else:
group_name = self._get_group_name_form_gid(gid)
except (TypeError, KeyError):
group_name = str(gid)
return '{0}:{1}'.format(user_name, group_name)
def _get_user_name_from_uid(self, uid: int) -> str:
'''Метод для получения имени пользователя по его uid.'''
passwd_file_path = os.path.join(self.chroot_path, 'etc/passwd')
passwd_dictionary = dict()
if os.path.exists(passwd_file_path):
with open(passwd_file_path, 'r') as passwd_file:
for line in passwd_file:
if line.startswith('#'):
continue
passwd_line = line.split(':')
line_uid = int(passwd_line[2])
line_username = passwd_line[0]
if line_uid and line_username:
passwd_dictionary[line_uid] = line_username
if uid in passwd_dictionary:
return passwd_dictionary[uid]
else:
return str(uid)
def _get_group_name_form_gid(self, gid: int) -> str:
'''Метод для получения названия группы по его gid.'''
group_file_path = os.path.join(self.chroot_path, 'etc/group')
group_dictionary = dict()
if os.path.exists(group_file_path):
with open(group_file_path, 'r') as group_file:
for line in group_file:
if line.startswith('#'):
continue
group_line = line.split(':')
line_gid = int(group_line[2])
line_group = group_line[0]
if line_gid and line_group:
group_dictionary[line_gid] = line_group
if gid in group_dictionary:
return group_dictionary[gid]
else:
return str(gid)
def _check_os_error(self, error: Exception, path_to_check: str) -> True:
'''Метод для проверки причины, по которой не удалось изменить владельца
или права доступа файла.'''
if hasattr(error, 'errno') and error.errno == errno.EPERM:
if self._is_vfat(path_to_check):
return True
return hasattr(error, 'errno') and error.errno == errno.EACCES and\
'var/calculate/remote' in path_to_check
def _is_vfat(self, path_to_check):
'''Метод, проверяющий является ли файловая система vfat. Нужно для того,
чтобы знать о возможности применения chown, chmod и т.д.'''
# Инициализируем объект для проверки примонтированных файловых систем.
if self.mounts is None:
self.mounts = Mounts()
# Проверяем файловую систему на пути.
fstab_info = self.mounts.get_from_fstab(what=self.mounts.TYPE,
where=self.mounts.DIR,
is_in=path_to_check)[0]
return fstab_info in {'vfat', 'ntfs-3g', 'ntfs'}
class DirectoryTree:
'''Класс реализующий дерево каталогов для пакета.'''
def __init__(self, base_directory):
self.base_directory = base_directory
self._tree = {}
def update_tree(self, tree: dict) -> None:
'''Метод, инициирующий наложение заданного дерева каталогов на данный
экземпляр дерева.'''
self._update(self._tree, tree)
def _update(self, original_tree: dict, tree: dict) -> dict:
'''Метод для рекурсивного наложения одного дерева на другое.'''
for parent, child in tree.items():
if isinstance(child, abc.Mapping):
original_tree[parent] = self._update(original_tree.get(parent,
dict()),
child)
else:
original_tree[parent] = child
return original_tree
def show_tree(self) -> None:
pprint(self._tree)
def get_directory_tree(self, directory: str):
'''Метод для получения нового дерева из ветви данного дерева,
соответствующей некоторому каталогу, содержащемуся в корне данного
дерева.'''
directory_tree = DirectoryTree(os.path.join(self.base_directory,
directory))
if directory in self._tree:
directory_tree._tree = self._tree[directory]
return directory_tree
def __getitem__(self, name: str) -> dict:
if name in self._tree:
return self._tree[name]
else:
return None
def __setitem__(self, name: str, value):
self._tree[name] = value
def __iter__(self):
if self._tree is not None:
return iter(self._tree.keys())
else:
return iter([])
def __repr__(self) -> str:
return '<DirectoryTree: {}>'.format(self._tree)
def __bool__(self) -> bool:
return bool(self._tree)
class DirectoryProcessor:
'''Класс обработчика директорий шаблонов.'''
def __init__(self, action: str, datavars_module=Variables(), package='',
output_module=IOModule(), dbpkg=True):
if isinstance(action, list):
self.action = action
else:
self.action = [action]
self.output = output_module
self.datavars_module = datavars_module
# Корневая директория.
if 'cl_chroot_path' in datavars_module.main:
self.cl_chroot_path = datavars_module.main.cl_chroot_path
else:
self.cl_chroot_path = '/'
self.cl_ignore_files = self._get_cl_ignore_files()
# Путь к файлу config с хэш-суммами файлов, для которых уже
# предлагались изменения.
if 'cl_config_path' in datavars_module.main:
self.cl_config_path = self._add_chroot_path(
self.datavars_module.main.cl_config_path)
else:
self.cl_config_path = self._add_chroot_path(
'/var/lib/calculate/config')
# Путь к директории config-archive для хранения оригинальной ветки
# конфигурационных файлов.
if 'cl_config_archive' in datavars_module.main:
self.cl_config_archive = self._add_chroot_path(
self.datavars_module.main.cl_config_archive)
else:
self.cl_config_archive = self._add_chroot_path(
'/var/lib/calculate/config-archive')
# Путь к директории .execute для хранения хранения файлов скриптов,
# полученных из шаблонов с параметром exec.
if 'cl_exec_dir_path' in datavars_module.main:
self.cl_exec_dir_path = self._add_chroot_path(
self.datavars_module.main.cl_exec_dir_path)
else:
self.cl_exec_dir_path = self._add_chroot_path(
'/var/lib/calculate/.execute/')
# Инициализируем исполнительный модуль.
self.template_executor = TemplateExecutor(
datavars_module=self.datavars_module,
chroot_path=self.cl_chroot_path,
cl_config_archive=self.cl_config_archive,
cl_config_path=self.cl_config_path,
execute_archive_path=self.cl_exec_dir_path,
dbpkg=dbpkg)
# Инициализируем шаблонизатор.
self.template_engine = TemplateEngine(
datavars_module=self.datavars_module,
chroot_path=self.cl_chroot_path,
appends_set=self.template_executor.available_appends)
# Разбираем atom имя пакета, для которого накладываем шаблоны.
self.for_package = False
if package:
if isinstance(package, PackageAtomName):
self.for_package = package
elif isinstance(package, str):
try:
self.for_package = self.template_engine.\
parameters_processor.check_package_parameter(package)
except ConditionFailed as error:
# ConfitionFailed потому что для проверки значения пакета,
# используется тот же метод, что проверяет параметр package
# в шаблонах, а в них этот параметр играет роль условия.
self.output.set_error(str(error))
return
# Получаем список директорий шаблонов.
# TODO переменная список.
if isinstance(self.datavars_module, (Datavars, NamespaceNode)):
var_type = self.datavars_module.main[
'cl_template_path'].variable_type
else:
var_type = StringType
if var_type is StringType:
self.template_paths = (self.datavars_module.
main.cl_template_path.split(','))
elif var_type is ListType:
self.template_paths = self.datavars_module.main.cl_template_path
# Список обработанных пакетов.
self.processed_packages = []
# Список пакетов, взятый из значений параметра merge.
self.packages_to_merge = []
# Словарь для хранения деревьев директорий для различных пакетов.
self.packages_file_trees = {}
def _get_cl_ignore_files(self) -> list:
'''Метод для получения из соответствующей переменной списка паттернов
для обнаружения игнорируемых в ходе обработки шаблонов файлов.'''
if 'cl_ignore_files' in self.datavars_module.main:
if isinstance(self.datavars_module, (Datavars, NamespaceNode)):
var_type = self.datavars_module.main[
'cl_ignore_files'].variable_type
else:
var_type = StringType
cl_ignore_files = self.datavars_module.main.cl_ignore_files
cl_ignore_files_list = []
if var_type is StringType:
for pattern in cl_ignore_files.split(','):
cl_ignore_files_list.append(pattern.strip())
elif var_type is ListType:
cl_ignore_files_list = cl_ignore_files
return cl_ignore_files_list
else:
return []
def _add_chroot_path(self, path_to_add: str) -> str:
'''Метод для добавления корневого пути к заданному пути, если таковой
задан и отсутствует в заданном пути.'''
if (self.cl_chroot_path != '/' and
not path_to_add.startswith(self.cl_chroot_path)):
return join_paths(self.cl_chroot_path, path_to_add)
else:
return path_to_add
def process_template_directories(self) -> None:
'''Метод для обхода шаблонов, содержащихся в каталогах из
main.cl_template.path.'''
# Режим заполнения очередей директорий пакетов, необходимых для более
# быстрой обработки параметра merge.
self.fill_trees = bool(self.for_package)
if self.for_package:
if self.for_package is NonePackage:
package = self.for_package
else:
package = Package(self.for_package,
chroot_path=self.cl_chroot_path)
else:
package = None
for directory_path in self.template_paths:
self.base_directory = directory_path.strip()
entries = os.scandir(self.base_directory)
for node in entries:
self.directory_tree = {}
self._walk_directory_tree(node.path,
self.cl_chroot_path,
ParametersContainer(),
directory_tree=self.directory_tree,
package=package)
# Теперь когда дерево заполнено, можно выключить этот режим.
self.fill_trees = False
if self.for_package:
self.output.set_info('Processing packages from merge parameter...')
self.processed_packages.append(self.for_package)
self._merge_packages()
if self.template_executor.execute_files:
self._run_exec_files()
self.template_executor.save_changes()
def _merge_packages(self):
'''Метод для выполнения шаблонов относящихся к пакетам, указанным во
всех встреченных значениях параметра merge.'''
not_merged_packages = []
while self.packages_to_merge:
self.for_package = self.packages_to_merge.pop()
if self.for_package not in self.packages_file_trees:
self.output.set_error(
"Error: package '{0}' not found for action{1} '{2}'.".
format(self.for_package,
's' if len(self.action) > 1 else '',
', '.join(self.action)))
not_merged_packages.append(self.for_package)
continue
package = Package(self.for_package,
chroot_path=self.cl_chroot_path)
for directory_name in self.packages_file_trees[self.for_package]:
directory_tree = self.packages_file_trees[self.for_package].\
get_directory_tree(directory_name)
self._walk_directory_tree(directory_tree.base_directory,
self.cl_chroot_path,
ParametersContainer(),
directory_tree=directory_tree,
package=package)
self.processed_packages.append(self.for_package)
if not_merged_packages:
self.output.set_error('Packages {} is not merged.'.
format(','.join(self.packages_to_merge)))
else:
self.output.set_success('All packages are merged.')
def _run_exec_files(self):
'''Метод для выполнения скриптов, полученных в результате обработки
шаблонов с параметром exec.'''
for exec_file_path, exec_info in\
self.template_executor.execute_files.items():
try:
output = self.template_executor.execute_file(
exec_info['interpreter'],
exec_file_path,
exec_info['cwd_path'])
if output['stdout'] is not None:
self.output.set_info("stdout from template: {}:\n{}\n".
format(exec_info['template_path'],
output['stdout']))
if output['stderr'] is not None:
self.output.set_error("stderr from template: {}:\n{}\n".
format(exec_info['template_path'],
output['stderr']))
except TemplateExecutorError as error:
self.output.set_error(str(error))
def _get_directories_queue(self, path: str) -> tuple:
'''Уже не актуальный метод для построение очередей из путей к
шаблонам. Хотя возможно еще пригодится.'''
directories_queue = []
for base_dir in self.template_paths:
base_dir = base_dir.strip()
if path.startswith(base_dir):
base_directory = base_dir
break
while path != base_directory:
path, dir_name = os.path.split(path)
directories_queue.append(dir_name)
return base_directory, directories_queue
def _walk_directory_tree(self, current_directory_path: str,
current_target_path: str,
directory_parameters: ParametersContainer(),
directory_tree={},
package=None) -> None:
'''Метод для рекурсивного обхода директорий с шаблонами, а также, при
необходимости, заполнения деревьев директорий шаблонов, с помощью
которых далее выполняются шаблоны пакетов из merge.'''
directory_name = os.path.basename(current_directory_path)
# Если включено заполнение дерева создаем пустой словарь для сбора
# содержимого текущей директории.
if self.fill_trees:
directory_tree[directory_name] = {}
self.template_engine.change_directory(current_directory_path)
template_directories, template_files = self._scan_directory(
current_directory_path)
# обрабатываем в первую очередь шаблон директории.
if '.calculate_directory' in template_files:
template_files.remove('.calculate_directory')
template_text = self._parse_template(directory_parameters,
'.calculate_directory',
DIR, current_directory_path)
if template_text is False:
directory_tree = {}
return
# directory_parameters.print_parameters_for_debug()
# Корректируем путь к целевой директории.
current_target_path = self._make_target_path(current_target_path,
directory_name,
directory_parameters)
# Если нужно заполнять дерево директорий, отправляем в метод для
# проверки параметров package и action текущее дерево.
if not self._check_package_and_action(
directory_parameters,
current_directory_path,
directory_tree=(directory_tree if
self.fill_trees else None)):
# Если проверка не пройдена и включено заполнение дерева, то,
# используя нынешнее состояние дерева директорий, обновляем
# дерево пакета текущего шаблона директории.
if self.fill_trees:
self._update_package_tree(directory_parameters.package,
directory_tree[directory_name])
# Перед выходом из директории очищаем текущий уровень
# дерева.
directory_tree = {}
return
# Если есть параметр merge -- сохраняем присутствующие в нем пакеты
# для последующей обработки.
if self.for_package and directory_parameters.merge:
self.packages_to_merge.extend(directory_parameters.merge)
# Если присутствует параметр package -- проверяем, изменился ли он
# и был ли задан до этого. Если не был задан или изменился, меняем
# текущий пакет ветки шаблонов на этот.
if directory_parameters.package:
if (package is None or
package.package_name != directory_parameters.package):
package = Package(directory_parameters.package,
chroot_path=self.cl_chroot_path)
else:
# Если .calculate_directory отсутствует -- создаем директорию,
# используя унаследованные параметры и имя самой директории.
if not self._check_package_and_action(
directory_parameters,
current_directory_path,
directory_tree=(directory_tree if
self.fill_trees else None)):
# Обновляем дерево директорий для данного пакета.
if self.fill_trees:
self._update_package_tree(directory_parameters.package,
directory_tree[directory_name])
# Перед выходом из директории очищаем текущий уровень
# дерева.
directory_tree = {}
return
# Для того чтобы директория была создана, просто добавляем параметр
# append = join.
directory_parameters.set_parameter({'append': 'join'})
template_text = ''
current_target_path = os.path.join(current_target_path,
directory_name)
# Выполняем наложение шаблона.
current_target_path = self._execute_template(
current_target_path,
directory_parameters, DIR,
current_directory_path,
template_text=template_text,
package=package)
if not current_target_path:
directory_tree = {}
return
# Далее обрабатываем файлы шаблонов хранящихся в директории.
# Если в данный момент обходим дерево -- берем список файлов и
# директорий из него.
if not self.fill_trees and directory_tree:
template_directories, template_file =\
self._get_files_and_dirs_from_tree(template_files,
template_directories,
directory_tree)
# Просто псевдоним, чтобы меньше путаницы было далее.
template_parameters = directory_parameters
# Обрабатываем файлы шаблонов.
for template_name in template_files:
# Удаляем все параметры, которые не наследуются и используем
# полученный контейнер для сбора параметров файлов шаблонов.
template_parameters.remove_not_inheritable()
template_path = os.path.join(current_directory_path, template_name)
# Применяем к файлу шаблона шаблонизатор.
template_text = self._parse_template(template_parameters,
template_name,
FILE, current_directory_path)
if template_text is False:
continue
# template_parameters.print_parameters_for_debug()
# Если находимся на стадии заполнения дерева директорий --
# проверяем параметры package и action с заполнением дерева.
if not self._check_package_and_action(
template_parameters,
template_path,
directory_tree=(directory_tree[directory_name] if
self.fill_trees else None)):
continue
# Если есть параметр merge добавляем его содержимое в список
# пакетов для последующей обработки.
if self.for_package and template_parameters.merge:
self.packages_to_merge.extend(template_parameters.merge)
# Корректируем путь к целевому файлу.
target_file_path = self._make_target_path(current_target_path,
template_name,
template_parameters)
# Создаем объект пакета для файлов шаблонов этой директории.
template_package = package
if template_parameters.package:
if (template_package is None or
package.package_name != directory_parameters.package):
template_package = Package(directory_parameters.package,
chroot_path=self.cl_chroot_path)
# Выполняем действия, указанные в шаблоне.
target_file_path = self._execute_template(
target_file_path,
template_parameters,
FILE, template_path,
template_text=template_text,
package=template_package)
if not target_file_path:
continue
# * * * ПРИДУМАТЬ ОПТИМИЗАЦИЮ * * *
# TODO Потому что накладывать дерево каждый раз, когда обнаружены
# файлы какого-то пакета не рационально.
# Обновляем дерево директорий для данного пакета, если происходит
# его заполнение.
if self.fill_trees:
self._update_package_tree(template_parameters.package,
directory_tree[directory_name])
directory_tree[directory_name] = {}
# Проходимся далее по директориям.
for directory in template_directories:
if self.fill_trees:
self._walk_directory_tree(
directory, current_target_path,
directory_parameters.get_inheritables(),
directory_tree=directory_tree[directory_name],
package=package)
directory_tree[directory_name] = {}
else:
if isinstance(directory, DirectoryTree):
# Если директории взяты из дерева -- путь к директории
# соответствует корню каждой взятой ветви дерева.
directory_path = directory.base_directory
else:
directory_path = directory
self._walk_directory_tree(
directory_path,
current_target_path,
directory_parameters.get_inheritables(),
package=package)
if self.fill_trees:
directory_tree = {}
return
def _scan_directory(self, directory_path: str) -> tuple:
'''Метод для получения и фильтрования списка файлов и директорий,
содержащихся в директории шаблонов.'''
template_files = []
template_directories = []
entries = os.scandir(directory_path)
for node in entries:
if not self._check_file_name(node.name):
continue
if node.is_symlink():
self.output.set_warning(
'symlink: {0} is ignored in the template directory: {1}'.
format(node.path, directory_path))
continue
elif node.is_dir():
template_directories.append(node.path)
elif node.is_file():
template_files.append(node.name)
return template_directories, template_files
def _check_file_name(self, filename: str) -> bool:
'''Метод для проверки соответствия имени файла содержимому переменной
main.cl_ignore_files.'''
for pattern in self.cl_ignore_files:
if fnmatch.fnmatch(filename, pattern):
return False
return True
def _get_files_and_dirs_from_tree(self, template_files,
template_directories,
directory_tree: DirectoryTree):
'''Метод для получения списков файлов и директорий из дерева
директорий.'''
tree_files = []
tree_directories = []
for template in directory_tree:
if template in template_files:
tree_files.append(template)
else:
next_directory_tree =\
directory_tree.get_directory_tree(template)
tree_directories.append(next_directory_tree)
return tree_directories, tree_files
def _make_target_path(self, target_path, template_name,
parameters):
'''Метод для получения пути к целевому файлу с учетом наличия
параметров name, path и append = skip.'''
# Если есть параметр name -- меняем имя шаблона.
if parameters.name:
template_name = parameters.name
# Если для шаблона задан путь -- меняем путь к директории шаблона.
if parameters.path:
target_path = join_paths(self.cl_chroot_path,
parameters.path)
# Если параметр append не равен skip -- добавляем имя шаблона к
# целевому пути.
if not parameters.append == 'skip':
target_path = os.path.join(target_path,
template_name)
return target_path
def _parse_template(self, parameters,
template_name,
template_type,
template_directory):
'''Метод для разбора шаблонов, получения значений их параметров и их
текста после отработки шаблонизитора.'''
if template_type == DIR:
template_path = template_directory
else:
template_path = join_paths(template_directory, template_name)
try:
self.template_engine.process_template(template_name,
template_type,
parameters=parameters)
return self.template_engine.template_text
except ConditionFailed as error:
self.output.set_warning('{0}. Template: {1}'.
format(str(error),
template_path))
return False
except Exception as error:
self.output.set_error('Template error: {0} Template: {1}'.
format(str(error),
template_path))
return False
def _execute_template(self, target_path,
parameters,
template_type,
template_path,
template_text='',
package=None):
'''Метод для наложения шаблонов и обработки информации полученной после
наложения.'''
try:
output = self.template_executor.execute_template(
target_path,
parameters,
template_type,
template_path,
template_text=template_text,
target_package=package)
# Если во время выполнения шаблона был изменен целевой путь,
# например, из-за ссылки на директорию в source -- обновляем
# целевой путь.
if output['target_path'] is not None:
target_path = output['target_path']
# Если есть вывод от параметра run -- выводим как info.
if output['stdout'] is not None:
self.output.set_info("stdout from template: {}:\n{}\n".format(
template_path,
output['stdout']))
# Если есть ошибки от параметра run -- выводим их как error.
if output['stderr'] is not None:
self.output.set_error("stderr from template: {}:\n{}\n".
format(template_path,
output['stderr']))
# Если run выполнен с ошибками -- пропускаем директорию.
return False
except TemplateExecutorError as error:
self.output.set_error('Template execution error: {} Template: {}'.
format(str(error),
template_path))
return False
if template_type == DIR:
self.output.set_success('Processed directory: {}'.
format(template_path))
else:
self.output.set_success('Processed template: {}'.
format(template_path))
return target_path
def _update_package_tree(self, package, current_level_tree):
'''Метод для обновления деревьев директорий пакетов, необходимых для
обработки шаблонов пакетов из значения параметра merge.'''
# Если текущему уровню соответствует заглушка None или он содержит
# файлы, то есть не пустой -- тогда есть смысл обновлять.
if current_level_tree is None or current_level_tree:
if package in self.packages_file_trees:
# Если для данного пакета уже есть дерево --
# накладываем на него текущее.
self.packages_file_trees[package].update_tree(
copy.deepcopy(self.directory_tree)
)
else:
# Если для данного пакета еще нет дерева --
# копируем для него текущее.
directory_tree = DirectoryTree(self.base_directory)
directory_tree.update_tree(
copy.deepcopy(self.directory_tree))
self.packages_file_trees[package] = directory_tree
def _check_package_and_action(self, parameters, template_path,
directory_tree=None):
'''Метод для проверки параметров action и package во время обработки
каталогов с шаблонами. Если среди аргументов указано также
дерево каталогов, то в случае несовпадения значений package для файла
или директории, им в дереве присваивается значение None.'''
if parameters.append != 'skip' or parameters.action:
if not parameters.action:
self.output.set_warning(
("Action parameter is not set for template:"
" {0}").format(template_path))
return False
elif parameters.action not in self.action:
self.output.set_warning(
("Action parameter value '{0}' does not match its"
" current value{1} '{2}'. Template: {3}").format(
parameters.action,
's' if len(self.action) > 1
else '',
', '.join(self.action),
template_path))
return False
if self.for_package:
if not parameters.package:
if self.for_package is not NonePackage:
self.output.set_warning(
"'package' parameter is not defined. Template: {}".
format(template_path))
elif parameters.package != self.for_package:
if directory_tree is not None:
template_name = os.path.basename(template_path)
directory_tree[template_name] = None
self.output.set_warning(
("'package' parameter value '{0}' does not "
"match its current target package '{1}'. "
"Template: {2}").
format(parameters.package.atom,
self.for_package.atom,
template_path)
)
return False
return True