You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/template_action_draft.py

815 lines
33 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from calculate.templates.template_engine import TemplateEngine, Variables,\
FILE, DIR, ParametersProcessor
from calculate.templates.template_processor import TemplateAction
from calculate.utils.package import PackageAtomParser, Package, PackageNotFound
from calculate.utils.files import write_file, read_link, read_file_lines,\
FilesError, join_paths
from calculate.utils.mount import Mounts
from collections import OrderedDict
import hashlib
import stat
import glob
import shutil
import os
# Text of the main test template. Its header lines set append = 'join',
# package = 'dev-lang/python' and format = 'samba'.
template_text = '''{% calculate append = 'join' -%}
{% calculate package = 'dev-lang/python', format = 'samba' -%}
[section one]
parameter_1 = {{ vars_1.value_1 }}
!parameter_2
'''
# Text of the second ("backup") test template; it has no package parameter.
backup_template_text = '''{% calculate append = 'join', format = 'samba' -%}
[section one]
parameter_1 = value
parameter_2 = value_2
'''
# Append types reported by TemplateAction; handed to the template engine.
APPENDS_SET = TemplateAction().available_appends
# Variables made available to the templates ({{ vars_1.value_1 }}).
vars_1 = Variables({'value_1': 'value_1'})
DATAVARS_MODULE = Variables({'vars_1': vars_1})
# Test root directory, resolved relative to the current working directory.
CHROOT_PATH = os.path.join(os.getcwd(), 'tests/templates/testfiles/test_root')
# Location of the calculate config file inside the test root.
CL_CONFIG_PATH = os.path.join(CHROOT_PATH, 'var/lib/calculate/config')
template_engine = TemplateEngine(datavars_module=DATAVARS_MODULE,
appends_set=APPENDS_SET,
chroot_path=CHROOT_PATH)
# File inside the test root that the test templates are applied to.
target_path = os.path.join(CHROOT_PATH, 'etc/dir/file.conf')
class TemplateExecutorError(Exception):
    '''Failure while executing a template action or handling its files.'''
class TemplateTypeConflict(Exception):
    '''The type of the target does not fit the type of the template.'''
class TemplateCollisionError(Exception):
    '''The package of the template collides with the target's package.'''
class CalculateConfigFile:
    '''In-memory view of the calculate config file.

    The file holds one "<file path> <md5>" record per line. The chroot
    prefix is stripped from every path passed to the public methods, so the
    records are stored relative to the chroot. Pending changes are written
    back by save_changes() and, as a last resort, when the object is
    destroyed.'''

    def __init__(self, cl_config_path='/var/lib/calculate/config',
                 chroot_path='/'):
        '''Load the config file, creating an empty one when it is missing.

        Args:
            cl_config_path: path of the calculate config file.
            chroot_path: prefix that is removed from file paths.

        Raises:
            TemplateExecutorError: the config path is a directory, or the
                file cannot be read or contains a malformed record.'''
        self.chroot_path = chroot_path
        self.cl_config_path = cl_config_path
        # Must exist before loading: __del__ reads it even if loading fails.
        self._unsaved_changes = False
        self._config_dictionary = self._get_cl_config_dictionary()

    def __contains__(self, file_path):
        '''Tell whether the file has a record in the config.'''
        return self._remove_chroot(file_path) in self._config_dictionary

    def _get_cl_config_dictionary(self):
        '''Read the config file into an OrderedDict {file path: md5}.'''
        if not os.path.exists(self.cl_config_path):
            # No config yet: create an empty file and start from scratch.
            write_file(self.cl_config_path).close()
            return OrderedDict()

        if os.path.isdir(self.cl_config_path):
            raise TemplateExecutorError(
                "directory instead calculate config file in: {}".
                format(self.cl_config_path))

        try:
            config_file_lines = read_file_lines(self.cl_config_path)
        except FilesError as error:
            raise TemplateExecutorError(
                "cannot read calculate config file in: {0}. Reason: {1}".
                format(self.cl_config_path, str(error))) from error

        config_dictionary = self._parse_config_lines(config_file_lines)
        self._unsaved_changes = False
        return config_dictionary

    @staticmethod
    def _parse_config_lines(config_file_lines):
        '''Turn "<file path> <md5>" lines into an OrderedDict.

        Blank lines are skipped. The md5 is the last whitespace-separated
        token, which mirrors the format written by save_changes() and keeps
        paths that contain spaces intact.

        Raises:
            TemplateExecutorError: a line does not contain both fields.'''
        config_dictionary = OrderedDict()
        for file_line in config_file_lines:
            record = file_line.strip()
            if not record:
                continue
            fields = record.rsplit(None, 1)
            if len(fields) != 2:
                raise TemplateExecutorError(
                    "incorrect line in calculate config file: {}".
                    format(record))
            filename, md5_sum = fields
            config_dictionary[filename] = md5_sum
        return config_dictionary

    def set_files_md5(self, file_path, file_md5):
        '''Store (or replace) the md5 recorded for the file.'''
        file_path = self._remove_chroot(file_path)
        self._config_dictionary[file_path] = file_md5
        self._unsaved_changes = True

    def remove_file(self, file_path):
        '''Drop the record of the file, if there is one.'''
        file_path = self._remove_chroot(file_path)
        if file_path in self._config_dictionary:
            del self._config_dictionary[file_path]
            self._unsaved_changes = True

    def compare_md5(self, file_path, file_md5):
        '''Return True when the file is recorded with exactly this md5.'''
        file_path = self._remove_chroot(file_path)
        if file_path not in self._config_dictionary:
            return False
        return self._config_dictionary[file_path] == file_md5

    def save_changes(self):
        '''Rewrite the config file from the in-memory records.'''
        config_file = write_file(self.cl_config_path)
        try:
            for file_name, file_md5 in self._config_dictionary.items():
                config_file.write('{} {}\n'.format(file_name, file_md5))
        finally:
            # Release the handle even when a write fails.
            config_file.close()
        self._unsaved_changes = False

    def _remove_chroot(self, file_path):
        '''Strip the chroot prefix from the path when it is present.'''
        if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]
        return file_path

    def __del__(self):
        '''Save pending changes to the config file before the object goes
        away.'''
        # getattr: the attribute is absent on a partially built instance.
        if getattr(self, '_unsaved_changes', False):
            self.save_changes()
        self._config_dictionary = OrderedDict()
class TemplateWrapper:
    '''Binds a template to its target file and works out how the template
    has to be applied, depending on the state of the target.'''
    # Predicates used to detect the type of an existing target.
    type_checks = {DIR: os.path.isdir,
                   FILE: os.path.isfile}

    chroot_path = '/'
    config_archive_path = '/var/lib/calculate/config-archive'

    # NOTE(review): created once, at class definition time, with the default
    # chroot; reassigning chroot_path later does not rebuild it.
    package_atom_parser = PackageAtomParser(chroot_path=chroot_path)

    _protected_is_set = False
    _protected_set = {'/etc'}
    _unprotected_set = set()

    def __init__(self, target_file_path, parameters, template_type,
                 template_text=''):
        '''Inspect the target and prepare everything needed to apply the
        template.

        Args:
            target_file_path: path the template is applied to.
            parameters: template parameters (append, force, source, format,
                package, autoupdate are read here).
            template_type: DIR or FILE.
            template_text: text of the template.

        Raises:
            TemplateTypeConflict: target and template types do not fit.
            TemplateCollisionError: template and target packages collide.
            TemplateExecutorError: a link in the target path cannot be read.
        '''
        self.target_path = target_file_path
        self.target_package_name = None

        # Helper flag, set when something lies at the target path but no
        # package owns it.
        self.target_without_package = False

        self.parameters = parameters
        self.output_path = self.target_path
        self.input_path = self.parameters.source

        self.template_type = template_type
        self.template_text = template_text

        # The file at target_path has to be removed before the template is
        # applied.
        self.remove_original = False

        # Temporary flag telling whether the template is a userspace one.
        self.is_userspace = False

        # Safe defaults for attributes that are filled in only for existing
        # file targets.
        self.cfg_list = []
        self.archive_path = None

        # Class implementing the format of the file.
        if self.parameters.format:
            self.format_class = ParametersProcessor.\
                available_formats[self.parameters.format]
        else:
            # A format detector will be placed here.
            self.format_class = None

        # If something lies at the target path, find out what it is.
        self.target_type = None
        self.target_is_link = False
        if os.path.exists(target_file_path):
            for file_type, checker in self.type_checks.items():
                if checker(target_file_path):
                    self.target_type = file_type
                    break
            self.target_is_link = os.path.islink(target_file_path)

        self.check_conflicts()
        self.check_package_collision()

        # For file targets look for ._cfg0000_filename files.
        if self.target_type is FILE:
            self.cfg_list = self._collect_cfg_files()

        self.check_user_changes()

    def _collect_cfg_files(self):
        '''Return the sorted list of ._cfg????_<name> files lying next to
        the target.'''
        self._cfg_pattern = os.path.join(
            os.path.dirname(self.target_path),
            "._cfg????_{}".format(os.path.basename(self.target_path)))
        # glob gives no ordering guarantee, so sort explicitly.
        return sorted(glob.glob(self._cfg_pattern))

    def _dereference_target(self):
        '''Replace target_path with the path its link points to.'''
        try:
            self.target_path = read_link(self.target_path)
        except FilesError as error:
            raise TemplateExecutorError("files error: {}".
                                        format(str(error))) from error

    def check_conflicts(self):
        '''Check type conflicts between the template and the target.'''
        if self.target_type is None:
            # Nothing lies at the target path, so nothing can conflict.
            return

        if self.parameters.append == 'link':
            if self.parameters.force:
                self.remove_original = True
                return
            elif self.target_type == DIR:
                raise TemplateTypeConflict(
                    "The target is a directory while the"
                    " template has append = 'link'.")
            else:
                raise TemplateTypeConflict(
                    "The target is a file while the"
                    " template has append = 'link'.")

        if self.template_type == DIR:
            if self.parameters.force:
                self.remove_original = True
            elif self.target_type == FILE:
                raise TemplateTypeConflict("The target is a file while the"
                                           " template is a directory.")
            elif self.target_is_link and self.target_type == DIR:
                self._dereference_target()
            return

        if self.template_type == FILE:
            if self.parameters.force:
                if self.target_type == DIR:
                    self.remove_original = True
                elif self.target_is_link and self.target_type == FILE:
                    self._dereference_target()
                return
            elif self.target_type == FILE:
                return
            else:
                raise TemplateTypeConflict("The target file is a directory"
                                           " while the template is a file.")

    def check_package_collision(self):
        '''Check for a collision, i.e. a conflict between the package of the
        template and the package owning the target file.'''
        if self.parameters.package:
            parameter_package = self.parameters.package
        else:
            parameter_package = None

        file_package = None
        if self.target_type is not None:
            try:
                file_package = self.package_atom_parser.get_file_package(
                    self.target_path)
            except PackageNotFound:
                self.target_without_package = True

        # If no package can be determined for either the template or the
        # target and the append parameter is set, the template is skipped.
        if parameter_package is None and file_package is None:
            if self.parameters.append:
                raise TemplateCollisionError(
                    "'package' parameter is not defined for"
                    " template with 'append' parameter.")

        if parameter_package is None:
            self.target_package_name = file_package
        elif file_package is None:
            self.target_package_name = parameter_package
        elif file_package != parameter_package:
            # Both packages are known and they differ.
            raise TemplateCollisionError(
                "The template package is {0} while target"
                " file package is {1}".format(parameter_package.atom,
                                              file_package.atom))
        else:
            self.target_package_name = parameter_package

        if self.target_package_name is None:
            # check_user_changes() treats None as "no package known".
            self.target_package = None
        else:
            self.target_package = Package(self.target_package_name,
                                          chroot_path=self.chroot_path)

    def check_user_changes(self):
        '''Decide whether the target still matches the md5 recorded by its
        package and choose the input and output paths accordingly.'''
        if self.target_type is None:
            self.md5_matching = True
        elif self.target_without_package:
            if self.parameters.autoupdate:
                self.md5_matching = True
                self.remove_original = True
            else:
                self.md5_matching = False

        if (self.target_type is None or
                self.target_package is None or
                self.target_type != FILE):
            return

        # Collect the existing ._cfg files.
        self.cfg_list = self._collect_cfg_files()

        target_md5 = self.target_package.get_md5(self.target_path)
        self.md5_matching = self.target_package.is_md5_equal(
            self.target_path,
            file_md5=target_md5)
        self.md5_matching = self.md5_matching or self.parameters.autoupdate
        self.archive_path = self._get_archive_path(self.target_path)

        if self.md5_matching:
            default_input_path = self.target_path
            self.output_path = self.target_path
        else:
            default_input_path = self.archive_path
            self.output_path = self._get_cfg_path(self.target_path)

        # The path from the source parameter takes priority.
        self.input_path = self.parameters.source or default_input_path

    def _get_archive_path(self, file_path):
        '''Return the path of the file inside the config archive.'''
        if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
            file_path = file_path[len(self.chroot_path):]
        return join_paths(self.config_archive_path, file_path)

    def _get_cfg_path(self, file_path):
        '''Return the path of the next free ._cfgNNNN_<name> file.'''
        prefix_length = len('._cfg')
        cfg_numbers = []
        for cfg_path in self.cfg_list:
            number = os.path.basename(cfg_path)[prefix_length:
                                                prefix_length + 4]
            # The glob pattern "????" also matches non-numeric names.
            if number.isdigit():
                cfg_numbers.append(int(number))

        cfg_number = max(cfg_numbers, default=0) + 1
        new_cfg_name = "._cfg{:04d}_{}".format(cfg_number,
                                               os.path.basename(file_path))
        return os.path.join(os.path.dirname(file_path), new_cfg_name)

    def remove_from_contents(self):
        '''Placeholder: removing the target from CONTENTS is not implemented
        yet.'''
        pass

    def add_to_contents(self, file_md5=None):
        '''Register the target in the package according to the kind of the
        template: symlink, directory or regular file.'''
        if self.parameters.append == 'link':
            self.target_package.add_sym(self.target_path,
                                        self.parameters.source)
        elif self.template_type == DIR:
            self.target_package.add_dir(self.target_path)
        elif self.template_type == FILE:
            self.target_package.add_obj(self.target_path, file_md5)

    @classmethod
    def _set_protected(cls):
        '''Fill the protected and unprotected path sets from the
        CONFIG_PROTECT and CONFIG_PROTECT_MASK environment variables. Runs
        only once per class.'''
        if cls._protected_is_set:
            return

        for protected_path in os.environ.get('CONFIG_PROTECT', '').split():
            cls._protected_set.add(protected_path.strip())

        for unprotected_path in os.environ.get('CONFIG_PROTECT_MASK',
                                               '').split():
            cls._unprotected_set.add(unprotected_path.strip())

        cls._protected_is_set = True
class TemplateExecutor:
    '''Applies templates wrapped in TemplateWrapper objects to the file
    system and keeps the calculate config file up to date.'''

    def __init__(self, datavars_module=None, chroot_path='/',
                 cl_config_archive='/var/lib/calculate/config-archive',
                 cl_config_path='/var/lib/calculate/config'):
        '''Prepare the executor.

        Args:
            datavars_module: variables storage; a new Variables() object is
                created when it is omitted.
            chroot_path: root the target paths are relative to.
            cl_config_archive: path of the config archive directory.
            cl_config_path: path of the calculate config file.
        '''
        if datavars_module is None:
            # Created per instance instead of once in the signature.
            datavars_module = Variables()
        self.datavars_module = datavars_module

        self.chroot_path = chroot_path

        # Mount table, loaded on first use by is_vfat().
        self.mounts = None

        self.directory_default_parameters =\
            ParametersProcessor.directory_default_parameters
        self.file_default_parameters =\
            ParametersProcessor.file_default_parameters

        self.directory_appends = {'join': self._append_join_directory,
                                  'remove': self._append_remove_directory,
                                  'clear': self._append_clear_directory}

        self.file_appends = {'join': self._append_join_file}

        self.formats_classes = ParametersProcessor.available_formats
        self.calculate_config_file = CalculateConfigFile(
            cl_config_path=cl_config_path,
            chroot_path=chroot_path)

        TemplateWrapper.chroot_path = self.chroot_path
        # config_archive_path is the attribute TemplateWrapper reads.
        TemplateWrapper.config_archive_path = cl_config_archive

    @property
    def available_appends(self):
        '''Set of all append types supported for directories and files.'''
        return set(self.directory_appends) | set(self.file_appends)

    def execute_template(self, target_path, parameters, template_type,
                         template_text=''):
        '''Apply a template to the target path.

        Type conflicts and package collisions are printed and the template
        is skipped.

        Args:
            target_path: path the template is applied to.
            parameters: template parameters.
            template_type: DIR or FILE.
            template_text: text of the template.
        '''
        print('Template parameters:')
        parameters.print_parameters_for_debug()
        try:
            template_object = TemplateWrapper(target_path, parameters,
                                              template_type,
                                              template_text=template_text)
        except TemplateTypeConflict as error:
            print('type conflict: {}'.format(str(error)))
            return
        except TemplateCollisionError as error:
            print('collision: {}'.format(str(error)))
            return

        if template_object.remove_original:
            if template_object.target_type == DIR:
                self._remove_directory(template_object.target_path)
            else:
                self._remove_file(template_object.target_path)
            template_object.target_type = None

        # TODO: add support for run, execute and so on.

        if template_object.template_type == DIR:
            self.directory_appends[template_object.parameters.append](
                template_object)
        elif template_object.template_type == FILE:
            self.file_appends[template_object.parameters.append](
                template_object)

    def _append_join_directory(self, template_object: 'TemplateWrapper'):
        '''append = "join" for directories: create the directory when it
        does not exist and register it in the package.'''
        if template_object.target_type is None:
            self._create_directory(template_object)
            template_object.add_to_contents()

    def _append_remove_directory(self, template_object: 'TemplateWrapper'):
        '''append = "remove" for directories: delete an existing
        directory.'''
        if template_object.target_type is not None:
            self._remove_directory(template_object.target_path)
            template_object.remove_from_contents()

    def _append_clear_directory(self, template_object: 'TemplateWrapper'):
        '''append = "clear" for directories: not implemented yet.'''
        if template_object.target_type is not None:
            # TODO: think over how CONTENTS is cleaned for this append.
            pass

    @staticmethod
    def _join_template(template_object, input_text):
        '''Join the template into the input text; return the resulting text
        and its md5 hex digest.'''
        template_format = template_object.format_class
        parsed_input = template_format(input_text)
        parsed_template = template_format(template_object.template_text)
        parsed_input.join_template(parsed_template)

        output_text = parsed_input.document_text
        output_text_md5 = hashlib.md5(output_text.encode()).hexdigest()
        return output_text, output_text_md5

    def _append_join_file(self, template_object: 'TemplateWrapper'):
        '''Actions performed for append = "join" on files.'''
        input_path = template_object.input_path
        output_path = template_object.output_path

        if template_object.md5_matching:
            output_paths = [output_path]

            # Userspace templates are also saved to the archive.
            if template_object.is_userspace:
                output_paths.append(template_object.archive_path)

            if template_object.target_type is not None:
                with open(input_path, 'r') as input_file:
                    input_text = input_file.read()
            else:
                input_text = ''

            output_text, output_text_md5 = self._join_template(
                template_object, input_text)

            for save_path in output_paths:
                with open(save_path, 'w') as output_file:
                    output_file.write(output_text)

            # Remove all ._cfg files. getattr: the wrapper may not define
            # cfg_list when the target did not exist.
            for cfg_file_path in getattr(template_object, 'cfg_list', []):
                self._remove_file(cfg_file_path)

            # Remove the target file from the calculate config.
            self.calculate_config_file.remove_file(
                template_object.target_path)
        else:
            with open(input_path, 'r') as input_file:
                input_text = input_file.read()

            output_text, output_text_md5 = self._join_template(
                template_object, input_text)

            if not self.calculate_config_file.compare_md5(
                    template_object.target_path, output_text_md5):
                with open(output_path, 'w') as output_file:
                    output_file.write(output_text)
                self.calculate_config_file.set_files_md5(
                    template_object.target_path, output_text_md5)
            # Nothing is done yet when the md5 in the config matches.

        # Update CONTENTS.
        template_object.add_to_contents(file_md5=output_text_md5)

    def _create_directory(self, template_object: 'TemplateWrapper'):
        '''Create the target directory with all missing parents and apply
        chmod/chown from the template or from the directory defaults.

        Raises:
            TemplateExecutorError: the nearest existing parent cannot be
                inspected or a directory cannot be created.
        '''
        target_path = template_object.target_path
        template_parameters = template_object.parameters

        if os.access(target_path, os.F_OK):
            # The directory already exists: only adjust mode and owner.
            if template_parameters.chmod:
                self.chmod_directory(target_path,
                                     chmod_value=template_parameters.chmod)
            if template_parameters.chown:
                self.chown_directory(target_path,
                                     chown_value=template_parameters.chown)
            return

        # Walk up until an existing directory is found.
        directories_to_create = [target_path]
        directory_path = os.path.dirname(target_path)
        while not os.access(directory_path, os.F_OK) and directory_path:
            directories_to_create.append(directory_path)
            directory_path = os.path.dirname(directory_path)

        try:
            current_mod, current_uid, current_gid = self.get_file_info(
                directory_path,
                'all')
            current_owner = {'uid': current_uid, 'gid': current_gid}
        except OSError as error:
            raise TemplateExecutorError('No access to the directory: {}'.
                                        format(directory_path)) from error

        directories_to_create.reverse()

        for create_path in directories_to_create:
            try:
                os.mkdir(create_path)

                if (template_parameters.chmod and
                        template_parameters.chmod != current_mod):
                    self.chmod_directory(
                        create_path,
                        chmod_value=template_parameters.chmod)
                elif 'chmod' in self.directory_default_parameters:
                    self.chmod_directory(
                        create_path,
                        chmod_value=self.directory_default_parameters['chmod'])

                if (template_parameters.chown and
                        template_parameters.chown != current_owner):
                    self.chown_directory(
                        create_path,
                        chown_value=template_parameters.chown)
                elif 'chown' in self.directory_default_parameters:
                    self.chown_directory(
                        create_path,
                        chown_value=self.directory_default_parameters['chown'])
            except OSError as error:
                raise TemplateExecutorError(
                    'Failed to create directory: {}, reason: {}'.
                    format(create_path, str(error))) from error

    def _remove_directory(self, target_path):
        '''Delete a directory tree, or unlink a symlink to a directory.

        Raises:
            TemplateExecutorError: the path does not exist, is not a
                directory or cannot be deleted.
        '''
        if not os.path.exists(target_path):
            error_message = "target file does not exist"
        elif not os.path.isdir(target_path):
            error_message = "target file is not directory"
        else:
            try:
                if os.path.islink(target_path):
                    os.unlink(target_path)
                else:
                    shutil.rmtree(target_path)
            except Exception as error:
                raise TemplateExecutorError(
                    ("Failed to delete the directory: {0},"
                     " reason: {1}").format(target_path,
                                            str(error))) from error
            # Success: do not fall through to the error below.
            return

        raise TemplateExecutorError(("Failed to delete the directory: {0},"
                                     "reason: {1}").format(target_path,
                                                           error_message))

    def _clear_directory(self, target_path):
        '''Delete everything inside a directory, keeping the directory.

        Raises:
            TemplateExecutorError: the path does not exist or is not a
                directory, or an entry cannot be deleted.
        '''
        if not os.path.exists(target_path):
            error_message = "target directory does not exist"
        elif not os.path.isdir(target_path):
            error_message = "target file is not directory"
        else:
            for node in os.scandir(target_path):
                if node.is_dir():
                    self._remove_directory(node.path)
                else:
                    self._remove_file(node.path)
            # Success: do not fall through to the error below.
            return

        raise TemplateExecutorError(("failed to delete directory: {},"
                                     " reason: {}").format(target_path,
                                                           error_message))

    def _link_directory(self, target_path, source):
        '''Create the symlink target_path pointing to the directory source.

        Raises:
            TemplateExecutorError: the symlink cannot be created.
        '''
        try:
            os.symlink(source, target_path, target_is_directory=True)
            print('linked: {0} -> {1}'.format(os.path.basename(target_path),
                                              os.path.basename(source)))
        except OSError as error:
            raise TemplateExecutorError(
                "Failed to create symlink: {0} -> {1}".
                format(target_path, source)) from error

    def _execute_template(self, template_object):
        '''Placeholder: not implemented yet.'''
        pass

    def _remove_file(self, target_path):
        '''Delete a file or a symlink; a missing path is silently ignored.

        Raises:
            TemplateExecutorError: the link or the file cannot be deleted.
        '''
        if os.path.islink(target_path):
            try:
                os.unlink(target_path)
            except OSError as error:
                raise TemplateExecutorError(
                    'failed to delete the link: {}'.
                    format(target_path)) from error
        if os.path.isfile(target_path):
            try:
                os.remove(target_path)
            except OSError as error:
                raise TemplateExecutorError(
                    'failed to delete the file: {}'.
                    format(target_path)) from error

    def _clear_file(self, target_path):
        '''Truncate the file to zero length, creating it when missing.

        Raises:
            TemplateExecutorError: the file cannot be opened for writing.
        '''
        try:
            with open(target_path, 'w') as f:
                f.truncate(0)
        except IOError as error:
            raise TemplateExecutorError("failed to clear the file: {}".
                                        format(target_path)) from error

    def _link_file(self, target_path):
        '''Placeholder: not implemented yet.'''
        pass

    def _set_error(self, message):
        '''Report a non-fatal error: through the set_error() method of the
        `output` attribute when one has been attached to the executor,
        otherwise by printing the message.'''
        output = getattr(self, 'output', None)
        if output is not None:
            output.set_error(message)
        else:
            print(message)

    def chown_directory(self, target_path, chown_value=None):
        '''Change the owner of a directory.

        Args:
            target_path: path of the directory.
            chown_value: mapping with the 'uid' and 'gid' keys.

        Returns:
            True on success, False when the owner could not be changed.
        '''
        if not chown_value:
            self._set_error('Can not chown directory: {}'.
                            format(target_path))
            self._set_error('reason: chown value is not specified')
            return False
        print('chown value = {}'.format(chown_value))
        try:
            os.chown(target_path, chown_value['uid'], chown_value['gid'])
        except Exception as error:
            # Additional checks may be needed here.
            self._set_error('Can not chown directory: {}'.
                            format(target_path))
            self._set_error('reason: {}'.format(str(error)))
            return False
        return True

    def chmod_directory(self, target_path, chmod_value=False):
        '''Change the access mode of a directory.

        Args:
            target_path: path of the directory.
            chmod_value: mode passed to os.chmod.

        Returns:
            True on success, False when the mode could not be changed.
        '''
        if chmod_value is None or chmod_value is False:
            self._set_error('Can not chmod directory: {}'.
                            format(target_path))
            self._set_error('reason: chmod value is not specified')
            return False
        print('chmod value = {}'.format(chmod_value))
        try:
            os.chmod(target_path, chmod_value)
        except Exception as error:
            # Additional checks may be needed here.
            self._set_error('Can not chmod directory: {}'.
                            format(target_path))
            self._set_error('reason: {}'.format(str(error)))
            return False
        return True

    def chown_file(self, target_path, check_existation=True,
                   chown_value=None):
        '''Change the owner of a file without following symlinks.

        Args:
            target_path: path of the file.
            check_existation: create an empty file when it does not exist.
            chown_value: mapping with the 'uid' and 'gid' keys; when it is
                omitted the `chown` attribute of the executor is used, if
                one is set.

        Returns:
            True on success, False otherwise.
        '''
        if chown_value is None:
            chown_value = getattr(self, 'chown', None)
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            os.lchown(target_path, chown_value['uid'], chown_value['gid'])
            return True
        except Exception:
            # Additional checks may be needed here.
            self._set_error('Can not chown file: {}'.format(target_path))
            return False

    def chmod_file(self, target_path, check_existation=True,
                   chmod_value=None):
        '''Change the access mode of a file.

        Args:
            target_path: path of the file.
            check_existation: create an empty file when it does not exist.
            chmod_value: mode passed to os.chmod; when it is omitted the
                `chmod` attribute of the executor is used, if one is set.

        Returns:
            True on success, False otherwise.
        '''
        if chmod_value is None:
            chmod_value = getattr(self, 'chmod', None)
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            os.chmod(target_path, chmod_value)
            return True
        except Exception:
            # Additional checks may be needed here.
            self._set_error('Can not chmod file: {}'.format(target_path))
            return False

    def get_file_info(self, path, info='all'):
        '''Return stat information about the path.

        Args:
            path: path to inspect.
            info: 'all' for (mode, uid, gid), 'mode' for the permission
                bits, 'owner' for (uid, gid). Any other value gives None.

        Raises:
            OSError: os.stat failed for the path.
        '''
        file_stat = os.stat(path)
        if info == 'all':
            return stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,\
                   file_stat.st_gid
        if info == 'mode':
            return stat.S_IMODE(file_stat.st_mode)
        if info == 'owner':
            return file_stat.st_uid, file_stat.st_gid

    def set_uid_gid_error(self, path, uid, gid, template_path=''):
        '''Report an error about the owner uid:gid, resolving the ids to
        names where possible.'''
        import pwd
        import grp

        try:
            user_name = pwd.getpwuid(uid).pw_name
        except (TypeError, KeyError):
            user_name = str(uid)

        try:
            group_name = grp.getgrgid(gid).gr_name
        except (TypeError, KeyError):
            group_name = str(gid)

        owner = '{0}:{1}'.format(user_name, group_name)

        if template_path:
            self._set_error('Failed to process template file {}'.
                            format(template_path))

        # TODO: describe the error in detail.
        self._set_error('error with owner: {}'.format(owner))

    def check_filesystem(self, target_path):
        '''Placeholder: not implemented yet.'''
        pass

    def is_vfat(self, path):
        '''Tell whether fstab reports vfat, ntfs-3g or ntfs as the file
        system type for the path.'''
        # getattr: stay usable even when __init__ did not set the attribute.
        if getattr(self, 'mounts', None) is None:
            self.mounts = Mounts()
        if self.mounts.get_from_fstab(what=self.mounts.TYPE,
                                      where=self.mounts.DIR,
                                      is_in=path) in ('vfat', 'ntfs-3g',
                                                      'ntfs'):
            return True
        return False

    def check_os_error(self, error, path):
        '''Return True for the two recognised cases: EPERM when is_vfat()
        is true for the path, and EACCES when the path contains
        'var/calculate/remote'.'''
        # os.errno is gone since Python 3.7; use the errno module itself.
        import errno

        error_number = getattr(error, 'errno', None)
        if error_number == errno.EPERM:
            if self.is_vfat(path):
                return True
        if error_number == errno.EACCES and 'var/calculate/remote' in path:
            return True
        return False
# Apply the main template.
template_engine.process_template_from_string(template_text, FILE)
template_parameters = template_engine.parameters
template_text = template_engine.template_text

# Keep the calculate config file inside the test root instead of the
# system-wide default location.
template_action_obj = TemplateExecutor(datavars_module=DATAVARS_MODULE,
                                       chroot_path=CHROOT_PATH,
                                       cl_config_path=CL_CONFIG_PATH)
# execute_template() is the entry point of TemplateExecutor; the template
# type has to be passed explicitly.
result = template_action_obj.execute_template(target_path,
                                              template_parameters,
                                              FILE,
                                              template_text=template_text)
print('MAIN TEST TEMPLATE IS USED')

# Apply the backup template.
template_engine.process_template_from_string(backup_template_text, FILE)
result = template_action_obj.execute_template(
    target_path,
    template_engine.parameters,
    FILE,
    template_text=template_engine.template_text)
print('BACKUP TEMPLATE IS USED')