You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/template_action_draft.py

1008 lines
46 KiB

from calculate.templates.template_engine import TemplateEngine, Variables,\
FILE, DIR, ParametersProcessor
from calculate.templates.template_processor import TemplateAction
from calculate.utils.package import PackageAtomParser, Package, PackageNotFound
from calculate.utils.files import write_file, read_link, read_file_lines,\
FilesError, join_paths
from calculate.utils.mount import Mounts
from collections import OrderedDict
import hashlib
import stat
import glob
import shutil
import os
template_text = '''{% calculate append = 'link', source = '/etc/dir/dir_2' -%}
{% calculate package = 'test-category/test-package', format = 'samba' -%}
[section one]
parameter_1 = {{ vars_1.value_1 }}
!parameter_2
'''
backup_template_text = '''{% calculate append = 'join', format = 'samba',
autoupdate, package = 'test-category/test-package' -%}
[section one]
parameter_1 = value
parameter_2 = value_2
[section two]
other_parameter = other_value
'''
APPENDS_SET = TemplateAction().available_appends
vars_1 = Variables({'value_1': 'value_1'})
DATAVARS_MODULE = Variables({'vars_1': vars_1})
CHROOT_PATH = os.path.join(os.getcwd(), 'tests/templates/testfiles/test_root')
CL_CONFIG_PATH = os.path.join(CHROOT_PATH, 'var/lib/calculate/config')
CL_CONFIG_ARCHIVE_PATH = os.path.join(CHROOT_PATH,
'var/lib/calculate/config-archive')
template_engine = TemplateEngine(datavars_module=DATAVARS_MODULE,
appends_set=APPENDS_SET,
chroot_path=CHROOT_PATH)
target_path = os.path.join(CHROOT_PATH, 'etc/dir/file.conf')
class TemplateExecutorError(Exception):
pass
class TemplateTypeConflict(Exception):
pass
class TemplateCollisionError(Exception):
pass
class CalculateConfigFile:
'''Класс для работы с файлом /var/lib/calculate/config.'''
def __init__(self, cl_config_path='/var/lib/calculate/config',
cl_chroot_path='/'):
self.chroot_path = cl_chroot_path
self.cl_config_path = cl_config_path
self._config_dictionary = self._get_cl_config_dictionary()
self._unsaved_changes = False
def __contains__(self, file_path):
file_path = self._remove_chroot(file_path)
return file_path in self._config_dictionary
def _get_cl_config_dictionary(self):
'''Метод для загрузки словаря файла /var/lib/calculate/config.'''
config_dictionary = OrderedDict()
if os.path.exists(self.cl_config_path):
if os.path.isdir(self.cl_config_path):
raise TemplateExecutorError(
"directory instead calculate config file in: {}".
format(self.cl_config_path))
else:
write_file(self.cl_config_path).close()
return config_dictionary
try:
config_file_lines = read_file_lines(self.cl_config_path)
except FilesError as error:
raise TemplateExecutorError(
"cannot read calculate config file in: {0}. Reason: {1}".
format(self.cl_config_path, str(error)))
# Продумать проверку корректности найденного файла.
for file_line in config_file_lines:
filename, md5_sum = file_line.split(' ')
config_dictionary.update({filename: md5_sum})
self._unsaved_changes = False
return config_dictionary
def set_files_md5(self, file_path, file_md5):
'''Метод для установки в config соответствия файла некоторой
контрольной сумме.'''
file_path = self._remove_chroot(file_path)
self._config_dictionary[file_path] = file_md5
self._unsaved_changes = True
def remove_file(self, file_path):
'''Метод для удаления файла из config.'''
file_path = self._remove_chroot(file_path)
if file_path in self._config_dictionary:
self._config_dictionary.pop(file_path)
self._unsaved_changes = True
def compare_md5(self, file_path, file_md5):
'''Метод для сравнения хэш-суммы из config и некоторой заданной.'''
file_path = self._remove_chroot(file_path)
if file_path in self._config_dictionary:
return self._config_dictionary[file_path] == file_md5
else:
return False
def save_changes(self):
'''Метод для записи изменений, внессенных в файл config.'''
if not self._unsaved_changes:
return
config_file = write_file(self.cl_config_path)
for file_name, file_md5 in self._config_dictionary.items():
config_file.write('{} {}\n'.format(file_name, file_md5))
config_file.close()
self._unsaved_changes = False
def _remove_chroot(self, file_path):
'''Метод для удаления корневого пути из указанного пути.'''
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return file_path
class TemplateWrapper:
'''Класс связывающий шаблон с целевым файлом и определяющий параметры
наложения шаблона, обусловленные состоянием целевого файла.'''
type_checks = {DIR: os.path.isdir,
FILE: os.path.isfile}
chroot_path = '/'
config_archive_path = '/var/lib/calculate/config-archive'
_protected_is_set = False
_protected_set = set()
_unprotected_set = set()
def __new__(cls, *args, **kwargs):
if not cls._protected_is_set:
# Устанавливаем значения PROTECTED, если не заданы.
cls._set_protected()
return super().__new__(cls)
def __init__(self, target_file_path, parameters, template_type,
template_text=''):
self.target_path = target_file_path
self.target_package_name = None
self.package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
# Вспомогательный флаг, включается, если по целевому пути лежит файл,
# для которого не определился никакой пакет.
self.target_without_package = False
self.parameters = parameters
self.output_path = self.target_path
self.input_path = self.parameters.source
self.template_type = template_type
self.template_text = template_text
# Флаг, указывающий, что нужно удалить файл из target_path перед
# применением шаблона.
self.remove_original = False
# Флаг, указывающий, что целевой путь был изменен.
self.target_path_is_changed = False
# Флаг, указывающий, что файл является PROTECTED.
self.protected = False
# Временный флаг для определения того, является ли шаблон userspace.
self.is_userspace = False
# Получаем класс соответствующего формата файла.
if self.parameters.format:
self.format_class = ParametersProcessor.\
available_formats[self.parameters.format]
else:
# Здесь будет детектор форматов.
pass
# Если по этому пути что-то есть -- проверяем конфликты.
if os.path.exists(target_file_path):
for file_type, checker in self.type_checks.items():
if checker(target_path):
self.target_type = file_type
break
self.target_is_link = os.path.islink(target_path)
# Если установлен параметр mirror и есть параметр source,
# содержащий несуществующий путь -- удаляем целевой файл.
if self.parameters.source is True and self.parameters.mirror:
self.remove_original = True
else:
if self.parameters.mirror:
raise TemplateExecutorError("target file does not exist, while"
" 'mirror' parameter is set")
self.target_type = None
self.check_conflicts()
self.check_package_collision()
# Если целью является файл -- проверяем наличие ._cfg0000_filename
# файлов.
if self.target_type is FILE:
self._cfg_pattern = os.path.join(
os.path.dirname(self.target_path),
"._cfg????_{}".format(
os.path.basename(self.target_path)))
self.cfg_list = glob.glob(self._cfg_pattern)
self.check_user_changes()
def check_conflicts(self):
'''Проверка конфликтов типов.'''
if self.parameters.append == 'link':
if self.parameters.force:
self.remove_original = True
elif self.target_type == DIR:
raise TemplateTypeConflict(
"The target is a directory while the"
" template has append = 'link'.")
else:
raise TemplateTypeConflict(
"The target is a file while the"
" template has append = 'link'.")
elif self.template_type == DIR:
if self.target_type == FILE:
if self.parameters.force:
self.remove_original = True
else:
raise TemplateTypeConflict("The target is a file while the"
" template is a directory.")
elif self.target_is_link and self.target_type == DIR:
if self.parameters.force:
self.remove_original = True
else:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
elif self.template_type == FILE:
if self.parameters.force:
if self.target_type == DIR:
self.remove_original = True
elif self.target_is_link and self.target_type == FILE:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
elif self.target_type == DIR:
raise TemplateTypeConflict("The target file is a directory"
" while the template is a file.")
def check_package_collision(self):
'''Проверка на предмет коллизии, то есть конфликта пакета шаблона и
целевого файла.'''
if self.parameters.package:
parameter_package = self.parameters.package
else:
parameter_package = None
if self.target_type is not None:
try:
file_package = self.package_atom_parser.get_file_package(
self.target_path)
except PackageNotFound:
file_package = None
self.target_without_package = True
else:
file_package = None
# Если для шаблона и целевого файла никаким образом не удается
# определить пакет и есть параметр append -- шаблон пропускаем.
if parameter_package is None and file_package is None:
if self.parameters.append:
raise TemplateCollisionError(
"'package' parameter is not defined for"
" template with 'append' parameter.")
elif parameter_package is None:
self.target_package_name = file_package
elif file_package is None:
self.target_package_name = parameter_package
elif file_package != parameter_package:
raise TemplateCollisionError(
"The template package is {0} while target"
" file package is {1}").format(
self.target_package_name.atom,
file_package.atom
)
else:
self.target_package_name = parameter_package
self.target_package = Package(self.target_package_name,
chroot_path=self.chroot_path)
def check_user_changes(self):
'''Метод для проверки наличия пользовательских изменений в
конфигурационных файлах.'''
# Эта проверка только для файлов.
if self.template_type != FILE:
return
# Проверим, является ли файл защищенным.
# Сначала проверяем по переменной CONFIG_PROTECT.
for protected_path in self._protected_set:
if self.target_path.startswith(protected_path):
self.protected = True
break
# Затем по переменной CONFIG_PROTECT_MASK.
for unprotected_path in self._unprotected_set:
if self.target_path.startswith(unprotected_path):
self.protected = False
break
# Собираем список имеющихся ._cfg файлов.
cfg_pattern = os.path.join(os.path.dirname(self.target_path),
"._cfg????_{}".format(
os.path.basename(self.target_path)))
self.cfg_list = glob.glob(cfg_pattern)
# Путь к архивной версии файла.
self.archive_path = self._get_archive_path(self.target_path)
if not self.protected:
self.md5_matching = True
elif self.parameters.unbound:
# Если присутствует unbound, то просто модифицируем файл и
# удаляем его из CONTENTS.
self.md5_matching = True
elif self.target_type is None:
# Если целевой файл отсутствует.
if self.target_path in self.target_package:
# Проверка -- был ли файл удален.
self.md5_matching = False
else:
self.md5_matching = True
elif self.target_without_package:
# Если файл по целевому пути не относится к какому-либо пакету.
if self.parameters.unbound:
self.md5_matching = True
else:
self.md5_matching = False
else:
# Если файл есть и он относится к текущему пакету.
target_md5 = self.target_package.get_md5(self.target_path)
self.md5_matching = self.target_package.is_md5_equal(
self.target_path,
file_md5=target_md5)
# Если по целевому пути файл не относящийся к какому-либо пакету и
# присутствует параметр autoupdate -- удаляем этот файл.
if self.target_without_package and self.parameters.autoupdate:
self.remove_original = True
self.md5_matching = self.md5_matching or self.parameters.autoupdate
# Определяем путей входных и выходных файлов.
if self.md5_matching:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self.target_path
self.output_path = self.target_path
else:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self.archive_path
self.output_path = self._get_cfg_path(self.target_path)
def _get_archive_path(self, file_path):
'''Метод для получения пути к архивной версии указанного файла.'''
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return join_paths(self.config_archive_path, file_path)
def _get_cfg_path(self, file_path):
'''Метод для получения пути для создания нового ._cfg????_ файла.'''
if self.cfg_list:
last_cfg_name = os.path.basename(self.cfg_list[-1])
slice_value = len('._cfg')
cfg_number = int(last_cfg_name[slice_value: slice_value + 4])
else:
cfg_number = 0
cfg_number = str(cfg_number + 1)
if len(cfg_number) < 4:
cfg_number = '0' * (4 - len(cfg_number)) + cfg_number
new_cfg_name = "._cfg{}_{}".format(cfg_number,
os.path.basename(file_path))
new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name)
return new_cfg_path
def remove_from_contents(self):
'''Метод для удаления целевого файла из CONTENTS.'''
print('let s remove')
if self.template_type == DIR:
self.target_package.remove_dir(self.target_path)
elif self.template_type == FILE:
print('remove as file')
self.target_package.remove_obj(self.target_path)
def clear_dir_contents(self):
'''Метод для удаления из CONTENTS всего содержимого директории после
применения append = "clear".'''
if self.template_type == DIR:
self.target_package.clear_dir(self.target_path)
def add_to_contents(self, file_md5=None):
'''Метод для добавления целевого файла в CONTENTS.'''
if self.parameters.append == 'link':
self.target_package.add_sym(target_path, self.parameters.source)
elif self.template_type == DIR:
self.target_package.add_dir(target_path)
elif self.template_type == FILE:
self.target_package.add_obj(target_path, file_md5)
@classmethod
def _set_protected(cls):
'''Метод для получения множества защищенных директорий.'''
if cls._protected_is_set:
return
cls._protected_set = set()
cls._protected_set.add(join_paths(cls.chroot_path, '/etc'))
config_protect_env = os.environ.get('CONFIG_PROTECT', False)
if config_protect_env:
for protected_path in config_protect_env.split():
protected_path = join_paths(cls.chroot_path,
protected_path.strip())
cls._protected_set.add(protected_path)
config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False)
if config_protect_mask_env:
for unprotected_path in config_protect_mask_env.split():
unprotected_path = join_paths(cls.chroot_path,
unprotected_path.strip())
cls._unprotected_set.add(unprotected_path)
cls._protected_is_set = True
def save_changes(self):
'''Метод для сохранения изменений внесенных в CONTENTS.'''
if self.target_package:
print('saving CONTENTS: {}'.format(
self.target_package.contents_dictionary.keys()))
self.target_package.remove_empty_directories()
self.target_package.write_contents_file()
class TemplateExecutor:
def __init__(self, datavars_module=Variables(), chroot_path='/',
cl_config_archive='/var/lib/calculate/config-archive',
cl_config_path='/var/lib/calculate/config'):
self.datavars_module = datavars_module
self.chroot_path = chroot_path
self.directory_default_parameters =\
ParametersProcessor.directory_default_parameters
self.file_default_parameters =\
ParametersProcessor.file_default_parameters
self.directory_appends = {'join': self._append_join_directory,
'remove': self._append_remove_directory,
'skip': self._append_skip_directory,
'clear': self._append_clear_directory,
'link': self._append_link_directory,
'replace': self._append_replace_directory}
self.file_appends = {'join': self._append_join_file}
self.formats_classes = ParametersProcessor.available_formats
self.calculate_config_file = CalculateConfigFile(
cl_config_path=cl_config_path,
cl_chroot_path=chroot_path)
TemplateWrapper.chroot_path = self.chroot_path
TemplateWrapper.config_archive_path = cl_config_archive
@property
def available_appends(self):
'''Метод для получения множества возможных значений append.'''
appends_set = set(self.directory_appends.keys()).union(
set(self.file_appends.keys()))
return appends_set
def execute_template(self, target_path, parameters, template_type,
template_text=''):
'''Метод для запуска выполнения шаблонов.'''
try:
template_object = TemplateWrapper(target_path, parameters,
template_type,
template_text=template_text)
except TemplateTypeConflict as error:
print('Error: {}'.format(str(error)))
return
except TemplateCollisionError as error:
print('Error: {}'.format(str(error)))
return
# Удаляем оригинал, если это необходимо из-за наличия force или по
# другим причинам.
if template_object.remove_original:
print('remove original')
if template_object.target_type == DIR:
self._remove_directory(template_object.target_path)
else:
self._remove_file(template_object.target_path)
template_object.remove_from_contents()
# Если был включен mirror, то после удаления файла завершаем
# выполнение шаблона.
if template_object.parameters.mirror:
template_object.save_changes()
return
template_object.target_type = None
print('input path: {}'.format(template_object.input_path))
print('output path: {}'.format(template_object.output_path))
# (!) Добавить поддержку run, execute и т.д.
if template_object.template_type == DIR:
self.directory_appends[template_object.parameters.append](
template_object)
elif template_object.template_type == FILE:
self.file_appends[template_object.parameters.append](
template_object)
# Сохраняем изменения в CONTENTS внесенные согласно шаблону.
print('it is time to save changes')
template_object.save_changes()
# Возвращаем целевой путь, если он был изменен, или None если не был.
if template_object.target_path_is_changed:
return template_object.target_path
else:
return
def save_changes(self):
'''Метод для сохранения чего-нибудь после выполнения всех шаблонов.'''
# Пока сохраняем только получившееся содержимое config-файла.
self.calculate_config_file.save_changes()
def _append_join_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "join", если шаблон --
директория. Создает директорию, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
template_object.add_to_contents()
def _append_remove_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "remove", если шаблон --
директория. Удаляет директорию со всем содержимым, если она есть.'''
if template_object.target_type is not None:
self._remove_directory(template_object.target_path)
template_object.remove_from_contents()
def _append_skip_directory(self, template_object: TemplateWrapper):
pass
def _append_clear_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "clear", если шаблон --
директория. Удаляет все содержимое директории, если она есть.'''
if template_object.target_type is not None:
self._clear_directory(template_object.target_path)
template_object.clear_dir_contents()
def _append_link_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "link", если шаблон --
директория. Создает ссылку на директорию, если она есть.'''
target = template_object.parameters.source
link_path = template_object.target_path
self._link_directory(link_path, target)
template_object.add_to_contents()
def _append_replace_directory(self, template_object: TemplateWrapper):
'''Метод описывающий действия для append = "replace", если шаблон --
директория. Очищает директорию или создает, если ее нет.'''
if template_object.target_type is None:
self._create_directory(template_object)
template_object.add_to_contents()
else:
self._clear_directory(template_object.target_path)
template_object.clear_dir_contents()
def _append_join_file(self, template_object: TemplateWrapper):
'''Метод описывающий действия при append = "join", если шаблон --
файл. Объединяет шаблон с целевым файлом.'''
input_path = template_object.input_path
output_path = template_object.output_path
template_format = template_object.format_class
if template_object.md5_matching:
# Действия при совпадении md5 из CONTENTS и md5 целевого файла.
output_paths = [output_path]
# Если целевой файл защищен, а шаблон не userspace.
if template_object.protected and not template_object.is_userspace:
output_paths.append(template_object.archive_path)
if template_object.target_type is not None:
with open(input_path, 'r') as input_file:
input_text = input_file.read()
else:
input_text = ''
parsed_input = template_format(input_text)
parsed_template = template_format(template_object.template_text)
parsed_input.join_template(parsed_template)
# Результат наложения шаблона.
output_text = parsed_input.document_text
# Удаляем форматный объект входного файла.
del(parsed_input)
output_text_md5 = hashlib.md5(output_text.encode()).hexdigest()
for save_path in output_paths:
with open(save_path, 'w') as output_file:
output_file.write(output_text)
if template_object.parameters.chown:
self.chown_file(save_path,
template_object.parameters.chown,
check_existation=False)
if template_object.parameters.chmod:
self.chmod_file(save_path,
template_object.parameters.chmod,
check_existation=False)
# Убираем все ._cfg файлы.
if template_object.cfg_list:
for cfg_file_path in template_object.cfg_list:
self._remove_file(cfg_file_path)
# Убираем целевой файл из CL.
self.calculate_config_file.remove_file(template_object.target_path)
print('is contents update is needed')
# Обновляем CONTENTS.
if template_object.protected:
print('needed')
if template_object.parameters.unbound:
print('remove')
template_object.remove_from_contents()
else:
print('add')
template_object.add_to_contents(file_md5=output_text_md5)
else:
with open(input_path, 'r') as input_file:
input_text = input_file.read()
parsed_input = template_format(input_text)
parsed_template = template_format(template_object.template_text)
parsed_input.join_template(parsed_template)
# Результат наложения шаблона.
output_text = parsed_input.document_text
# Удаляем форматный объект входного файла.
del(parsed_input)
output_text_md5 = hashlib.md5(output_text.encode()).hexdigest()
if not self.calculate_config_file.compare_md5(target_path,
output_text_md5):
with open(output_path, 'w') as output_file:
output_file.write(output_text)
if template_object.parameters.chown:
self.chown_file(output_path,
template_object.parameters.chown,
check_existation=False)
if template_object.parameters.chmod:
self.chmod_file(output_file,
template_object.parameters.chmod,
check_existation=False)
self.calculate_config_file.set_files_md5(
template_object.target_path,
output_text_md5)
else:
# Действия если CL совпало. Пока ничего не делаем.
pass
# Обновляем CONTENTS.
template_object.add_to_contents(file_md5=output_text_md5)
def _create_directory(self, template_object: TemplateWrapper):
'''Метод для создания директории и, при необходимости, изменения
владельца и доступа все директорий на пути к целевой.'''
target_path = template_object.target_path
template_parameters = template_object.parameters
if os.access(target_path, os.F_OK):
if template_parameters.chmod:
self.chmod_directory(target_path)
if self.template_parameters.chown:
self.chown_directory(target_path)
return
directories_to_create = [target_path]
directory_path = os.path.dirname(target_path)
while not os.access(directory_path, os.F_OK) and directory_path:
directories_to_create.append(directory_path)
directory_path = os.path.dirname(directory_path)
try:
current_mod, current_uid, current_gid = self.get_file_info(
directory_path,
'all')
current_owner = {'uid': current_uid, 'gid': current_gid}
except OSError:
raise TemplateExecutorError('No access to the directory: {}'.
format(directory_path))
directories_to_create.reverse()
for create_path in directories_to_create:
try:
os.mkdir(create_path)
if (template_parameters.chmod and
template_parameters.chmod != current_mod):
self.chmod_directory(create_path)
elif 'chmod' in self.directory_default_parameters:
self.chmod_directory(
create_path,
chmod_value=self.directory_default_parameters['chown'])
if (template_parameters.chown and
template_parameters.chown != current_owner):
self.chown_directory(create_path)
elif 'chown' in self.directory_default_parameters:
self.chown_directory(
create_path,
chown_value=self.directory_default_parameters['chmod'])
except OSError as error:
raise TemplateExecutorError(
'Failed to create directory: {}, reason: {}'.
format(create_path, str(error)))
def _remove_directory(self, target_path):
'''Метод для удаления директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
try:
if os.path.islink(target_path):
os.unlink(target_path)
else:
shutil.rmtree(target_path)
except Exception as error:
raise TemplateExecutorError(
("Failed to delete the directory: {0},"
" reason: {1}").format(target_path,
str(error)))
else:
error_message = "target file is not directory"
else:
error_message = "target file does not exist"
raise TemplateExecutorError(("Failed to delete the directory: {0},"
"reason: {1}").format(target_path,
error_message))
def _clear_directory(self, target_path):
'''Метод для очистки содержимого целевой директории.'''
if os.path.exists(target_path):
if os.path.isdir(target_path):
for node in os.scandir(target_path):
if node.is_dir():
self._remove_directory(node.path)
else:
self._remove_file(node.path)
else:
error_message = "target file is not directory"
else:
error_message = "target directory does not exist"
raise TemplateExecutorError(("failed to delete directory: {},"
" reason: {}").format(target_path,
error_message))
def _link_directory(self, target_path, source):
'''Метод для создания по целевому пути ссылки на директорию
расположенную на пути, указанному в source.'''
try:
os.symlink(source, target_path, target_is_directory=True)
except OSError:
raise TemplateExecutorError("Failed to create symlink: {0} -> {1}".
format(target_path, self.source))
def _remove_file(self, target_path):
'''Метод для удаления файлов.'''
if os.path.islink(target_path):
try:
os.unlink(target_path)
except OSError:
raise TemplateExecutorError('failed to delete the link: {}'.
format(target_path))
if os.path.isfile(target_path):
try:
os.remove(target_path)
except OSError:
raise TemplateExecutorError('failed to delete the file: {}'.
format(target_path))
def _clear_file(self, target_path):
'''Метод для очистки файлов.'''
try:
with open(target_path, 'w') as f:
f.truncate(0)
except IOError:
raise TemplateExecutorError("failed to clear the file: {}".
format(target_path))
def _link_file(self, target_path, source):
'''Метод для создания по целевому пути ссылки на файл расположенный на
пути, указанному в source.'''
try:
os.symlink(source, target_path)
except OSError:
raise TemplateExecutorError(
"Failed to create symlink to the file: {0} -> {1}".
format(target_path, self.source))
def chown_directory(self, target_path, chown_value={}):
"""Метод для смены владельца директории."""
if not chown_value:
chown_value = self.template_parameters.chown
print('chown value = {}'.format(chown_value))
try:
os.chown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
raise TemplateExecutorError(
'Can not chown directory: {0}, reason: {1}'.
format(target_path, str(error)))
def chmod_directory(self, target_path, chmod_value=False):
'''Метод для смены прав доступа к директории.'''
if not chmod_value:
chmod_value = self.template_parameters.chmod
try:
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod directory: {0}, reason: {1}'.
format(target_path, str(error)))
def chown_file(self, target_path, chown_value, check_existation=True):
'''Метод для смены владельца файла.'''
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.lchown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
raise TemplateExecutorError('Can not chown file: {0}, reason: {1}'.
format(target_path, str(error)))
def chmod_file(self, target_path, chmod_value, check_existation=True):
'''Метод для смены прав доступа к директории.'''
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
raise TemplateExecutorError('Can not chmod file: {0}, reason: {1}'.
format(target_path, str(error)))
def get_file_info(self, path, info='all'):
file_stat = os.stat(path)
if info == 'all':
return stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,\
file_stat.st_gid
if info == 'mode':
return stat.S_IMODE(file_stat.st_mode)
if info == 'owner':
return file_stat.st_uid, file_stat.st_gid
def set_uid_gid_error(self, path, uid, gid, template_path=''):
import pwd
import grp
try:
user_name = pwd.getpwuid(uid).pw_name
except (TypeError, KeyError):
user_name = str(uid)
try:
group_name = grp.getgrgid(gid).gr_name
except (TypeError, KeyError):
group_name = str(gid)
owner = '{0}:{1}'.format(user_name, group_name)
if template_path:
self.output.set_error('Failed to process template file {}'.
template_path)
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !! описать ошибку !!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
self.output.set_error('error with owner: {}'.format(owner))
def check_filesystem(self, target_path):
'''Метод, который предположительно будет использоваться для проверки
файловой системы перед применением шаблона.'''
pass
def is_vfat(self, path):
'''Метод, проверяющий является ли файловая система vfat. Нужно для того,
чтобы заранее знать о возможности применения chown, chmod и т.д.'''
if self.mounts is None:
self.mounts = Mounts()
if self.mounts.get_from_fstab(what=self.mounts.TYPE,
where=self.mounts.DIR,
is_in=path) in ('vfat', 'ntfs-3g',
'ntfs'):
return True
return False
def check_os_error(self, error, path):
if hasattr(error, 'errno') and error.errno == os.errno.EPERM:
if self.is_vfat(path):
return True
if hasattr(error, 'errno') and error.errno == os.errno.EACCES and\
'var/calculate/remote' in path:
return True
return False
# Применение основного шаблона:
template_engine.process_template_from_string(template_text, FILE)
template_parameters = template_engine.parameters
template_text = template_engine.template_text
template_executor_obj = TemplateExecutor(
datavars_module=DATAVARS_MODULE,
chroot_path=CHROOT_PATH,
cl_config_archive=CL_CONFIG_ARCHIVE_PATH,
cl_config_path=CL_CONFIG_PATH)
template_executor_obj.execute_template(target_path,
template_parameters,
FILE, template_text=template_text)
template_executor_obj.save_changes()
input()
template_engine.process_template_from_string(backup_template_text, FILE)
template_parameters = template_engine.parameters
template_text = template_engine.template_text
template_executor_obj = TemplateExecutor(
datavars_module=DATAVARS_MODULE,
chroot_path=CHROOT_PATH,
cl_config_archive=CL_CONFIG_ARCHIVE_PATH,
cl_config_path=CL_CONFIG_PATH)
template_executor_obj.execute_template(target_path,
template_parameters,
FILE, template_text=template_text)
template_executor_obj.save_changes()