You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/calculate/templates/template_processor.py

755 lines
33 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
import os
import re
import stat
import shutil
from ..utils.files import join_paths
from .template_engine import TemplateEngine, Variables, ConditionFailed,\
TemplateParametersChecker, DIR, FILE,\
IncorrectParameter
from ..utils.io_module import IOModule
from collections import OrderedDict
from ..utils.mount import Mounts
class TemplateActionError(Exception):
    """Exception type for template action failures.

    Nothing in the visible part of this module raises it.
    """
class TemplateError(Exception):
    """Exception type for template errors.

    Nothing in the visible part of this module raises it.
    """
class SystemAction:
    """Empty placeholder class; it defines no behaviour."""
class TemplateAction:
    """Filesystem actions used while applying directory and file templates.

    The instance keeps the parameters of the template currently being
    processed (``template_parameters``, ``template_path``, ``target_path``)
    and reports problems through ``self.output`` rather than raising.
    """

    def __init__(self, output_module=None, chroot_path='/', mounts=None):
        """Set up the action object.

        :param output_module: object used for reporting; a new ``IOModule``
            is created per instance when omitted (the default is no longer
            built once at import time and shared).
        :param chroot_path: passed to ``TemplateParametersChecker``.
        :param mounts: optional ``Mounts`` instance; created lazily by
            ``is_vfat`` when left as ``None``.
        """
        if output_module is None:
            output_module = IOModule()
        self.output = output_module
        self.mounts = mounts
        self.chroot_path = chroot_path

        self.exec_list = OrderedDict()

        # Dispatch tables: value of the ``append`` parameter -> handler.
        self.DIRECTORY_APPENDS = {
            'remove': self._append_remove_directory,
            'clear': self._append_clear_directory,
            'replace': self._append_replace_directory,
            'link': self._append_link_directory,
            'join': self._append_join_directory,
            'skip': self._append_skip_directory
        }

        self.FILE_APPENDS = {
            'replace': self._append_replace_file,
            'remove': self._append_remove_file,
            'clear': self._append_clear_file,
            'link': self._append_link_file,
            'join': self._append_join_file,
            'before': self._append_before_file,
            'after': self._append_after_file,
            'skip': self._append_skip_file
        }

    @property
    def available_appends(self):
        """Return the set of ``append`` values known for files or directories."""
        return set(self.DIRECTORY_APPENDS) | set(self.FILE_APPENDS)

    def process_template_directory(self, target_path, template_path,
                                   parameters=None):
        """Validate a directory template's parameters and report them.

        Stores ``template_path``/``target_path`` on the instance, builds
        ``self.template_parameters`` and prints the parameters and paths.
        An ``IncorrectParameter`` error is reported through the output module
        and ends processing. No ``append`` handler is dispatched here.
        """
        if parameters is None:
            parameters = {}

        self.template_path = template_path
        self.target_path = target_path

        try:
            self.template_parameters = TemplateParametersChecker(
                parameters,
                template_type=DIR,
                chroot_path=self.chroot_path)
        except IncorrectParameter as error:
            self.output.set_error('Incorrect parameter: {}'.format(str(error)))
            return

        self.output.console_output.print_default(
            'With parameters: {}\n'.
            format(self.template_parameters._parameters_dictionary)
        )
        self.output.set_info('Template path: {}'.format(self.template_path))
        self.output.set_info('Target directory: {}'.format(self.target_path))

    # Handlers for the different ``append`` values of directory templates.
    def _append_join_directory(self):
        """Create the target directory."""
        self._create_directory()

    def _append_remove_directory(self):
        """Remove the directory and all of its contents (not implemented)."""

    def _append_clear_directory(self):
        """Clear the directory contents if it exists (not implemented)."""

    def _append_replace_directory(self):
        """Clear the directory contents or create it (not implemented)."""

    def _append_link_directory(self):
        """Create a symlink to the given directory (not implemented)."""

    def _append_skip_directory(self):
        """Do nothing for the directory."""

    # Direct actions on directories.
    def _create_directory(self, target_path=''):
        """Create ``target_path`` together with any missing parents.

        Falls back to ``self.target_path`` when ``target_path`` is empty.
        If the directory already exists only the template's ``chmod`` and
        ``chown`` (when set) are applied. Newly created directories get the
        template's mode/owner, or those of the nearest existing ancestor.

        :return: ``True`` on success, ``False`` after reporting an error.
        """
        if not target_path:
            target_path = self.target_path

        if os.access(target_path, os.F_OK):
            if self.template_parameters.chmod:
                self.chmod_directory(target_path)
            if self.template_parameters.chown:
                self.chown_directory(target_path)
            return True

        # Collect the missing directories, deepest first, stopping at the
        # first ancestor that exists (or at an empty path).
        directories_to_create = [target_path]
        directory = os.path.dirname(target_path)
        while not os.access(directory, os.F_OK) and directory:
            directories_to_create.append(directory)
            directory = os.path.dirname(directory)

        try:
            current_mod, current_uid, current_gid = self.get_file_info(
                directory,
                'all')
            current_chown = {'uid': current_uid, 'gid': current_gid}
        except OSError:
            self.output.set_error('No access to the directory: {}'.format(
                directory))
            return False

        directories_to_create.reverse()

        for directory_path in directories_to_create:
            try:
                os.mkdir(directory_path)

                if self.template_parameters.chmod:
                    self.chmod_directory(directory_path)
                else:
                    self.chmod_directory(directory_path,
                                         chmod_value=current_mod)

                if self.template_parameters.chown:
                    # The original only referenced the method here without
                    # calling it, so the template owner was never applied.
                    self.chown_directory(directory_path)
                else:
                    self.chown_directory(directory_path,
                                         chown_value=current_chown)
            except OSError as error:
                self.output.set_error('Failed to create directory: {}'.
                                      format(directory_path))
                self.output.set_error('Reason: {}'.format(str(error)))
                return False
        return True

    def _remove_directory(self, target_path):
        """Delete ``target_path``: unlink a symlink, otherwise remove the tree.

        :return: ``True`` when removed, ``False`` after reporting an error
            (including when the path does not exist).
        """
        if not os.path.exists(target_path):
            self.output.set_error("Failed to delete the directory: {}".
                                  format(target_path))
            self.output.set_error(
                "Target file is not directory or not exists.")
            return False

        try:
            if os.path.islink(target_path):
                os.unlink(target_path)
            else:
                shutil.rmtree(target_path)
            return True
        except Exception as error:
            self.output.set_error("Failed to delete the directory: {}".
                                  format(target_path))
            self.output.set_error("Reason: {}".
                                  format(str(error)))
            return False

    def _clear_directory(self, target_path):
        """Delete every entry inside ``target_path``, keeping the directory.

        All entries are attempted even if some fail (the original stopped
        after the first entry).

        :return: ``True`` when everything was removed, ``False`` when the
            target is not an existing directory or an entry could not be
            deleted.
        """
        if not os.path.isdir(target_path):
            self.output.set_error('failed to delete directory: {}'.
                                  format(target_path))
            self.output.set_error(
                'target file is not directory or does not exist: {}'.
                format(target_path))
            return False

        cleared = True
        # Snapshot the entries first: the directory is modified while we go.
        for node in list(os.scandir(target_path)):
            if node.is_dir():
                if not self._remove_directory(node.path):
                    self.output.set_error('Failed to delete directory: {}'.
                                          format(target_path))
                    cleared = False
            else:
                try:
                    os.remove(node.path)
                except OSError:
                    self.output.set_error('Failed to delete directory: {}'.
                                          format(target_path))
                    cleared = False
        return cleared

    def _link_directory(self, target_path):
        """Create a directory symlink ``target_path`` -> ``self.source``.

        :return: ``True`` on success, ``False`` after reporting an error.
        """
        # NOTE(review): nothing in this module assigns ``self.source``;
        # confirm where it is expected to come from (template parameters?).
        if not self.source:
            self.output.set_error("'source' parameter is not defined for "
                                  "'append = link' in template {}".
                                  format(self.template_path))
            self.output.set_error('Failed to create symlink: {0} -> {1}'.
                                  format(target_path, self.source))
            return False

        if not os.path.exists(self.source):
            self.output.set_error("'source' file does not exist for "
                                  "'append = link' in template {}".
                                  format(self.template_path))
            self.output.set_error('Failed to create symlink: {0} -> {1}'.
                                  format(target_path, self.source))
            return False

        try:
            os.symlink(self.source, target_path, target_is_directory=True)
            print('linked: {0} -> {1}'.format(os.path.basename(target_path),
                                              os.path.basename(self.source)))
            return True
        except OSError:
            self.output.set_error('Template error: {}'.
                                  format(self.template_path))
            self.output.set_error('Failed to create symlink: {0} -> {1}'.
                                  format(target_path, self.source))
            return False

    def process_template_file(self, target_path, template_path,
                              template_text='', parameters=None):
        """Validate a file template's parameters and report them.

        Stores the template text and paths on the instance, builds
        ``self.template_parameters`` and prints parameters, paths and the
        template text. An ``IncorrectParameter`` error is reported through
        the output module and ends processing. No ``append`` handler is
        dispatched here.
        """
        if parameters is None:
            parameters = {}

        self.template_text = template_text
        self.template_path = template_path
        self.target_path = target_path

        try:
            self.template_parameters = TemplateParametersChecker(
                parameters,
                template_type=FILE,
                chroot_path=self.chroot_path)
        except IncorrectParameter as error:
            self.output.set_error('Incorrect parameter: {}'.format(str(error)))
            return

        self.output.console_output.print_default(
            'With parameters: {}\n'.
            format(self.template_parameters._parameters_dictionary)
        )
        self.output.set_info('Template path: {}'.format(self.template_path))
        self.output.set_info('Target file: {}.\nTemplate text:'.
                             format(self.target_path))
        self.output.console_output.print_default(self.template_text + '\n\n')

    # Handlers for the different ``append`` values of file templates.
    def _append_replace_file(self):
        """Handler for ``append = replace`` (not implemented)."""

    def _append_remove_file(self):
        """Handler for ``append = remove`` (not implemented)."""

    def _append_clear_file(self):
        """Handler for ``append = clear`` (not implemented)."""

    def _append_link_file(self):
        """Handler for ``append = link`` (not implemented)."""

    def _append_join_file(self):
        """Handler for ``append = join`` (not implemented)."""

    def _append_before_file(self):
        """Handler for ``append = before`` (not implemented)."""

    def _append_after_file(self):
        """Handler for ``append = after`` (not implemented)."""

    def _append_skip_file(self):
        """Handler for ``append = skip`` (not implemented)."""

    # Direct actions on files.
    def join_file(self, target_path, option=False):
        """Not implemented."""

    def remove_file(self, target_path):
        """Delete a symlink or a regular file.

        :return: ``True`` when something was removed; ``False`` when removal
            failed (an error is reported) or the path is neither a symlink
            nor a regular file.
        """
        if os.path.islink(target_path):
            try:
                os.unlink(target_path)
                return True
            except OSError:
                self.output.set_error('Template error: {}'.
                                      format(target_path))
                self.output.set_error('Failed to delete the link: {}'.
                                      format(target_path))
                return False

        if os.path.isfile(target_path):
            try:
                os.remove(target_path)
                return True
            except OSError:
                self.output.set_error('Template error: {}'.
                                      format(target_path))
                self.output.set_error('Failed to delete the file: {}'.
                                      format(target_path))
                return False
        return False

    def clear_file(self, target_path):
        """Truncate ``target_path`` to zero length, creating it if missing.

        :return: ``True`` on success, ``False`` after reporting an error.
        """
        try:
            # Opening for writing already truncates the file.
            with open(target_path, 'w'):
                pass
        except OSError:
            self.output.set_error("Template error: {}".
                                  format(self.template_path))
            self.output.set_error("Failed to clear the file: {}".
                                  format(target_path))
            return False
        return True

    def link_file(self, target_path):
        """Not implemented."""

    def chown_directory(self, target_path, chown_value=None):
        """Change the owner of a directory.

        :param chown_value: mapping with ``'uid'`` and ``'gid'`` keys; the
            template's ``chown`` parameter is used when empty or omitted.
        :return: ``True`` on success, ``False`` after reporting an error.
        """
        if not chown_value:
            chown_value = self.template_parameters.chown
        print('chown value = {}'.format(chown_value))
        try:
            os.chown(target_path, chown_value['uid'], chown_value['gid'])
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chown directory: {}'.
                                  format(target_path))
            return False
        return True

    def chmod_directory(self, target_path, chmod_value=False):
        """Change the access mode of a directory.

        :param chmod_value: mode for ``os.chmod``; the template's ``chmod``
            parameter is used when falsy or omitted.
        :return: ``True`` on success, ``False`` after reporting an error.
        """
        if not chmod_value:
            chmod_value = self.template_parameters.chmod
        print('chmod value = {}'.format(chmod_value))
        try:
            os.chmod(target_path, chmod_value)
        except Exception as error:
            # Additional checks may be needed here.
            self.output.set_error('Can not chmod directory: {}'.
                                  format(target_path))
            self.output.set_error('reason: {}'.format(str(error)))
            return False
        return True

    def chown_file(self, target_path, check_existation=True):
        """Change the owner of a file without following symlinks.

        :param check_existation: when true, create an empty file first if
            ``target_path`` does not exist.
        :return: ``True`` on success, ``False`` after reporting an error.
        """
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            # Nothing in this module assigns ``self.chown``, so the original
            # lookup always failed. Honour the attribute if some caller set
            # it, otherwise use the template parameter like chown_directory.
            chown_value = getattr(self, 'chown', None)
            if not chown_value:
                chown_value = self.template_parameters.chown
            os.lchown(target_path, chown_value['uid'], chown_value['gid'])
            return True
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chown file: {}'.format(target_path))
            return False

    def chmod_file(self, target_path, check_existation=True):
        """Change the access mode of a file.

        :param check_existation: when true, create an empty file first if
            ``target_path`` does not exist.
        :return: ``True`` on success, ``False`` after reporting an error.
        """
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            # Same fallback as chown_file: ``self.chmod`` is never assigned
            # in this module.
            chmod_value = getattr(self, 'chmod', None)
            if not chmod_value:
                chmod_value = self.template_parameters.chmod
            os.chmod(target_path, chmod_value)
            return True
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chmod file: {}'.format(target_path))
            return False

    def get_file_info(self, path, info='all'):
        """Return stat information for ``path``.

        :param info: ``'all'`` -> ``(mode, uid, gid)``; ``'mode'`` -> mode
            bits; ``'owner'`` -> ``(uid, gid)``. Any other value yields
            ``None``.
        :raises OSError: propagated from ``os.stat``.
        """
        file_stat = os.stat(path)
        if info == 'all':
            return stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,\
                file_stat.st_gid
        if info == 'mode':
            return stat.S_IMODE(file_stat.st_mode)
        if info == 'owner':
            return file_stat.st_uid, file_stat.st_gid

    def set_uid_gid_error(self, path, uid, gid, template_path=''):
        """Report an ownership error, resolving uid/gid to names if possible.

        Ids that cannot be resolved are shown as given. ``path`` is accepted
        but not used by the current implementation.
        """
        import pwd
        import grp

        try:
            user_name = pwd.getpwuid(uid).pw_name
        except (TypeError, KeyError):
            user_name = str(uid)

        try:
            group_name = grp.getgrgid(gid).gr_name
        except (TypeError, KeyError):
            group_name = str(gid)

        owner = '{0}:{1}'.format(user_name, group_name)
        if template_path:
            # The original omitted ``format(`` here and raised
            # AttributeError instead of reporting the template.
            self.output.set_error('Failed to process template file {}'.
                                  format(template_path))
        # TODO: describe the error in more detail.
        self.output.set_error('error with owner: {}'.format(owner))

    def is_vfat(self, path):
        """Return True if fstab reports vfat, ntfs-3g or ntfs for ``path``.

        Creates ``self.mounts`` on first use when none was supplied.
        """
        if self.mounts is None:
            self.mounts = Mounts()
        filesystem = self.mounts.get_from_fstab(what=self.mounts.TYPE,
                                                where=self.mounts.DIR,
                                                is_in=path)
        return filesystem in ('vfat', 'ntfs-3g', 'ntfs')

    def check_os_error(self, error, path):
        """Return True for the two error cases this check recognises.

        These are EPERM on a path for which ``is_vfat`` is true, and EACCES
        on a path containing ``var/calculate/remote``.
        """
        # ``os.errno`` was removed in Python 3.7; use the errno module.
        import errno

        error_code = getattr(error, 'errno', None)
        if error_code == errno.EPERM and self.is_vfat(path):
            return True
        if error_code == errno.EACCES and 'var/calculate/remote' in path:
            return True
        return False

    def _clear_parameters(self):
        """Drop the stored template parameters after a template has run."""
        try:
            del self.template_parameters
        except AttributeError:
            # Deleting a missing attribute raises AttributeError (the
            # original caught NameError, which never occurs here).
            pass
class DirectoryProcessor:
chmod_regex = re.compile(r'\d{3}')
def __init__(self, action, datavars_module=None, package='',
             output_module=None, test_mode=False):
    """Prepare a processor that walks the template directories.

    :param action: action name compared with each template's ``action``
        parameter.
    :param datavars_module: variables object; a new ``Variables`` is created
        when omitted (no longer built once at import time and shared).
    :param package: when non-empty, templates are filtered by their
        ``package`` parameter and ``merge`` parameters are collected.
    :param output_module: reporting object; a new ``IOModule`` is created
        when omitted.
    :param test_mode: take the target root from ``test.test_root`` instead
        of ``main.cl_chroot_path``.
    """
    if datavars_module is None:
        datavars_module = Variables()
    if output_module is None:
        output_module = IOModule()

    self.action = action
    self.chroot_path = datavars_module.main.cl_chroot_path

    # Root that target paths are built from.
    if test_mode:
        self.cl_chroot_path = datavars_module.test.test_root
    else:
        self.cl_chroot_path = datavars_module.main.cl_chroot_path

    self.datavars_module = datavars_module
    self.output = output_module
    self.template_action = TemplateAction(output_module=self.output,
                                          chroot_path=self.chroot_path)
    self.inheritable_parameters =\
        set(TemplateParametersChecker.inheritable_parameters)
    self.template_engine = TemplateEngine(datavars_module=datavars_module)

    # Comma-separated list of template base directories.
    self.template_paths = (self.datavars_module.
                           main.cl_template_path.split(','))

    self.for_package = package
    self.processed_packages = []
    self.packages_to_merge = []
    # Package name -> template paths seen for that package.
    self.packages_file_paths = {}
def process_template_directories(self):
    """Walk every entry of every configured template base directory.

    Each entry of ``self.template_paths`` is stripped of whitespace; blank
    entries (e.g. from a trailing comma) are skipped instead of crashing
    ``os.scandir``. When a package was requested, the packages collected
    from ``merge`` parameters are processed afterwards.
    """
    # Go through the directories listed in main.cl_template_path.
    for directory_path in self.template_paths:
        directory_path = directory_path.strip()
        if not directory_path:
            continue
        for node in os.scandir(directory_path):
            self.walk_directory_tree(node.path,
                                     self.cl_chroot_path,
                                     {}, set())

    if self.for_package:
        self.output.set_info('Processing packages from merge parameter...')
        self.processed_packages.append(self.for_package)
        self.merge_packages()
def merge_packages(self):
    """Process the templates of packages queued in ``packages_to_merge``.

    Packages are taken from the end of the queue. For each one, every
    template path recorded in ``packages_file_paths`` is walked again from
    its base directory with that package selected. Packages with no
    recorded paths are reported and listed in the final error message.
    """
    not_merged_packages = []

    while self.packages_to_merge:
        self.for_package = self.packages_to_merge.pop()

        # Skip packages that were already handled so that mutually
        # referencing ``merge`` parameters cannot loop forever.
        if self.for_package in self.processed_packages:
            continue

        if self.for_package not in self.packages_file_paths:
            self.output.set_error(
                "Error: package '{0}' not found for action '{1}'.".
                format(self.for_package, self.action)
            )
            not_merged_packages.append(self.for_package)
            continue

        for template_path in self.packages_file_paths[self.for_package]:
            base_directory, queue = self.get_directories_queue(
                template_path
            )
            # The queue is ordered deepest-first, so the last element is
            # the directory directly below the base directory.
            first_directory = queue.pop()
            first_directory_path = os.path.join(base_directory,
                                                first_directory)
            self.walk_directory_tree(first_directory_path,
                                     self.cl_chroot_path, {}, set(),
                                     directories_queue=queue)

        self.processed_packages.append(self.for_package)

    if not_merged_packages:
        # The queue is empty by now; report the packages that failed.
        self.output.set_error('Packages {} is not merged.'.
                              format(','.join(not_merged_packages)))
    else:
        self.output.set_success('All packages are merged...')
def get_directories_queue(self, path):
    """Split ``path`` into its template base directory and the names below.

    :param path: path located under one of ``self.template_paths``.
    :return: ``(base_directory, names)`` where ``base_directory`` is the
        matching entry (whitespace stripped) and ``names`` lists the path
        components below it, deepest first.
    :raises ValueError: if ``path`` is not under any template base
        directory (the original failed with ``UnboundLocalError``).
    """
    for base_dir in self.template_paths:
        base_dir = base_dir.strip()
        if not base_dir:
            continue
        # Compare without a trailing separator so that '/tpl/' works and
        # '/tpl' does not match '/tplx/...'.
        stop_path = base_dir.rstrip(os.sep) or os.sep
        if (path == stop_path or
                path.startswith(stop_path.rstrip(os.sep) + os.sep)):
            base_directory = base_dir
            break
    else:
        raise ValueError(
            'path {} is not inside any template directory'.format(path))

    directories_queue = []
    while path != stop_path:
        parent, dir_name = os.path.split(path)
        if parent == path:
            # Reached the filesystem root; nothing more to split off.
            break
        directories_queue.append(dir_name)
        path = parent

    return base_directory, directories_queue
def walk_directory_tree(self, current_directory_path, target_path,
directory_parameters, directory_env,
directories_queue=[]):
# Recursively process one template directory: scan its entries, run the
# '.calculate_directory' template first (if present), hand the directory to
# self.template_action, then run each template file and finally recurse into
# the subdirectories.
#
# Parameters:
#   current_directory_path -- template directory to process.
#   target_path            -- target path inherited from the parent call.
#   directory_parameters   -- parameters inherited from the parent (copied).
#   directory_env          -- environment inherited from the parent (copied).
#   directories_queue      -- when non-empty, its last element names the only
#                             file or subdirectory to process at this level.
#
# NOTE(review): the indentation of this file was lost, so the extent of the
# "if '.calculate_directory' in template_files:" body cannot be determined
# from this text; the code is therefore left untouched.
# NOTE(review): directories_queue=[] is a mutable default and the list is
# popped below; consider a None default.
template_files = []
template_directories = []
directory_name = os.path.basename(current_directory_path)
current_env = directory_env.copy()
current_target_path = target_path
directory_parameters = directory_parameters.copy()
entries = os.scandir(current_directory_path)
self.template_engine.change_directory(current_directory_path)
for node in entries:
# Temporary; an ignore_files variable should replace this check.
if node.name.endswith('.swp'):
continue
if node.is_dir():
template_directories.append(node.path)
elif node.is_file():
template_files.append(node.name)
# Process the directory template first.
if '.calculate_directory' in template_files:
template_files.remove('.calculate_directory')
try:
self.template_engine.process_template('.calculate_directory',
env=current_env)
except ConditionFailed:
self.output.set_warning('Condition failed in {}'.
format(current_directory_path))
self.output.console_output.print_default('\n')
return
# NOTE(review): not every exception has a 'lineno' attribute; accessing
# error.lineno below can itself raise AttributeError.
except Exception as error:
self.output.set_error('Template error: {0} \nTemplate: {1},'
' lineno: {2}'.
format(str(error),
current_directory_path,
error.lineno))
return
directory_parameters.update(self.template_engine.parameters)
# If the name parameter is set, change the name of the current directory.
if 'name' in directory_parameters:
directory_name = directory_parameters['name']
# If the path parameter is set, change the current path to the target
# directory.
if 'path' in directory_parameters:
current_target_path = join_paths(self.cl_chroot_path,
directory_parameters['path'])
if directory_parameters.get('append', None) != 'skip':
current_target_path = os.path.join(current_target_path,
directory_name)
if directory_parameters.get('action', None) != self.action:
self.output.set_error(
'Action parameter not found or not relevant: {}'.
format(directory_name))
self.output.console_output.print_default('\n')
return
# If templates are processed for a single package, check the package
# parameter and collect the paths to directories of other packages
# that are needed for merge.
if self.for_package:
if 'package' not in directory_parameters:
self.output.set_warning(
"'package' is not defined for {}".
format(current_directory_path)
)
# The package parameter is treated as optional.
# return
elif directory_parameters['package'] != self.for_package:
package_name = directory_parameters['package']
if package_name in self.packages_file_paths:
self.packages_file_paths[package_name].append(
current_directory_path
)
else:
self.packages_file_paths[package_name] =\
[current_directory_path]
self.output.set_warning(
"'package' parameter is not actual in {}".
format(current_directory_path)
)
return
# If the env parameter names a module that does not exist, the template
# is not executed.
if 'env' in directory_parameters:
env_to_check = directory_parameters['env'].split(',')
for name in env_to_check:
if name.strip() not in self.datavars_module:
return
del(directory_parameters['env'])
# If the merge parameter is set, collect the packages it lists into the
# list of package names for later processing.
# NOTE(review): unlike the file-level merge handling further down, this
# does not consult self.processed_packages before queueing.
if self.for_package and 'merge' in directory_parameters:
merge_list = directory_parameters['merge'].split(',')
del(directory_parameters['merge'])
for package in merge_list:
self.packages_to_merge.append(package.strip())
# Perform the action on the directory.
self.output.set_success('Processing directory: {}'.
format(current_directory_path))
self.template_action.process_template_directory(
current_target_path,
template_path=current_directory_path,
parameters=directory_parameters)
# Keep only the inheritable parameters.
directory_parameters = {name: value for name, value in
directory_parameters.items()
if name in self.inheritable_parameters}
# A chmod value made only of digits (at most four) is not passed down.
if ('chmod' in directory_parameters and
directory_parameters['chmod'].isdigit() and
len(directory_parameters['chmod']) <= 4):
del(directory_parameters['chmod'])
# With a queue, restrict this level to the single queued file/directory.
if directories_queue:
next_node = directories_queue.pop()
if next_node in template_files:
template_files = [next_node]
template_directories = []
else:
next_directory_path = os.path.join(current_directory_path,
next_node)
template_files = []
template_directories = [next_directory_path]
# Process the template files stored in the directory.
for template_name in template_files:
template_parameters = {}
template_parameters = directory_parameters.copy()
template_path = os.path.join(current_directory_path, template_name)
try:
self.template_engine.process_template(template_name,
env=current_env.copy())
except ConditionFailed:
self.output.set_warning('Condition failed for: {}'.
format(template_name))
continue
except Exception as error:
self.output.set_error('Template error: {0} in template {1}'.
format(str(error), template_path))
continue
template_parameters.update(self.template_engine.parameters)
template_text = self.template_engine.template_text
if template_parameters.get('action', None) != self.action:
self.output.set_warning(
'Action parameter not found or not relevant.'
)
continue
if 'action' in template_parameters:
del(template_parameters['action'])
if self.for_package:
if 'package' not in template_parameters:
self.output.set_warning("'package' is not defined for {}".
format(template_path))
# The package parameter is treated as optional.
# continue
elif template_parameters['package'] != self.for_package:
package_name = template_parameters['package']
if package_name in self.packages_file_paths:
self.packages_file_paths[package_name].append(
template_path
)
else:
self.packages_file_paths[package_name] =\
[template_path]
self.output.set_warning(
"'package' parameter is not actual in {}".
format(template_name)
)
continue
if 'merge' in template_parameters:
merge_list = template_parameters['merge'].split(',')
del(template_parameters['merge'])
# NOTE(review): the membership test uses the unstripped name while the
# stripped name is queued; names with surrounding spaces bypass the check.
for package_name in merge_list:
if package_name not in self.processed_packages:
self.packages_to_merge.append(package_name.strip())
# If the env parameter names a module that does not exist, the template
# is not executed.
# NOTE(review): the 'continue' below only continues the inner loop over
# env names, so the template is still processed; confirm the intent.
if 'env' in template_parameters:
env_to_check = template_parameters['env'].split(',')
for name in env_to_check:
if name.strip() not in self.datavars_module:
continue
del(template_parameters['env'])
if 'name' in template_parameters:
template_name = template_parameters['name']
if 'path' in template_parameters:
target_file_path = join_paths(self.cl_chroot_path,
template_parameters['path'],
template_name)
else:
target_file_path = join_paths(current_target_path,
template_name)
# Perform the action using the template.
self.output.set_success('Processing template: {}...'.
format(template_name))
self.template_action.process_template_file(
target_file_path,
template_path,
template_text=template_text,
parameters=template_parameters)
# Continue into the subdirectories.
for directory_path in template_directories:
self.walk_directory_tree(directory_path, current_target_path,
directory_parameters, current_env,
directories_queue=directories_queue)