You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/template_action_draft.py

800 lines
31 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from calculate.templates.template_engine import TemplateEngine, Variables,\
FILE, DIR, ParametersProcessor
from calculate.templates.template_processor import TemplateAction
from calculate.utils.package import PackageAtomParser, Package, PackageNotFound
from calculate.utils.files import write_file, read_link, read_file_lines,\
FilesError, join_paths
from calculate.utils.mount import Mounts
from collections import OrderedDict
import hashlib
import stat
import glob
import shutil
import os
template_text = '''{% calculate append = 'join' -%}
{% calculate package = 'dev-lang/python', format = 'samba' -%}
[section one]
parameter_1 = {{ vars_1.value_1 }}
!parameter_2
'''
backup_template_text = '''{% calculate append = 'join', format = 'samba' -%}
[section one]
parameter_1 = value
parameter_2 = value_2
'''
APPENDS_SET = TemplateAction().available_appends
vars_1 = Variables({'value_1': 'value_1'})
DATAVARS_MODULE = Variables({'vars_1': vars_1})
CHROOT_PATH = os.path.join(os.getcwd(), 'tests/templates/testfiles/test_root')
CL_CONFIG_PATH = os.path.join(CHROOT_PATH, 'var/lib/calculate/config')
template_engine = TemplateEngine(datavars_module=DATAVARS_MODULE,
appends_set=APPENDS_SET,
chroot_path=CHROOT_PATH)
target_path = os.path.join(CHROOT_PATH, 'etc/dir/file.conf')
class TemplateExecutorError(Exception):
pass
class TemplateTypeConflict(Exception):
pass
class TemplateCollisionError(Exception):
pass
class CalculateConfigFile:
def __init__(self, cl_config_path='/var/lib/calculate/config',
chroot_path='/'):
self.chroot_path = chroot_path
self.cl_config_path = cl_config_path
self._config_dictionary = self._get_cl_config_dictionary()
self._unsaved_changes = False
def __contains__(self, file_path):
file_path = self._remove_chroot(file_path)
return file_path in self._config_dictionary
def _get_cl_config_dictionary(self):
config_dictionary = OrderedDict()
if os.path.exists(self.cl_config_path):
if os.path.isdir(self.cl_config_path):
raise TemplateExecutorError(
"directory instead calculate config file in: {}".
format(self.cl_config_path))
else:
write_file(self.cl_config_path).close()
return config_dictionary
try:
config_file_lines = read_file_lines(self.cl_config_path)
except FilesError as error:
raise TemplateExecutorError(
"cannot read calculate config file in: {0}. Reason: {1}".
format(self.cl_config_path, str(error)))
# Продумать проверку корректности найденного файла.
for file_line in config_file_lines:
filename, md5_sum = file_line.split(' ')
config_dictionary.update({filename: md5_sum})
self._unsaved_changes = False
return config_dictionary
def set_files_md5(self, file_path, file_md5):
file_path = self._remove_chroot(file_path)
self._config_dictionary[file_path] = file_md5
self._unsaved_changes = True
def remove_file(self, file_path):
file_path = self._remove_chroot(file_path)
if file_path in self._config_dictionary:
self._config_dictionary.pop(file_path)
self._unsaved_changes = True
def compare_md5(self, file_path, file_md5):
file_path = self._remove_chroot(file_path)
return self._config_dictionary[file_path] == file_md5
def save_changes(self):
config_file = write_file(self.cl_config_path)
for file_name, file_md5 in self._config_dictionary.items():
config_file.write('{} {}\n'.format(file_name, file_md5))
config_file.close()
self._unsaved_changes = False
def _remove_chroot(self, file_path):
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return file_path
def __del__(self):
'''При окончании работы исполнительного модуля не забываем сохранить
изменения внесенные в /var/lib/calculate/config.'''
if self._unsaved_changes:
self.save_changes()
self._config_dictionary = OrderedDict()
class TemplateWrapper:
'''Класс связывающий шаблон с целевым файлом и определяющий параметры
наложения шаблона, обусловленные состоянием целевого файла.'''
type_checks = {DIR: os.path.isdir,
FILE: os.path.isfile}
chroot_path = '/'
config_archive_path = '/var/lib/calculate/config-archive'
package_atom_parser = PackageAtomParser(chroot_path=chroot_path)
calculate_config_file = CalculateConfigFile()
_protected_is_set = False
_protected_set = {'/etc'}
_unprotected_set = set()
def __init__(self, target_file_path, parameters, template_type,
template_text=''):
self.target_path = target_file_path
self.target_package_name = None
self.parameters = parameters
self.output_path = self.target_path
self.input_path = self.parameters.source
self.template_type = template_type
self.template_text = template_text
# Флаг, указывающий, что нужно удалить файл из target_path перед
# применением шаблона.
self.remove_original = False
# Флаг, разрешающий изменение хэш-суммы в contents после отработки
# шаблона.
self.update_contents = False
self.update_config = False
self.update_archive = False
# Флаг, указывающий, что нужно удалить все имеющиеся ._cfg????_filename
# файлы.
self.clean_cfg = False
self.clean_config = False
self.check_config = False
# Получаем класс соответствующего формата файла.
if self.parameters.format:
self.format_class = ParametersProcessor.\
available_formats[self.parameters.format]
else:
# Здесь будет детектор форматов.
pass
# Если по этому пути что-то есть -- проверяем конфликты.
if os.path.exists(target_file_path):
for file_type, checker in self.type_checks.items():
if checker(target_path):
self.target_type = file_type
break
self.target_is_link = os.path.islink(target_path)
else:
self.target_type = None
self.check_conflicts()
self.check_package_collision()
# Если целью является файл -- проверяем наличие ._cfg0000_filename
# файлов.
if self.target_type is FILE:
self._cfg_pattern = os.path.join(
os.path.dirname(self.target_path),
"._cfg????_{}".format(
os.path.basename(self.target_path)))
self.cfg_list = glob.glob(self._cfg_pattern)
self.check_user_changes()
def check_conflicts(self):
'''Проверка конфликтов типов.'''
if self.parameters.append == 'link':
if self.parameters.force:
self.remove_original = True
return
elif self.target_type == DIR:
raise TemplateTypeConflict(
"The target is a directory while the"
" template has append = 'link'.")
else:
raise TemplateTypeConflict(
"The target is a file while the"
" template has append = 'link'.")
if self.template_type == DIR:
if self.parameters.force:
self.remove_original = True
return
elif self.target_type == FILE:
raise TemplateTypeConflict("The target is a file while the"
" template is a directory.")
elif self.target_is_link and self.target_type == DIR:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
return
else:
return
if self.template_type == FILE:
if self.parameters.force:
if self.target_type == DIR:
self.remove_original = True
return
elif self.target_is_link and self.target_type == FILE:
try:
self.target_path = read_link(self.target_path)
except FilesError as error:
raise TemplateExecutorError("files error: {}".
format(str(error)))
return
else:
return
elif self.target_type == FILE:
return
else:
raise TemplateTypeConflict("The target file is a directory"
" while the template is a file.")
def check_package_collision(self):
'''Проверка на предмет коллизии, то есть конфликта пакета шаблона и
целевого файла.'''
if self.parameters.package:
self.target_package_name = self.parameters.package
if self.target_type is None:
return
try:
file_package = self.package_atom_parser.get_file_package(
self.target_path)
except PackageNotFound:
return
if self.target_package_name is None:
self.target_package_name = file_package
elif file_package != self.target_package_name:
raise TemplateCollisionError(
"The template package is {0} while target"
" file package is {1}").format(
self.target_package_name.atom,
file_package.atom
)
self.target_package = Package(self.target_package_name,
chroot_path=self.chroot_path)
def check_user_changes(self):
if (self.template_type is None or
self.target_package is None or
self.target_type != FILE):
return
target_md5 = self.target_package.get_md5(self.target_path)
md5_comparision_result = self.target_package.is_md5_equal(
self.target_path,
file_md5=target_md5)
if (self.parameters.autoupdate or md5_comparision_result):
if not self.cfg_list:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self.target_path
self.output_path = self.target_path
# Чем обновляем CONTENTS
self.update_contents = self.target_path
# Обновляем CA
# Должна быть какая-то проверка на предмет userspace.
self.update_archive = self._get_archive_path(self.target_path)
# Очищаем CL
self.clean_config = True
else:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self.target_path
self.output_path = self.target_path
# Чем обновляем CONTENTS
self.update_contents = self.target_path
# Обновляем CA
# Должна быть какая-то проверка на предмет userspace.
self.update_archive = self._get_archive_path(self.target_path)
# Очищаем CL
self.clean_config = True
# Убираем имеющиеся ._cfg????_filename
self.clean_cfg = True
else:
if not self.cfg_list:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self._get_archive_path(self.target_path)
self.output_path = self._get_cfg_path(self.target_path)
# Чем обновляем CONTENTS
self.update_contents = self.output_path
# Oбновляем хэш-сумму в CL
self.update_config = self.output_path
self.check_config = True
else:
# Приоритет отдаем пути из параметра source.
if self.parameters.source:
self.input_path = self.parameters.source
else:
self.input_path = self._get_archive_path(self.target_path)
self.output_path = self._get_cfg_path(self.target_path)
# Чем обновляем CONTENTS
self.update_contents = self.output_path
# Обновляем хэш-сумму в CL
self.update_config = self.output_path
self.check_config = True
def _get_archive_path(self, file_path):
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
return join_paths(self.config_archive_path, file_path)
def _get_cfg_path(self, file_path):
if self.cfg_list:
last_cfg_name = os.path.basename(self.cfg_list[-1])
slice_value = len('._cfg')
cfg_number = int(last_cfg_name[slice_value: slice_value + 4])
else:
cfg_number = 0
cfg_number = str(cfg_number + 1)
if len(cfg_number) < 4:
cfg_number = '0' * (4 - len(cfg_number)) + cfg_number
new_cfg_name = "._cfg{}_{}".format(cfg_number,
os.path.basename(file_path))
new_cfg_path = os.path.join(os.path.dirname(file_path), new_cfg_name)
return new_cfg_path
def _update_contents(self):
pass
def use_format(self):
if self.format_class.EXECUTABLE:
# Для исполняемых форматов.
return
if self.target_type is None:
original_file_text = ''
else:
original_file = open(target_path, 'r')
original_file_text = original_file.read()
original_file.close()
original_file = open(target_path, 'w')
original_object = self.format_class(original_file_text)
template_object = self.format_class(self.template_text)
original_object.join_template(template_object)
original_file.write(original_object.document_text)
original_file.close()
def accept_changes(self):
pass
@classmethod
def _set_protected(cls):
if cls._protected_is_set:
return
config_protect_env = os.environ.get('CONFIG_PROTECT', False)
if config_protect_env:
for protected_path in config_protect_env.split():
cls._protected_set.add(protected_path.strip())
config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False)
if config_protect_mask_env:
for unprotected_path in config_protect_mask_env.split():
cls._unprotected_set.add(protected_path.strip())
cls._protected_is_set = True
class TemplateExecutor:
def __init__(self, datavars_module=Variables(), chroot_path='/',
cl_config_archive='/var/lib/calculate/config-archive',
cl_config_path='/var/lib/calculate/config'):
self.datavars_module = datavars_module
self.chroot_path = chroot_path
self.directory_default_parameters =\
ParametersProcessor.directory_default_parameters
self.file_default_parameters =\
ParametersProcessor.file_default_parameters
self.directory_appends = {'join': self._append_join_directory,
'remove': self._append_remove_directory,
'clear': self._append_clear_directory}
self.file_appends = {'join': self._append_join_file}
self.formats_classes = ParametersProcessor.available_formats
TemplateWrapper.chroot_path = self.chroot_path
TemplateWrapper.cl_config_archive = cl_config_archive
TemplateWrapper.calculate_config_file = CalculateConfigFile(
cl_config_path=cl_config_path,
cl_chroot_path=chroot_path)
@property
def available_appends(self):
appends_set = set(self.directory_appends.keys()).union(
set(self.file_appends.keys()))
return appends_set
def execute_template(self, target_path, parameters, template_type,
template_text=''):
print('Template parameters:')
parameters.print_parameters_for_debug()
# try:
template_object = TemplateWrapper(target_path, parameters,
template_type,
template_text=template_text)
# except TemplateTypeConflict as error:
# pass
if template_object.remove_original:
if template_object.target_type == DIR:
self._remove_directory(template_object.target_path)
else:
self._remove_file(template_object.target_path)
# Добавить поддержку run, excute и т.д.
self.directory_appends[template_object.parameters.append](
template_object)
def _append_join_directory(self, template_object: TemplateWrapper):
pass
def _append_remove_directory(self, template_object: TemplateWrapper):
pass
def _append_clear_directory(self, template_object: TemplateWrapper):
pass
def _append_join_file(self, template_object: TemplateWrapper):
input_path = template_object.input_path
output_path = template_object.output_path
template_format = template_object.format_class
with open(input_path, 'r') as input_file:
input_text = input_file.read()
parsed_input = template_format(input_text)
parsed_template = template_format(template_object.template_text)
parsed_input.join_template(parsed_template)
output_text = parsed_input.document_text
changed_file_md5 = hashlib.md5(output_text.encode()).hexdigest()
with open(output_path, 'w') as output_file:
output_file.write(parsed_input.document_text)
if template_object.clean_cfg:
for cfg_file_path in template_object.cfg_list:
self._remove_file(cfg_file_path)
if template_object.update_archive:
with open(template_object.update_archive, 'r') as ca_file:
ca_text = ca_file.read()
parsed_ca = template_format(ca_text)
parsed_ca.join_template(parsed_template)
ca_text = parsed_ca.document_text
template_object._update_contents()
def _create_directory(self, template_object: TemplateWrapper):
target_path = template_object.target_path
template_parameters = template_object.parameters
if os.access(target_path, os.F_OK):
if template_parameters.chmod:
self.chmod_directory(target_path)
if self.template_parameters.chown:
self.chown_directory(target_path)
return
directories_to_create = [target_path]
directory_path = os.path.dirname(target_path)
while not os.access(directory_path, os.F_OK) and directory_path:
directories_to_create.append(directory_path)
directory_path = os.path.dirname(directory_path)
try:
current_mod, current_uid, current_gid = self.get_file_info(
directory_path,
'all')
current_owner = {'uid': current_uid, 'gid': current_gid}
except OSError:
raise TemplateExecutorError('No access to the directory: {}'.
format(directory_path))
directories_to_create.reverse()
for create_path in directories_to_create:
try:
os.mkdir(create_path)
if (template_parameters.chmod and
template_parameters.chmod != current_mod):
self.chmod_directory(create_path)
elif 'chmod' in self.directory_default_parameters:
self.chmod_directory(
create_path,
chmod_value=self.directory_default_parameters['chown'])
if (template_parameters.chown and
template_parameters.chown != current_owner):
self.chown_directory
elif 'chown' in self.directory_default_parameters:
self.chown_directory(
create_path,
chown_value=self.directory_default_parameters['chmod'])
except OSError as error:
raise TemplateExecutorError(
'Failed to create directory: {}, reason: {}'.
format(create_path, str(error)))
def _remove_directory(self, target_path):
if os.path.exists(target_path):
if os.path.isdir(target_path):
try:
if os.path.islink(target_path):
os.unlink(target_path)
else:
shutil.rmtree(target_path)
except Exception as error:
raise TemplateExecutorError(
("Failed to delete the directory: {0},"
" reason: {1}").format(target_path,
str(error)))
else:
error_message = "target file is not directory"
else:
error_message = "target file does not exist"
raise TemplateExecutorError(("Failed to delete the directory: {0},"
"reason: {1}").format(target_path,
error_message))
def _clear_directory(self, target_path):
if os.path.exists(target_path):
if os.path.isdir(target_path):
for node in os.scandir(target_path):
if node.is_dir():
self._remove_directory(node.path)
else:
self._remove_file(node.path)
else:
error_message = "target file is not directory"
else:
error_message = "target directory does not exist"
raise TemplateExecutorError(("failed to delete directory: {},"
" reason: {}").format(target_path,
error_message))
def _link_directory(self, target_path, source):
try:
os.symlink(source, target_path, target_is_directory=True)
print('linked: {0} -> {1}'.format(os.path.basename(target_path),
os.path.basename(source)))
except OSError:
raise TemplateExecutorError("Failed to create symlink: {0} -> {1}".
format(target_path, self.source))
def _execute_template(self, template_object):
pass
def _remove_file(self, target_path):
if os.path.islink(target_path):
try:
os.unlink(target_path)
except OSError:
raise TemplateExecutorError('failed to delete the link: {}'.
format(target_path))
if os.path.isfile(target_path):
try:
os.remove(target_path)
except OSError:
self.output.set_error('failed to delete the file: {}'.
format(target_path))
def _clear_file(self, target_path):
try:
with open(target_path, 'w') as f:
f.truncate(0)
except IOError:
raise TemplateExecutorError("failed to clear the file: {}".
format(target_path))
def _link_file(self, target_path):
pass
def chown_directory(self, target_path, chown_value={}):
"""Сменить владельца директории."""
if not chown_value:
chown_value = self.template_parameters.chown
print('chown value = {}'.format(chown_value))
try:
os.chown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown directory: {}'.
format(target_path))
return False
def chmod_directory(self, target_path, chmod_value=False):
"""Сменить права доступа к директории."""
if not chmod_value:
chmod_value = self.template_parameters.chmod
print('chmod value = {}'.format(chmod_value))
try:
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod directory: {}'.
format(target_path))
self.output.set_error('reason: {}'.format(str(error)))
return False
def chown_file(self, target_path, check_existation=True):
"""Сменить владельца файла."""
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.lchown(target_path, self.chown['uid'], self.chown['gid'])
return True
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown file: {}'.format(target_path))
return False
def chmod_file(self, target_path, check_existation=True):
"""Сменить права доступа к директории."""
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.chmod(target_path, self.chmod)
return True
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod file: {}'.format(target_path))
return False
def get_file_info(self, path, info='all'):
file_stat = os.stat(path)
if info == 'all':
return stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,\
file_stat.st_gid
if info == 'mode':
return stat.S_IMODE(file_stat.st_mode)
if info == 'owner':
return file_stat.st_uid, file_stat.st_gid
def set_uid_gid_error(self, path, uid, gid, template_path=''):
import pwd
import grp
try:
user_name = pwd.getpwuid(uid).pw_name
except (TypeError, KeyError):
user_name = str(uid)
try:
group_name = grp.getgrgid(gid).gr_name
except (TypeError, KeyError):
group_name = str(gid)
owner = '{0}:{1}'.format(user_name, group_name)
if template_path:
self.output.set_error('Failed to process template file {}'.
template_path)
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !! описать ошибку !!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
self.output.set_error('error with owner: {}'.format(owner))
def check_filesystem(self, target_path):
pass
def is_vfat(self, path):
if self.mounts is None:
self.mounts = Mounts()
if self.mounts.get_from_fstab(what=self.mounts.TYPE,
where=self.mounts.DIR,
is_in=path) in ('vfat', 'ntfs-3g',
'ntfs'):
return True
return False
def check_os_error(self, error, path):
if hasattr(error, 'errno') and error.errno == os.errno.EPERM:
if self.is_vfat(path):
return True
if hasattr(error, 'errno') and error.errno == os.errno.EACCES and\
'var/calculate/remote' in path:
return True
return False
# Применение основного шаблона:
template_engine.process_template_from_string(template_text, FILE)
template_parameters = template_engine.parameters
template_text = template_engine.template_text
template_action_obj = TemplateExecutor(datavars_module=DATAVARS_MODULE,
chroot_path=CHROOT_PATH)
result = template_action_obj.use_file_template(target_path,
template_parameters,
template_text=template_text)
print('MAIN TEST TEMPLATE IS USED')
# Применение шаблона бэкапа:
template_engine.process_template_from_string(backup_template_text, FILE)
result = template_action_obj.use_file_template(
target_path,
template_engine.parameters,
template_text=template_engine.template_text)
print('BACKUP TEMPLATE IS USED')