You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-4-lib/calculate/templates/template_processor.py

896 lines
38 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
import os
import re
import stat
import shutil
import copy
from pprint import pprint
from ..utils.files import join_paths
from .template_engine import TemplateEngine, Variables, ConditionFailed,\
ParametersProcessor, DIR, FILE,\
IncorrectParameter, ParametersContainer
from ..utils.io_module import IOModule
from collections import OrderedDict, abc
from ..utils.mount import Mounts
class TemplateActionError(Exception):
pass
class TemplateError(Exception):
pass
class SystemAction:
pass
class TemplateAction:
def __init__(self, output_module=IOModule(),
chroot_path='/', mounts=None):
self.output = output_module
self.mounts = mounts
self.chroot_path = chroot_path
self.exec_list = OrderedDict()
self.DIRECTORY_APPENDS = {
'remove': self._append_remove_directory,
'clear': self._append_clear_directory,
'replace': self._append_replace_directory,
'link': self._append_link_directory,
'join': self._append_join_directory,
'skip': self._append_skip_directory
}
self.FILE_APPENDS = {
'replace': self._append_replace_file,
'remove': self._append_remove_file,
'clear': self._append_clear_file,
'link': self._append_link_file,
'join': self._append_join_file,
'before': self._append_before_file,
'after': self._append_after_file,
'skip': self._append_skip_file
}
@property
def available_appends(self):
appends_set = set(self.DIRECTORY_APPENDS.keys()).union(
set(self.FILE_APPENDS.keys()))
return appends_set
def process_template_directory(self, target_path, template_path,
parameters={}):
'''Метод для выполнения шаблона директории.'''
self.template_path = template_path
self.target_path = target_path
try:
self.template_parameters = ParametersProcessor(
parameters,
template_type=DIR,
chroot_path=self.chroot_path)
except IncorrectParameter as error:
self.output.set_error('Incorrect parameter: {}'.format(str(error)))
return
self.output.console_output.print_default(
'With parameters: {}\n'.
format(self.template_parameters._parameters_dictionary)
)
self.output.set_info('Template path: {}'.format(self.template_path))
self.output.set_info('Target directory: {}'.format(self.target_path))
# Методы соответствующие разным значениям append для директорий.
def _append_join_directory(self):
'''Создать каталог.'''
self._create_directory()
def _append_remove_directory(self):
'''Удалить каталог и все его содержимое.'''
pass
def _append_clear_directory(self):
'''Очистить содержимое каталога, если каталог есть.'''
pass
def _append_replace_directory(self):
'''Очистить содержимое каталога или создать его.'''
pass
def _append_link_directory(self):
'''Создать символическую ссылку на указанный каталог.'''
pass
def _append_skip_directory(self):
pass
# Непосредственно действия с директориями.
def _create_directory(self, target_path=''):
if not target_path:
target_path = self.target_path
if os.access(target_path, os.F_OK):
if self.template_parameters.chmod:
self.chmod_directory(target_path)
if self.template_parameters.chown:
self.chown_directory(target_path)
return True
directory = target_path
directories_to_create = [target_path]
directory = os.path.dirname(directory)
while not os.access(directory, os.F_OK) and directory:
directories_to_create.append(directory)
directory = os.path.dirname(directory)
try:
current_mod, current_uid, current_gid = self.get_file_info(
directory,
'all')
current_chown = {'uid': current_uid, 'gid': current_gid}
except OSError:
self.output.set_error('No access to the directory: {}'.format(
directory))
return False
directories_to_create.reverse()
for directory_path in directories_to_create:
try:
os.mkdir(directory_path)
if self.template_parameters.chmod:
self.chmod_directory(directory_path)
else:
self.chmod_directory(directory_path,
chmod_value=current_mod)
if self.template_parameters.chown:
self.chown_directory
else:
self.chown_directory(directory_path,
chown_value=current_chown)
except OSError as error:
self.output.set_error('Failed to create directory: {}'.
format(directory_path))
self.output.set_error('Reason: {}'.format(str(error)))
return False
return True
def _remove_directory(self, target_path):
if os.path.isdir(target_path) or os.path.exists(target_path):
try:
if os.path.islink(target_path):
os.unlink(target_path)
else:
shutil.rmtree(target_path)
return True
except Exception as error:
self.output.set_error("Failed to delete the directory: {}".
format(target_path))
self.output.set_error("Reason: {}".
format(str(error)))
return False
else:
self.output.set_error("Failed to delete the directory: {}".
format(target_path))
self.output.set_error(
"Target file is not directory or not exists.".
format(target_path))
return False
def _clear_directory(self, target_path):
if os.path.isdir(target_path) and os.path.exists(target_path):
for node in os.scandir(target_path):
if node.is_dir():
if self._remove_directory(node.path):
return True
else:
self.output.set_error('Failed to delete directory: {}'.
format(target_path))
else:
try:
os.remove(node.path)
return True
except OSError:
self.output.set_error('Failed to delete directory: {}'.
format(target_path))
else:
self.output.set_error('failed to delete directory: {}'.
format(target_path))
self.output.set_error(
'target file is not directory or does not exist: {}'.
format(target_path))
def _link_directory(self, target_path):
if not self.source:
self.output.set_error("'source' parameter is not defined for "
"'append = link' in template {}".
format(self.template_path))
self.output.set_error('Failed to create symlink: {0} -> {1}'.
format(target_path, self.source))
return False
if not os.path.exists(self.source):
self.output.set_error("'source' file does not exist for "
"'append = link' in template {}".
format(self.template_path))
self.output.set_error('Failed to create symlink: {0} -> {1}'.
format(target_path, self.source))
return False
try:
os.symlink(self.source, target_path, target_is_directory=True)
print('linked: {0} -> {1}'.format(os.path.basename(target_path),
os.path.basename(self.source)))
return True
except OSError:
self.output.set_error('Template error: {}'.
format(self.template_path))
self.output.set_error('Failed to create symlink: {0} -> {1}'.
format(target_path, self.source))
return False
def process_template_file(self, target_path, template_path,
template_text='', parameters={}):
self.template_text = template_text
self.template_path = template_path
self.target_path = target_path
try:
self.template_parameters = ParametersProcessor(
parameters,
template_type=FILE,
chroot_path=self.chroot_path)
except IncorrectParameter as error:
self.output.set_error('Incorrect parameter: {}'.format(str(error)))
return
self.output.console_output.print_default(
'With parameters: {}\n'.
format(self.template_parameters._parameters_dictionary)
)
self.output.set_info('Template path: {}'.format(self.template_path))
self.output.set_info('Target file: {}.\nTemplate text:'.
format(self.target_path))
self.output.console_output.print_default(self.template_text + '\n\n')
# Методы соответствующие значениями параметра append для файлов шаблонов.
def _append_replace_file(self):
pass
def _append_remove_file(self):
pass
def _append_clear_file(self):
pass
def _append_link_file(self):
pass
def _append_join_file(self):
pass
def _append_before_file(self):
pass
def _append_after_file(self):
pass
def _append_skip_file(self):
pass
# Методы для работы непосредственно с файлами.
def join_file(self, target_path, option=False):
pass
def remove_file(self, target_path):
try:
if os.path.islink(target_path):
try:
os.unlink(target_path)
return True
except OSError:
self.output.set_error('Template error: {}'.
format(target_path))
self.output.set_error('Failed to delete the link: {}'.
format(target_path))
return False
if os.path.isfile(target_path):
try:
os.remove(target_path)
return True
except OSError:
self.output.set_error('Template error: {}'.
format(target_path))
self.output.set_error('Failed to delete the file: {}'.
format(target_path))
finally:
pass
def clear_file(self, target_path):
try:
with open(target_path, 'w') as f:
f.truncate(0)
except IOError:
self.output.set_error("Template error: {}".
format(self.template_path))
self.output.set_error("Failed to clear the file: {}".
format(target_path))
return False
def link_file(self, target_path):
pass
def chown_directory(self, target_path, chown_value={}):
"""Сменить владельца директории."""
if not chown_value:
chown_value = self.template_parameters.chown
print('chown value = {}'.format(chown_value))
try:
os.chown(target_path, chown_value['uid'], chown_value['gid'])
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown directory: {}'.
format(target_path))
return False
def chmod_directory(self, target_path, chmod_value=False):
"""Сменить права доступа к директории."""
if not chmod_value:
chmod_value = self.template_parameters.chmod
print('chmod value = {}'.format(chmod_value))
try:
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod directory: {}'.
format(target_path))
self.output.set_error('reason: {}'.format(str(error)))
return False
def chown_file(self, target_path, check_existation=True):
"""Сменить владельца файла."""
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.lchown(target_path, self.chown['uid'], self.chown['gid'])
return True
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chown file: {}'.format(target_path))
return False
def chmod_file(self, target_path, check_existation=True):
"""Сменить права доступа к директории."""
try:
if check_existation and not os.path.exists(target_path):
open(target_path, 'w').close()
os.chmod(target_path, self.chmod)
return True
except (OSError, Exception) as error:
# возможно потребуются дополнительные проверки.
self.output.set_error('Can not chmod file: {}'.format(target_path))
return False
def get_file_info(self, path, info='all'):
file_stat = os.stat(path)
if info == 'all':
return stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,\
file_stat.st_gid
if info == 'mode':
return stat.S_IMODE(file_stat.st_mode)
if info == 'owner':
return file_stat.st_uid, file_stat.st_gid
def set_uid_gid_error(self, path, uid, gid, template_path=''):
import pwd
import grp
try:
user_name = pwd.getpwuid(uid).pw_name
except (TypeError, KeyError):
user_name = str(uid)
try:
group_name = grp.getgrgid(gid).gr_name
except (TypeError, KeyError):
group_name = str(gid)
owner = '{0}:{1}'.format(user_name, group_name)
if template_path:
self.output.set_error('Failed to process template file {}'.
template_path)
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !! описать ошибку !!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
self.output.set_error('error with owner: {}'.format(owner))
def is_vfat(self, path):
if self.mounts is None:
self.mounts = Mounts()
if self.mounts.get_from_fstab(what=self.mounts.TYPE,
where=self.mounts.DIR,
is_in=path) in ('vfat', 'ntfs-3g',
'ntfs'):
return True
return False
def check_os_error(self, error, path):
if hasattr(error, 'errno') and error.errno == os.errno.EPERM:
if self.is_vfat(path):
return True
if hasattr(error, 'errno') and error.errno == os.errno.EACCES and\
'var/calculate/remote' in path:
return True
return False
def _clear_parameters(self):
'''Метод для удаления данных о шаблоне директории после
его выполнения.'''
try:
del(self.template_parameters)
except NameError:
pass
class DirectoryTree:
def __init__(self, base_directory):
self.base_directory = base_directory
self._tree = {}
def update_tree(self, tree):
self._update(self._tree, tree)
def _update(self, original_tree, tree):
for parent, child in tree.items():
if isinstance(child, abc.Mapping):
original_tree[parent] = self._update(original_tree.get(parent,
dict()),
child)
else:
original_tree[parent] = child
return original_tree
def show_tree(self):
pprint(self._tree)
def get_directory_tree(self, directory):
directory_tree = DirectoryTree(os.path.join(self.base_directory,
directory))
directory_tree._tree = self._tree[directory]
return directory_tree
def __getitem__(self, name):
if name in self._tree:
return self._tree[name]
else:
return None
def __setitem__(self, name, value):
self._tree[name] = value
def __iter__(self):
if self._tree is not None:
return iter(self._tree.keys())
else:
return iter([])
def __repr__(self):
return '<DirectoryTree: {}>'.format(self._tree)
def __bool__(self):
return bool(self._tree)
class DirectoryProcessor:
chmod_regex = re.compile(r'\d{3}')
def __init__(self, action, datavars_module=Variables(), package='',
output_module=IOModule(), without_execution=False,
debug_mode=False):
self.action = action
self.debug_mode = debug_mode
self.cl_chroot_path = datavars_module.main.cl_chroot_path
self.without_execution = without_execution
self.datavars_module = datavars_module
self.output = output_module
self.template_action = TemplateAction(output_module=self.output,
chroot_path=self.cl_chroot_path)
self.template_engine = TemplateEngine(
datavars_module=datavars_module,
chroot_path=self.cl_chroot_path,
appends_set=self.template_action.available_appends)
if package:
try:
self.for_package = self.template_engine.parameters_processor.\
check_package_parameter(package)
except ConditionFailed as error:
self.output.set_error(str(error))
return
else:
self.for_package = False
self.template_paths = (self.datavars_module.
main.cl_template_path.split(','))
self.inheritable_parameters = set(
ParametersProcessor.inheritable_parameters)
self.processed_packages = []
self.packages_to_merge = []
self.packages_file_trees = {}
def process_template_directories(self):
# Проходим каталоги из main.cl_template.path
# Режим заполнения очередей директорий пакетов, необходимых для более
# быстрой обработки параметра merge.
if self.for_package:
self.fill_trees = True
else:
self.fill_trees = False
for directory_path in self.template_paths:
self.base_directory = directory_path.strip()
entries = os.scandir(self.base_directory)
for node in entries:
self.directory_tree = {}
self.walk_directory_tree(node.path,
self.cl_chroot_path,
ParametersContainer(),
directory_tree=self.directory_tree)
# Теперь когда дерево заполнено, можно выключить этот режим.
self.fill_trees = False
if self.for_package:
self.output.set_info('Processing packages from merge parameter...')
self.processed_packages.append(self.for_package)
self.merge_packages()
def merge_packages(self):
not_merged_packages = []
while self.packages_to_merge:
self.for_package = self.packages_to_merge.pop()
if self.for_package not in self.packages_file_trees:
self.output.set_error(
"Error: package '{0}' not found for action '{1}'.".
format(self.for_package, self.action)
)
not_merged_packages.append(self.for_package)
continue
for directory_name in self.packages_file_trees[self.for_package]:
directory_tree = self.packages_file_trees[self.for_package].\
get_directory_tree(directory_name)
self.walk_directory_tree(directory_tree.base_directory,
self.cl_chroot_path,
ParametersContainer(),
directory_tree=directory_tree)
self.processed_packages.append(self.for_package)
if not_merged_packages:
self.output.set_error('Packages {} is not merged.'.
format(','.join(self.packages_to_merge)))
else:
self.output.set_success('All packages are merged...')
def get_directories_queue(self, path):
'''Уже не актуальный метод для построение очередей из путей к
шаблонам.'''
directories_queue = []
for base_dir in self.template_paths:
base_dir = base_dir.strip()
if path.startswith(base_dir):
base_directory = base_dir
break
while path != base_directory:
path, dir_name = os.path.split(path)
directories_queue.append(dir_name)
return base_directory, directories_queue
def walk_directory_tree(self, current_directory_path, target_path,
directory_parameters, directory_tree={}):
template_files = []
template_directories = []
directory_name = os.path.basename(current_directory_path)
if self.fill_trees:
directory_tree[directory_name] = {}
current_target_path = target_path
entries = os.scandir(current_directory_path)
self.template_engine.change_directory(current_directory_path)
for node in entries:
# Временное, вместо этого добавить переменную ignore_files.
if node.name.endswith('.swp'):
continue
if node.is_dir():
template_directories.append(node.path)
elif node.is_file():
template_files.append(node.name)
# обрабатываем в первую очередь шаблон директории.
if '.calculate_directory' in template_files:
template_files.remove('.calculate_directory')
try:
self.template_engine.process_template(
'.calculate_directory',
template_type=DIR,
parameters=directory_parameters)
except ConditionFailed as error:
self.output.set_warning('{}.\nTemplate: {}'.
format(str(error),
current_directory_path))
directory_tree = {}
return
except Exception as error:
self.output.set_error('Template error: {0}\nTemplate: {1}'.
format(str(error),
current_directory_path))
directory_tree = {}
return
# directory_parameters.print_parameters_for_debug()
# Если есть параметр name -- меняем имя текущего каталога.
if directory_parameters.name:
directory_name = directory_parameters.name
# Если есть параметр path -- меняем текущий путь к целевому
# каталогу.
if directory_parameters.path:
current_target_path = join_paths(self.cl_chroot_path,
directory_parameters.path)
if self.fill_trees and self.check_package_and_action(
directory_parameters,
current_directory_path,
DIR,
directory_tree=directory_tree):
current_target_path = os.path.join(current_target_path,
directory_name)
elif self.fill_trees and self.check_package_and_action(
directory_parameters,
current_directory_path,
DIR):
current_target_path = os.path.join(current_target_path,
directory_name)
else:
# Обновляем дерево директорий для данного пакета.
if (self.fill_trees and
directory_tree[directory_name] is None):
package_name = directory_parameters.package
if package_name in self.packages_file_trees:
self.packages_file_trees[package_name].update_tree(
copy.deepcopy(self.directory_tree)
)
else:
directory_tree = DirectoryTree(self.base_directory)
directory_tree.update_tree(
copy.deepcopy(self.directory_tree))
self.packages_file_trees[package_name] = directory_tree
directory_tree = {}
return
if self.for_package and directory_parameters.merge:
self.packages_to_merge.extend(directory_parameters.merge)
else:
# Если .calculate_directory отсутствует -- создаем директорию
# используя унаследованные параметры и имя самой директории.
if not self.check_package_and_action(
directory_parameters,
current_directory_path,
DIR,
directory_tree=directory_tree):
# Обновляем дерево директорий для данного пакета.
if (directory_tree[directory_name] is None and
self.fill_trees):
package_name = directory_parameters.package
if package_name in self.packages_file_trees:
self.packages_file_trees[package_name].update_tree(
copy.deepcopy(self.directory_tree)
)
else:
directory_tree = DirectoryTree(self.base_directory)
directory_tree.update_tree(
copy.deepcopy(self.directory_tree))
self.packages_file_trees[package_name] = directory_tree
directory_tree = {}
return
directory_parameters.set_parameter({'append': 'join'})
current_target_path = os.path.join(current_target_path,
directory_name)
# Выполняем действие с директорией.
if not self.without_execution:
self.output.set_success('Processing directory: {}'.
format(current_directory_path))
self.template_action.process_template_directory(
current_target_path,
template_path=current_directory_path,
parameters=directory_parameters)
else:
self.output.set_success('Processing directory: {}'.
format(current_directory_path))
# Если режим заполнения очередей выключен -- используем имеющуюся
# очередь для получения списков обрабатываемых файлов и директорий.
if not self.fill_trees and directory_tree:
template_files = []
template_directories = []
for template in directory_tree:
if template in template_files:
template_files.append(template)
else:
next_directory_tree =\
directory_tree.get_directory_tree(template)
template_directories.append(next_directory_tree)
# обрабатываем файлы шаблонов хранящихся в директории.
for template_name in template_files:
directory_parameters.remove_not_inheritable()
template_path = os.path.join(current_directory_path, template_name)
try:
self.template_engine.process_template(
template_name, FILE,
parameters=directory_parameters)
except ConditionFailed as error:
self.output.set_warning('{0}.\nTemplate: {1}'.
format(str(error),
current_directory_path))
continue
except Exception as error:
self.output.set_error('Template error: {0} \nTemplate: {1}'.
format(str(error),
current_directory_path))
continue
# directory_parameters.print_parameters_for_debug()
template_text = self.template_engine.template_text
if self.fill_trees and not self.check_package_and_action(
directory_parameters,
template_path, FILE,
directory_tree=directory_tree[directory_name]):
continue
elif not self.fill_trees and not self.check_package_and_action(
directory_parameters,
template_path, FILE):
continue
# Если есть параметр merge добавляем в список
if self.for_package and directory_parameters.merge:
self.packages_to_merge.extend(directory_parameters.merge)
if directory_parameters.name:
template_name = directory_parameters.name
if directory_parameters.path:
target_file_path = join_paths(self.cl_chroot_path,
directory_parameters.path,
template_name)
else:
target_file_path = join_paths(current_target_path,
template_name)
# Выполняем действие с использованием шаблона.
if not self.without_execution:
self.output.set_success('Processing template: {}...'.
format(template_path))
self.template_action.process_template_file(
target_file_path,
template_path,
template_text=template_text,
parameters=directory_parameters)
else:
self.output.set_success('Processing template: {}...'.
format(template_path))
# Обновляем дерево директорий для данного пакета.
if (self.fill_trees and directory_tree[directory_name]):
package_name = directory_parameters.package
if package_name in self.packages_file_trees:
self.packages_file_trees[package_name].update_tree(
copy.deepcopy(self.directory_tree))
else:
directory_tree = DirectoryTree(self.base_directory)
directory_tree.update_tree(copy.deepcopy(self.directory_tree))
self.packages_file_trees[package_name] = directory_tree
directory_tree[directory_name] = {}
# проходимся далее по директориям.
for directory in template_directories:
if self.fill_trees:
self.walk_directory_tree(
directory, current_target_path,
directory_parameters.get_inheritables(),
directory_tree=directory_tree[directory_name])
directory_tree[directory_name] = {}
else:
if isinstance(directory, DirectoryTree):
directory_path = directory.base_directory
else:
directory_path = directory
self.walk_directory_tree(
directory_path,
current_target_path,
directory_parameters.get_inheritables())
if self.fill_trees:
directory_tree = {}
return
def check_package_and_action(self, parameters, template_path,
template_type, directory_tree=None):
if parameters.append != 'skip':
if not parameters.action:
self.output.set_warning(
("Action parameter value '{0}' not found."
"\nTemplate: {1}").format(parameters.action,
template_path))
return False
elif parameters.action != self.action:
self.output.set_warning(
("Action parameter value '{0}' does not match its"
" current value '{1}'.\nTemplate: {2}").format(
parameters.action,
template_path))
return False
if self.for_package:
if not parameters.package:
self.output.set_warning(
"'package' parameter is not defined.\nTemplate: {}".
format(template_path))
# считаем наличие параметра package необязательным.
# return False
elif parameters.package != self.for_package:
if directory_tree is not None:
template_name = os.path.basename(template_path)
directory_tree[template_name] = None
self.output.set_warning(
("'package' parameter value '{0}' does not "
"match its current target package.\nTemplate: {1}").
format(parameters.package, template_path)
)
return False
return True