Some appends is added or directories. Protecred and unprotected template division is supported now.

packages
Иванов Денис 4 years ago
parent c3e5cdb9e1
commit 3ac3973731

@ -35,7 +35,7 @@ class ConditionFailed(TemplateSyntaxError):
class Variables(MutableMapping):
'''Класс заглушка вместо модуля переменных для тестов.'''
'''Класс-заглушка вместо модуля переменных для тестов.'''
def __init__(self, *args, **kwargs):
self.__attrs = dict(*args, **kwargs)
@ -336,12 +336,18 @@ class ParametersProcessor:
else:
real_path = parameter_value
if not os.path.exists(real_path):
raise IncorrectParameter(
"File from 'source' parameter does not exist")
source_file_type = DIR if os.path.isdir(real_path) else FILE
if not parameter_value or isinstance(parameter_value, bool):
raise IncorrectParameter("'source' parameter value is empty")
elif not os.path.exists(real_path):
if (self._parameters_container.append == 'link' and
self.template_type != source_file_type):
raise IncorrectParameter(
"File from 'source' parameter does not exist")
"the type of the 'source' file does not match")
return os.path.normpath(real_path)

@ -11,23 +11,30 @@ import operator
class PackageError(Exception):
'''Исключение выбрасываемое при ошибках в объектах Package, работающих
с CONTENTS-файлами пакетов.'''
pass
# Коды ошибок ATOM-парсера.
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
class PackageAtomError(Exception):
'''Исключение выбрасываемое при ошибках разбора ATOM-названий.'''
def __init__(self, message='Package atom error', errno=DEFAULT):
self.message = message
self.errno = errno
class VersionError(Exception):
'''Исключение выбрасываемое объектами Version.'''
pass
class PackageNotFound(Exception):
'''Специальное исключение выбрасываемое, если не удалось найти пакет, к
которому принадлежит файл.'''
pass
@ -344,6 +351,7 @@ class PackageAtomParser:
self.parse_package_parameter(package_atom)
return True
else:
# Используем glob-паттерн для поиска.
if 'version' in self._atom_dictionary:
full_name = self._atom_dictionary['name'] + '-' +\
self._atom_dictionary['version']._version_string
@ -364,12 +372,14 @@ class PackageAtomParser:
full_name))
if not glob_result:
# Если ничего не нашлось.
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
if len(glob_result) == 1:
# Если нашелся один пакет.
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path)
self._check_use_flags_value(pkg_path)
@ -381,6 +391,8 @@ class PackageAtomParser:
self._atom_dictionary['pkg_path'] = pkg_path
else:
packages = dict()
# Если подходящих пакетов много -- проверяем по use-флагам,
# слотам и версии, если таковые заданы.
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
@ -394,6 +406,7 @@ class PackageAtomParser:
continue
if not packages:
# Если после проверки отсеялись все кандидаты.
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
@ -401,11 +414,12 @@ class PackageAtomParser:
errno=NOTEXIST)
if len(packages) == 1:
# Если был найден только один кандидат -- выдаем его.
pkg_path = next(iter(packages.keys()))
self._atom_dictionary['pkg_path'] = pkg_path
self._atom_dictionary['version'] = packages[pkg_path]
else:
# Берем старшую версию.
# Если подходящих пакетов много -- берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
self._atom_dictionary['pkg_path'] = pkg_path
@ -492,26 +506,24 @@ class PackageAtomParser:
else:
raise PackageNotFound("The file does not belong to any package")
def set_chroot(self, chroot_path):
if chroot_path != '/':
self.pkg_path = join_paths(chroot_path, self.default_pkg_path)
else:
self.pkg_path = self.default_pkg_path
@property
def atom_dictionary(self):
'''Метод для получения ATOM-словаря.'''
return self._atom_dictionary
@property
def atom_name(self):
'''Метод для получения из ATOM-словаря объекта PackageAtomName.'''
return PackageAtomName(self._atom_dictionary)
@atom_dictionary.setter
def set_atom_dictionary(self, atom_dictionary):
'''Метод для установки ATOM-словаря.'''
self._atom_dictionary = atom_dictionary
@property
def category(self):
'''Метод для получения категории пакета из ATOM-словаря.'''
if 'category' in self._atom_dictionary:
return self._atom_dictionary['category']
else:
@ -519,6 +531,7 @@ class PackageAtomParser:
@property
def name(self):
'''Метод для получения имени пакета из ATOM-словаря.'''
if 'name' in self._atom_dictionary:
return self._atom_dictionary['name']
else:
@ -526,6 +539,7 @@ class PackageAtomParser:
@property
def slot(self):
'''Метод для получения значения slot из ATOM-словаря.'''
if 'slot' in self._atom_dictionary:
return self._atom_dictionary['slot']
else:
@ -535,6 +549,7 @@ class PackageAtomParser:
@property
def use_flags(self):
'''Метод для получения use-флагов из ATOM-словаря.'''
if 'use_flags' in self._atom_dictionary:
return self._atom_dictionary['use_flags']
else:
@ -546,6 +561,7 @@ class PackageAtomParser:
class Package:
'''Класс для работы с принадлежностью файлов пакетам.'''
re_cfg = re.compile(r'/\._cfg\d{4}_')
def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
@ -565,6 +581,8 @@ class Package:
self.read_contents_file()
def _get_contents_path(self, package_atom):
'''Метод для получения из ATOM-названия или готового объекта
PackageAtomName пути к файлу CONTENTS.'''
if isinstance(package_atom, str):
package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
@ -581,15 +599,19 @@ class Package:
format(package_atom, type(package_atom)))
def remove_cfg_prefix(self, file_name):
'''Метод для удаления префикса ._cfg????_.'''
return self.re_cfg.sub('/', file_name)
def remove_chroot_path(self, file_name):
'''Метод для удаления из пути файла корневого пути, если он не
является /.'''
if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
return file_name[len(self.chroot_path):]
else:
return file_name
def read_contents_file(self):
'''Метод для чтения файла CONTENTS.'''
try:
contents_text = read_file(self.contents_file_path)
except FilesError as error:
@ -604,17 +626,20 @@ class Package:
return False
def write_contents_file(self):
'''Метод для записи файла CONTENTS.'''
contents_file = open(self.contents_file_path, 'w')
contents_text = self.render_contents_file()
contents_file.write(contents_text)
contents_file.close()
def render_contents_file(self):
'''Метод для получения текста файла CONTENTS.'''
contents_format = ContentsFormat('', template_parser=False)
contents_format._document_dictionary = self.contents_dictionary
return contents_format.document_text
def add_dir(self, file_name):
'''Метод для добавления в CONTENTS директорий.'''
file_name = self.remove_cfg_prefix(file_name)
file_name = self.remove_chroot_path(file_name)
@ -626,6 +651,7 @@ class Package:
self.contents_dictionary[file_name] = contents_item
def add_sym(self, file_name, target_path=None):
'''Метод для добавления в CONTENTS символьных ссылок.'''
file_name = self.remove_cfg_prefix(file_name)
real_path = file_name
@ -651,6 +677,7 @@ class Package:
self.contents_dictionary[file_name] = contents_item
def add_obj(self, file_name, file_md5=None):
'''Метод для добавления в CONTENTS обычных файлов как obj.'''
file_name = self.remove_cfg_prefix(file_name)
real_path = file_name
@ -675,6 +702,7 @@ class Package:
self.contents_dictionary[file_name] = contents_item
def add_file(self, file_name):
'''Метод для добавления в CONTENTS файла любого типа.'''
if file_name != '/':
real_path = file_name
file_name = self.remove_chroot_path(file_name)
@ -688,16 +716,38 @@ class Package:
elif os.path.isfile(real_path):
self.add_obj(file_name)
def remove_file(self, file_path):
def remove_obj(self, file_path):
'''Метод для удаления файлов и ссылок.'''
file_path = self.remove_chroot_path(file_path)
if file_path in self.contents_dictionary:
self.contents_dictionary.remove(file_path)
def remove_dir(self, file_path):
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри удаляемой директории и самой директории.'''
directory_path = self.remove_chroot_path(file_path)
for file_path in self.contents_dictionary:
if file_path.startswith(directory_path):
self.contents_dictionary.remove(file_path)
def clear_dir(self, file_path):
'''Метод для удаления из CONTENTS файлов и директорий находящихся
внутри очищаемой директории.'''
directory_path = self.remove_chroot_path(file_path)
for file_path in self.contents_dictionary:
if file_path == directory_path:
continue
if file_path.startswith(directory_path):
self.contents_dictionary.remove(file_path)
def remove_empty_directories(self):
'''Метод для удаления из CONTENTS директорий, которые после удаления
тех или иных файлов больше не находятся на пути к тем файлам, которые
по-прежнему принадлежат пакету.'''
used_directories = set()
not_directory_list = [path for path, value in
self.contents_dictionary.items()
if value[type] != 'dir']
if value['type'] != 'dir']
for filepath in not_directory_list:
file_directory = os.path.dirname(filepath)
while file_directory != '/':
@ -708,6 +758,7 @@ class Package:
self.contents_dictionary.remove(filepath)
def get_md5(self, file_path):
'''Метод для получения md5 хэш-суммы указанного файла.'''
try:
file_text = read_file(file_path).encode()
except FilesError as error:
@ -717,7 +768,7 @@ class Package:
return file_md5
def is_md5_equal(self, file_path, file_md5=None):
'''Метод для проверки соответствия md5 хэш суммы той, что указана в
'''Метод для проверки соответствия md5 хэш суммы файла той, что указана
в файле CONTENTS.'''
if file_md5 is None:
file_md5 = self.get_md5(file_path)

@ -144,15 +144,20 @@ class TemplateWrapper:
config_archive_path = '/var/lib/calculate/config-archive'
_protected_is_set = False
_protected_set = {'/etc'}
_protected_set = set()
_unprotected_set = set()
def __new__(cls, *args, **kwargs):
if not cls._protected_is_set:
# Устанавливаем значения PROTECTED, если не заданы.
cls._set_protected()
return super().__new__(cls)
def __init__(self, target_file_path, parameters, template_type,
template_text=''):
self.target_path = target_file_path
self.target_package_name = None
print("TemplateWrapper init chroot: {}".format(self.chroot_path))
self.package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
@ -172,10 +177,15 @@ class TemplateWrapper:
# применением шаблона.
self.remove_original = False
# Флаг, указывающий, что целевой путь был изменен.
self.target_path_is_changed = False
# Флаг, указывающий, что файл является PROTECTED.
self.protected = False
# Временный флаг для определения того, является ли шаблон userspace.
self.is_userspace = False
print("Getting format class...")
# Получаем класс соответствующего формата файла.
if self.parameters.format:
self.format_class = ParametersProcessor.\
@ -184,7 +194,6 @@ class TemplateWrapper:
# Здесь будет детектор форматов.
pass
print("Checking file types conflicts...")
# Если по этому пути что-то есть -- проверяем конфликты.
if os.path.exists(target_file_path):
for file_type, checker in self.type_checks.items():
@ -197,7 +206,6 @@ class TemplateWrapper:
self.check_conflicts()
print("Checking package collision...")
self.check_package_collision()
# Если целью является файл -- проверяем наличие ._cfg0000_filename
@ -217,7 +225,6 @@ class TemplateWrapper:
if self.parameters.append == 'link':
if self.parameters.force:
self.remove_original = True
return
elif self.target_type == DIR:
raise TemplateTypeConflict(
"The target is a directory while the"
@ -227,40 +234,34 @@ class TemplateWrapper:
"The target is a file while the"
" template has append = 'link'.")
if self.template_type == DIR:
if self.parameters.force:
self.remove_original = True
return
elif self.target_type == FILE:
raise TemplateTypeConflict("The target is a file while the"
" template is a directory.")
elif self.template_type == DIR:
if self.target_type == FILE:
if self.parameters.force:
self.remove_original = True
else:
raise TemplateTypeConflict("The target is a file while the"
" template is a directory.")
elif self.target_is_link and self.target_type == DIR:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
return
else:
return
if self.parameters.force:
self.remove_original = True
else:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
if self.template_type == FILE:
elif self.template_type == FILE:
if self.parameters.force:
if self.target_type == DIR:
self.remove_original = True
return
elif self.target_is_link and self.target_type == FILE:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
return
else:
return
elif self.target_type == FILE:
return
else:
elif self.target_type == DIR:
raise TemplateTypeConflict("The target file is a directory"
" while the template is a file.")
@ -315,7 +316,20 @@ class TemplateWrapper:
if self.target_type != FILE:
return
print("Looking for ._cfg files...")
# Проверим, является ли файл защищенным.
# Сначала проверяем по переменной CONFIG_PROTECT.
for protected_path in self._protected_set:
if self.target_path.startswith(protected_path):
self.protected = True
break
# Затем по переменной CONFIG_PROTECT_MASK.
for unprotected_path in self._unprotected_set:
if self.target_path.startswith(unprotected_path):
self.protected = False
break
print('Is protected:', self.protected)
# Собираем список имеющихся ._cfg файлов.
cfg_pattern = os.path.join(os.path.dirname(self.target_path),
"._cfg????_{}".format(
@ -325,12 +339,13 @@ class TemplateWrapper:
# Путь к архивной версии файла.
self.archive_path = self._get_archive_path(self.target_path)
if self.parameters.unbound:
if not self.protected:
self.md5_matching = True
elif self.parameters.unbound:
# Если присутствует unbound, то просто модифицируем файл и
# удаляем его из CONTENTS.
self.md5_matching = True
elif self.target_type is None:
print("There is no files in target path...")
# Если целевой файл отсутствует.
if self.target_path in self.target_package:
# Проверка -- был ли файл удален.
@ -338,14 +353,12 @@ class TemplateWrapper:
else:
self.md5_matching = True
elif self.target_without_package:
print("There is file without package in target path...")
# Если файл по целевому пути не относится к какому-либо пакету.
if self.parameters.unbound:
self.md5_matching = True
else:
self.md5_matching = False
else:
print("Check md5 hash...")
# Если файл есть и он относится к текущему пакету.
target_md5 = self.target_package.get_md5(self.target_path)
self.md5_matching = self.target_package.is_md5_equal(
@ -361,7 +374,6 @@ class TemplateWrapper:
# Определяем путей входных и выходных файлов.
if self.md5_matching:
print("md5 matches")
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
@ -370,7 +382,6 @@ class TemplateWrapper:
self.output_path = self.target_path
else:
print("md5 does not match")
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
@ -380,13 +391,13 @@ class TemplateWrapper:
self.output_path = self._get_cfg_path(self.target_path)
def _get_archive_path(self, file_path):
print("Getting archive path...")
'''Метод для получения пути к архивной версии указанного файла.'''
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return join_paths(self.config_archive_path, file_path)
def _get_cfg_path(self, file_path):
print("Getting ._cfg path...")
'''Метод для получения пути для создания нового ._cfg????_ файла.'''
if self.cfg_list:
last_cfg_name = os.path.basename(self.cfg_list[-1])
@ -406,9 +417,20 @@ class TemplateWrapper:
return new_cfg_path
def remove_from_contents(self):
self.target_package.remove_file(self.target_path)
'''Метод для удаления целевого файла из CONTENTS.'''
if self.template_type == DIR:
self.target_package.remove_dir(self.target_path)
elif self.template_type == FILE:
self.target_package.remove_obj(self.target_path)
def clear_dir_contents(self):
'''Метод для удаления из CONTENTS всего содержимого директории после
применения append = "clear".'''
if self.template_type == DIR:
self.target_package.clear_dir(self.target_path)
def add_to_contents(self, file_md5=None):
'''Метод для добавления целевого файла в CONTENTS.'''
if self.parameters.append == 'link':
self.target_package.add_sym(target_path, self.parameters.source)
elif self.template_type == DIR:
@ -418,23 +440,32 @@ class TemplateWrapper:
@classmethod
def _set_protected(cls):
'''Метод для получения множества защищенных директорий.'''
if cls._protected_is_set:
return
cls._protected_set = set()
cls._protected_set.add(join_paths(cls.chroot_path, '/etc'))
config_protect_env = os.environ.get('CONFIG_PROTECT', False)
if config_protect_env:
for protected_path in config_protect_env.split():
cls._protected_set.add(protected_path.strip())
protected_path = join_paths(cls.chroot_path,
protected_path.strip())
cls._protected_set.add(protected_path)
config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False)
if config_protect_mask_env:
for unprotected_path in config_protect_mask_env.split():
cls._unprotected_set.add(protected_path.strip())
unprotected_path = join_paths(cls.chroot_path,
unprotected_path.strip())
cls._unprotected_set.add(unprotected_path)
cls._protected_is_set = True
def save_changes(self):
'''Метод для сохранения изменений внесенных в CONTENTS.'''
if self.target_package and not self.parameters.unbound:
self.target_package.remove_empty_directories()
self.target_package.write_contents_file()
@ -453,7 +484,10 @@ class TemplateExecutor:
self.directory_appends = {'join': self._append_join_directory,
'remove': self._append_remove_directory,
'clear': self._append_clear_directory}
'skip': self._append_skip_directory,
'clear': self._append_clear_directory,
'link': self._append_link_directory,
'replace': self._append_replace_directory}
self.file_appends = {'join': self._append_join_file}
@ -467,6 +501,7 @@ class TemplateExecutor:
@property
def available_appends(self):
'''Метод для получения множества возможных значений append.'''
appends_set = set(self.directory_appends.keys()).union(
set(self.file_appends.keys()))
@ -474,25 +509,18 @@ class TemplateExecutor:
def execute_template(self, target_path, parameters, template_type,
template_text=''):
print('Template parameters:')
parameters.print_parameters_for_debug()
'''Метод для запуска выполнения шаблонов.'''
try:
template_object = TemplateWrapper(target_path, parameters,
template_type,
template_text=template_text)
except TemplateTypeConflict as error:
print('type conflict: {}'.format(str(error)))
return
except TemplateCollisionError as error:
print('collision: {}'.format(str(error)))
return
# Temporary
print("Wrapper succesfully initialized.")
print("Input path:", template_object.input_path)
print("Output path:", template_object.output_path)
# Удаляем оригинал, если это необходимо из-за наличия force или по
# другим причинам.
if template_object.remove_original:
if template_object.target_type == DIR:
self._remove_directory(template_object.target_path)
@ -500,48 +528,87 @@ class TemplateExecutor:
self._remove_file(template_object.target_path)
template_object.target_type = None
# Добавить поддержку run, execute и т.д.
# (!) Добавить поддержку run, execute и т.д.
if template_object.template_type == DIR:
self.directory_appends[template_object.parameters.append](
template_object)
elif template_object.template_type == FILE:
self.file_appends[template_object.parameters.append](
template_object)
# Сохраняем изменения в CONTENTS внесенные согласно шаблону.
template_object.save_changes()
# Возвращаем целевой путь, если он был изменен.
if template_object.target_path_is_changed:
return template_object.target_path
else:
return
def save_changes(self):
'''Метод для сохранения чего-нибудь после выполнения всех шаблонов.'''
# Пока сохраняем только получившееся содержимое config-файла.
self.calculate_config_file.save_changes()
def _append_join_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "join", если шаблон --
директория. Создает директорию, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
template_object.add_to_contents()
else:
pass
def _append_remove_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "remove", если шаблон --
директория. Удаляет директорию со всем содержимым, если она есть.'''
if template_object.target_type is not None:
self._remove_directory(template_object.target_path)
template_object.remove_from_contents()
else:
pass
def _append_skip_directory(self, template_object: TemplateWrapper):
pass
def _append_clear_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "clear", если шаблон --
директория. Удаляет все содержимое директории, если она есть.'''
if template_object.target_type is not None:
# Подумать об организации очистки CONTENTS для этого append.
pass
self._clear_directory(template_object.target_path)
template_object.clear_dir_contents()
def _append_link_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "link", если шаблон --
директория. Создает ссылку на директорию, если она есть.'''
target = template_object.parameters.source
link_path = template_object.target_path
self._link_directory(link_path, target)
template_object.add_to_contents()
def _append_replace_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "replace", если шаблон --
директория. Очищает директорию или создает, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
template_object.add_to_contents()
else:
self._clear_directory(template_object.target_path)
template_object.clear_dir_contents()
def _append_join_file(self, template_object: TemplateWrapper):
'''Метод описывающий действия при append = "join".'''
'''Метод описывающий действия при append = "join", если шаблон --
файл. Объединяет шаблон с целевым файлом.'''
input_path = template_object.input_path
output_path = template_object.output_path
template_format = template_object.format_class
if template_object.md5_matching:
# Действия при совпадении md5 из CONTENTS и md5 целевого файла.
output_paths = [output_path]
# Проверка на предмет userspace.
if not template_object.is_userspace:
# Если целевой файл защищен, а шаблон не userspace.
if template_object.protected and not template_object.is_userspace:
output_paths.append(template_object.archive_path)
if template_object.target_type is not None:
@ -565,6 +632,16 @@ class TemplateExecutor:
with open(save_path, 'w') as output_file:
output_file.write(output_text)
if template_object.parameters.chown:
self.chown_file(save_path,
template_object.parameters.chown,
check_existation=False)
if template_object.parameters.chmod:
self.chmod_file(save_path,
template_object.parameters.chmod,
check_existation=False)
# Убираем все ._cfg файлы.
if template_object.cfg_list:
for cfg_file_path in template_object.cfg_list:
@ -574,10 +651,11 @@ class TemplateExecutor:
self.calculate_config_file.remove_file(template_object.target_path)
# Обновляем CONTENTS.
if template_object.parameters.unbound:
template_object.remove_from_contents()
else:
template_object.add_to_contents(file_md5=output_text_md5)
if template_object.protected:
if template_object.parameters.unbound:
template_object.remove_from_contents()
else:
template_object.add_to_contents(file_md5=output_text_md5)
else:
with open(input_path, 'r') as input_file:
input_text = input_file.read()
@ -594,23 +672,32 @@ class TemplateExecutor:
if not self.calculate_config_file.compare_md5(target_path,
output_text_md5):
print("cl is different, create ._cfg")
with open(output_path, 'w') as output_file:
output_file.write(output_text)
if template_object.parameters.chown:
self.chown_file(output_path,
template_object.parameters.chown,
check_existation=False)
if template_object.parameters.chmod:
self.chmod_file(output_file,
template_object.parameters.chmod,
check_existation=False)
self.calculate_config_file.set_files_md5(
template_object.target_path,
output_text_md5)
print("current config: ",
self.calculate_config_file._config_dictionary)
else:
# Действия если CL совпало. Пока ничего не делаем.
print("cl is similar...")
pass
# Обновляем CONTENTS.
template_object.add_to_contents(file_md5=output_text_md5)
def _create_directory(self, template_object: TemplateWrapper):
'''Метод для создания директории и, при необходимости, изменения
владельца и доступа все директорий на пути к целевой.'''
target_path = template_object.target_path
template_parameters = template_object.parameters
@ -655,7 +742,7 @@ class TemplateExecutor:
if (template_parameters.chown and
template_parameters.chown != current_owner):
self.chown_directory
self.chown_directory(create_path)
elif 'chown' in self.directory_default_parameters:
self.chown_directory(
create_path,
@ -667,6 +754,7 @@ class TemplateExecutor:
format(create_path, str(error)))
def _remove_directory(self, target_path):
'''Метод для удаления директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
try:
@ -689,6 +777,7 @@ class TemplateExecutor:
error_message))
def _clear_directory(self, target_path):
'''Метод для очистки содержимого целевой директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
for node in os.scandir(target_path):
@ -706,18 +795,16 @@ class TemplateExecutor:
error_message))
def _link_directory(self, target_path, source):
'''Метод для создания по целевому пути ссылки на директорию
расположенную на пути, указанному в source.'''
try:
os.symlink(source, target_path, target_is_directory=True)
print('linked: {0} -> {1}'.format(os.path.basename(target_path),
os.path.basename(source)))
except OSError:
raise TemplateExecutorError("Failed to create symlink: {0} -> {1}".
format(target_path, self.source))
def _execute_template(self, template_object):
pass
def _remove_file(self, target_path):
'''Метод для удаления файлов.'''
if os.path.islink(target_path):
try:
os.unlink(target_path)
@ -728,10 +815,11 @@ class TemplateExecutor:
try:
os.remove(target_path)
except OSError:
self.output.set_error('failed to delete the file: {}'.
format(target_path))
raise TemplateExecutorError('failed to delete the file: {}'.
format(target_path))
def _clear_file(self, target_path):
'''Метод для очистки файлов.'''
try:
with open(target_path, 'w') as f:
f.truncate(0)
@ -739,11 +827,18 @@ class TemplateExecutor:
raise TemplateExecutorError("failed to clear the file: {}".
format(target_path))
def _link_file(self, target_path):
pass
def _link_file(self, target_path, source):
'''Метод для создания по целевому пути ссылки на файл расположенный на
пути, указанному в source.'''
try:
os.symlink(source, target_path)
except OSError:
raise TemplateExecutorError(
"Failed to create symlink to the file: {0} -> {1}".
format(target_path, self.source))
def chown_directory(self, target_path, chown_value={}):
"""Сменить владельца директории."""
"""Метод для смены владельца директории."""
if not chown_value:
chown_value = self.template_parameters.chown
print('chown value = {}'.format(chown_value))
@ -751,47 +846,42 @@ class TemplateExecutor:
os.chown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown directory: {}'.
format(target_path))
return False
raise TemplateExecutorError(
'Can not chown directory: {0}, reason: {1}'.
format(target_path, str(error)))
def chmod_directory(self, target_path, chmod_value=False):
"""Сменить права доступа к директории."""
'''Метод для смены прав доступа к директории.'''
if not chmod_value:
chmod_value = self.template_parameters.chmod
print('chmod value = {}'.format(chmod_value))
try:
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod directory: {}'.
format(target_path))
self.output.set_error('reason: {}'.format(str(error)))
return False
self.output.set_error('Can not chmod directory: {0}, reason: {1}'.
format(target_path, str(error)))
def chown_file(self, target_path, check_existation=True):
"""Сменить владельца файла."""
def chown_file(self, target_path, chown_value, check_existation=True):
'''Метод для смены владельца файла.'''
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.lchown(target_path, self.chown['uid'], self.chown['gid'])
return True
os.lchown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown file: {}'.format(target_path))
return False
raise TemplateExecutorError('Can not chown file: {0}, reason: {1}'.
format(target_path, str(error)))
def chmod_file(self, target_path, check_existation=True):
"""Сменить права доступа к директории."""
def chmod_file(self, target_path, chmod_value, check_existation=True):
'''Метод для смены прав доступа к директории.'''
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.chmod(target_path, self.chmod)
return True
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod file: {}'.format(target_path))
return False
raise TemplateExecutorError('Can not chmod file: {0}, reason: {1}'.
format(target_path, str(error)))
def get_file_info(self, path, info='all'):
file_stat = os.stat(path)

@ -1,3 +1,3 @@
dir /etc
dir /etc/dir
obj /etc/dir/file.conf f050e31b0c059cc6b1edbd4871db1b91 1590420040
obj /etc/dir/file.conf f050e31b0c059cc6b1edbd4871db1b91 1590505223

@ -22,7 +22,8 @@ class TestUtils():
pipe = Process('grep', 'VGA',
stdin=Process('/usr/sbin/lspci')
)
pipe_result = run('/usr/sbin/lspci | grep "VGA"', shell=True, stdout=PIPE)
pipe_result = run('/usr/sbin/lspci | grep "VGA"', shell=True,
stdout=PIPE)
except Exception as error:
print('error:', str(error))
assert False

Loading…
Cancel
Save