# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# calculate-utils-4-lib/calculate/templates/template_processor.py
#
# 747 lines
# 33 KiB
#
# This file contains ambiguous Unicode characters!
#
# This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
import re
import stat
import shutil
from .template_engine import TemplateEngine, Variables, ConditionFailed
from ..utils.io_module import IOModule
from collections import OrderedDict
from ..utils.mount import Mounts
class TemplateAction:
    """Perform the file-system actions requested by a template.

    The ``append`` parameter of a template selects one of the handlers
    registered in ``DIRECTORY_APPENDS`` / ``FILE_APPENDS``.  Several handlers
    are still empty stubs; they are kept so that the dispatch tables stay
    complete.  Errors are reported through ``self.output.set_error`` and
    signalled to the caller with a ``False`` return value.
    """

    def __init__(self, output_module=None, base_directory='/'):
        """Set up the dispatch tables.

        output_module -- object used for reporting; a new ``IOModule`` is
            created when it is omitted (the instance is no longer shared
            between all objects through a default argument).
        base_directory -- root of the processed system; when it is not '/'
            users and groups are resolved from its own etc/passwd and
            etc/group files.
        """
        self.output = IOModule() if output_module is None else output_module
        self.mounts = None
        self.base_directory = base_directory
        self.exec_list = OrderedDict()

        # Per-template state; overwritten by process_template_*().
        self.template_path = None
        self.source = False
        self.chmod = False
        self.chown = False

        self.DIRECTORY_APPENDS = {
            'remove': self.remove_directory,
            'clear': self.clear_directory,
            # NOTE(review): 'replace' points at remove_directory while
            # replace_directory() is still an empty stub -- confirm intent.
            'replace': self.remove_directory,
            'link': self.link_directory,
            'join': self.join_directory
        }
        self.FILE_APPENDS = {
            'replace': self.replace_file,
            'remove': self.remove_file,
            'clear': self.clear_file,
            'link': self.link_file,
            'join': self.join_file,
            'before': self.before_file,
            'after': self.after_file
        }

    def process_template_directory(self, target_path, template_path,
                                   parameters=None):
        """Read the common parameters of a directory template.

        'force', 'autoupdate' and 'source' are popped from ``parameters``
        (the caller's dictionary is modified); 'chown' and 'chmod' are only
        read.  Returns False when the parameters are inconsistent.  The
        method does not perform any action yet, so it returns None otherwise.
        """
        if parameters is None:
            parameters = {}
        self.template_path = template_path

        # Parse the common template parameters.
        self.force = parameters.pop('force', False)
        self.autoupdate = parameters.pop('autoupdate', False)
        self.source = parameters.pop('source', False)

        # .get(): a missing 'append' must not raise KeyError here.
        if self.source and parameters.get('append') != 'link':
            self.output.set_error(("'source' parameter is set without "
                                   "'append = link' in template {}").
                                  format(self.template_path))
            return False

        # 'chown' is optional; resolve it to uid/gid only when it is given
        # and keep the resolved values for the action handlers.
        chown = parameters.get('chown', False)
        if chown:
            chown = self.get_chown_values(chown)
            if not chown:
                return False
        self.chown = chown

        self.chmod = parameters.get('chmod', False)

    def join_directory(self, target_path, template_path=None):
        """Create ``target_path`` together with its missing parents.

        Mode and owner come from ``self.chmod`` / ``self.chown`` when they
        are set, otherwise from the closest existing ancestor directory.
        ``template_path`` is not used.  Returns True when the directory
        exists at the end, False when it could not be created.
        """
        if os.access(target_path, os.F_OK):
            return True

        # Collect the missing directories, deepest first.
        directory = os.path.normpath(target_path)
        directories_to_create = []
        while directory and not os.access(directory, os.F_OK):
            directories_to_create.append(directory)
            parent = os.path.split(directory)[0]
            if parent == directory:
                break
            directory = parent

        # An empty remainder means a relative path whose first component
        # is missing: its parent is the current directory.
        existing_ancestor = directory or os.curdir
        try:
            current_mode, current_uid, current_gid = self.get_file_info(
                                                            existing_ancestor,
                                                            'all')
        except OSError:
            self.output.set_error("No access to the directory: {}".
                                  format(existing_ancestor))
            return False

        # NOTE(review): self.chmod is handed to os.chmod as is, so it is
        # assumed to be an integer mode -- confirm against the caller.
        mode = self.chmod if self.chmod else current_mode

        # 'is None' checks: uid/gid 0 (root) are valid explicit values.
        owner = self.chown if isinstance(self.chown, dict) else {}
        uid = owner.get('uid')
        if uid is None:
            uid = current_uid
        gid = owner.get('gid')
        if gid is None:
            gid = current_gid

        for directory in reversed(directories_to_create):
            try:
                os.mkdir(directory)
                if mode:
                    os.chmod(directory, mode)
            except OSError:
                self.output.set_error('failed to create directory: {}'.
                                      format(directory))
                return False
            # Changing the owner stays best effort, as it was before:
            # the failure is reported but does not abort the creation.
            try:
                os.chown(directory, uid, gid)
            except OSError:
                self.output.set_error('Can not chown directory: {}'.
                                      format(directory))
        return True

    def remove_directory(self, target_path):
        """Delete the directory tree at ``target_path``.

        Returns True on success, False when the path is not a directory or
        the deletion failed.
        """
        if not os.path.isdir(target_path):
            self.output.set_error("failed to delete the directory: {}".
                                  format(target_path))
            self.output.set_error(
                             "target file is not directory or not exists: {}".
                             format(target_path))
            return False
        try:
            shutil.rmtree(target_path)
        except OSError:
            self.output.set_error("failed to delete the directory: {}".
                                  format(target_path))
            return False
        return True

    def clear_directory(self, target_path):
        """Delete everything inside ``target_path``, keeping the directory.

        Every entry is attempted even when an earlier one failed.  Returns
        True when the directory ended up empty, False otherwise.
        """
        if not os.path.isdir(target_path):
            self.output.set_error('failed to delete directory: {}'.
                                  format(target_path))
            self.output.set_error(
                        'target file is not directory or does not exist: {}'.
                        format(target_path))
            return False

        cleared = True
        for node in os.scandir(target_path):
            # Symbolic links are unlinked, never followed into the
            # directory they point to.
            if node.is_dir(follow_symlinks=False):
                if not self.remove_directory(node.path):
                    self.output.set_error('Failed to delete directory: {}'.
                                          format(node.path))
                    cleared = False
            else:
                try:
                    os.remove(node.path)
                except OSError:
                    self.output.set_error('Failed to delete directory: {}'.
                                          format(node.path))
                    cleared = False
        return cleared

    def link_directory(self, target_path):
        """Check the precondition of 'append = link' (not implemented yet).

        Returns False when no 'source' parameter was given, None otherwise.
        """
        if not self.source:
            self.output.set_error(("'source' parameter is not defined for "
                                   "'append = link' in template {}").
                                  format(self.template_path))
            return False

    def replace_directory(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def process_template_file(self, target_path, template_path,
                              template_text='', parameters=None):
        """Read the common parameters of a file template.

        The handled parameters are popped from ``parameters`` (the caller's
        dictionary is modified).  The handler selected by 'append' is stored
        in ``self.append``.  Returns False when the parameters are invalid.
        The method does not run the handler yet, so it returns None
        otherwise.
        """
        if parameters is None:
            parameters = {}
        self.template_text = template_text
        self.template_path = template_path

        # Parse the common template parameters.
        self.autoupdate = parameters.pop('autoupdate', False)
        self.force = parameters.pop('force', False)
        self.mirror = parameters.pop('mirror', False)
        self.protected = parameters.pop('protected', False)
        self.format = parameters.pop('format', False)
        self.source = parameters.pop('source', False)
        self.run = parameters.pop('run', False)
        self.exec = parameters.pop('exec', False)
        self.chmod = parameters.pop('chmod', False)
        self.chown = parameters.pop('chown', False)

        # .get(): a missing 'append' must not raise KeyError here.
        if (self.source and parameters.get('append') not in
           {'link', 'join', 'after', 'before'}):
            self.output.set_error(("'source' parameter is set without"
                                   " a suitable 'append' parameter"
                                   " in template {}").
                                  format(self.template_path))
            return False

        append_value = parameters.pop('append', False)
        if not append_value:
            self.output.set_error(
                          "'append' parameter is not defined in template {}".
                          format(self.template_path))
            return False
        # File templates are dispatched through the file table.
        if append_value not in self.FILE_APPENDS:
            self.output.set_error("Unknown 'append' value in template {}".
                                  format(self.template_path))
            return False
        self.append = self.FILE_APPENDS[append_value]

        # Resolve uid and gid from the chown parameter.
        if self.chown:
            self.chown = self.get_chown_values(self.chown)
            if not self.chown:
                return False

        # The 'format' parameter is temporary; it is going to be replaced
        # by automatic format detection.
        if not self.format:
            self.output.set_error(
                          "'format' parameter is not defined in template {}".
                          format(self.template_path)
                                 )
            return False

    def join_file(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def before_file(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def after_file(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def replace_file(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def remove_file(self, target_path):
        """Delete the file or symbolic link at ``target_path``.

        Returns True when something was deleted, False when the deletion
        failed or the path is neither a link nor a regular file.
        """
        if os.path.islink(target_path):
            try:
                os.unlink(target_path)
            except OSError:
                self.output.set_error('Template error: {}'.
                                      format(target_path))
                self.output.set_error('Failed to delete the link: {}'.
                                      format(target_path))
                return False
            return True

        if os.path.isfile(target_path):
            try:
                os.remove(target_path)
            except OSError:
                self.output.set_error('Template error: {}'.
                                      format(target_path))
                self.output.set_error('Failed to delete the file: {}'.
                                      format(target_path))
                return False
            return True

        return False

    def clear_file(self, target_path):
        """Truncate the file at ``target_path`` (it is created if missing).

        Returns True on success, False when the file could not be opened.
        """
        try:
            # Opening for writing already truncates the file.
            with open(target_path, 'w'):
                pass
        except OSError:
            self.output.set_error("Template error: {}".
                                  format(self.template_path))
            self.output.set_error("Failed to clear the file: {}".
                                  format(target_path))
            return False
        return True

    def link_file(self, target_path):
        """Placeholder: not implemented yet."""
        pass

    def get_parameter_names(self):
        """Return the set of parameter names accepted in templates.

        Temporary replacement for collecting the parameters supported by
        the individual formats.
        """
        parameters_set = {'name', 'path', 'append', 'chmod', 'chown',
                          'autoupdate', 'env', 'link', 'force', 'symbolic',
                          'format', 'comment', 'protected', 'mirror', 'run',
                          'exec', 'stretch', 'convert', 'dotall',
                          'multiline', 'dconf', 'package', 'merge',
                          'postmerge', 'action', 'rebuild'}
        return parameters_set

    # Possibly a separate class should hold the names known to the system.
    def get_chown_values(self, chown: str):
        """Resolve a 'user:group' string to ``{'uid': int, 'gid': int}``.

        The names are looked up with ``pwd`` / ``grp`` when the base
        directory is '/', otherwise in the passwd and group files below the
        base directory.  Returns False, after reporting the error, when the
        value is malformed or a name is unknown.
        """
        if not isinstance(chown, str) or ':' not in chown:
            self._report_wrong_chown(chown)
            return False

        # partition(): additional colons stay in the group name and simply
        # fail the lookup instead of breaking the unpacking.
        user_name, _, group_name = chown.partition(':')

        import pwd
        try:
            if self.base_directory == '/':
                uid = pwd.getpwnam(user_name).pw_uid
            else:
                uid = self.get_uid_from_passwd(user_name)
        except (KeyError, TypeError, ValueError):
            self.output.set_error(
                                 "There is no such user in the system: {}".
                                 format(user_name))
            uid = False
        # The helper signals failure with False; 0 is a valid uid, so the
        # identity is checked rather than the truth value.
        if uid is False or uid is None:
            self._report_wrong_chown(chown)
            return False

        import grp
        try:
            if self.base_directory == '/':
                gid = grp.getgrnam(group_name).gr_gid
            else:
                gid = self.get_gid_from_group(group_name)
        except (KeyError, TypeError, ValueError):
            self.output.set_error(
                                 "There is no such group in the system: {}".
                                 format(group_name))
            gid = False
        if gid is False or gid is None:
            self._report_wrong_chown(chown)
            return False

        return {'uid': uid, 'gid': gid}

    def _report_wrong_chown(self, chown):
        """Report an unusable 'chown' value of the current template."""
        self.output.set_error(
                            "Wrong 'chown' value '{0}' in the template: {1}".
                            format(chown, self.template_path))

    @staticmethod
    def _read_name_id_table(file_path):
        """Map names to id strings read from a passwd or group style file."""
        table = {}
        with open(file_path, 'r') as id_file:
            for line in id_file:
                if line.startswith('#'):
                    continue
                fields = line.split(':')
                # Field 0 holds the name and field 2 the numeric id.
                if len(fields) > 2 and fields[0] and fields[2].strip():
                    table[fields[0]] = fields[2].strip()
        return table

    def get_uid_from_passwd(self, user_name: str):
        """Take the uid of ``user_name`` from the chroot passwd file.

        Returns the uid, or False when the passwd file does not exist.
        Raises KeyError when the user is not listed and ValueError when the
        stored uid is not a number.
        """
        passwd_file_path = os.path.join(self.base_directory, 'etc/passwd')
        if not os.path.exists(passwd_file_path):
            self.output.set_error("passwd file was not found in {}".
                                  format(passwd_file_path))
            return False
        passwd_dictionary = self._read_name_id_table(passwd_file_path)
        return int(passwd_dictionary[user_name])

    def get_gid_from_group(self, group_name: str):
        """Take the gid of ``group_name`` from the chroot group file.

        Returns the gid, or False (after reporting the error) when the file
        or the group is missing.
        """
        group_file_path = os.path.join(self.base_directory, 'etc/group')
        if not os.path.exists(group_file_path):
            self.output.set_error("group file was not found in {}".
                                  format(group_file_path))
            return False
        group_dictionary = self._read_name_id_table(group_file_path)
        if group_name not in group_dictionary:
            self.output.set_error("'{0}' gid was not found in {1}".
                                  format(group_name, group_file_path))
            return False
        return int(group_dictionary[group_name])

    def chown_directory(self, target_path):
        """Change the owner of a directory to ``self.chown``.

        Returns True on success, False after reporting the error.
        """
        try:
            os.chown(target_path, self.chown['uid'], self.chown['gid'])
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chown directory: {}'.
                                  format(target_path))
            return False
        return True

    def chmod_directory(self, target_path):
        """Change the access mode of a directory to ``self.chmod``.

        Returns True on success, False after reporting the error.
        """
        try:
            os.chmod(target_path, self.chmod)
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chmod directory: {}'.
                                  format(target_path))
            return False
        return True

    def chown_file(self, target_path, check_existation=True):
        """Change the owner of a file to ``self.chown``.

        With ``check_existation`` a missing file is created first.  Links
        are not followed.  Returns True on success, False after reporting
        the error.
        """
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            os.lchown(target_path, self.chown['uid'], self.chown['gid'])
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chown file: {}'.
                                  format(target_path))
            return False
        return True

    def chmod_file(self, target_path, check_existation=True):
        """Change the access mode of a file to ``self.chmod``.

        With ``check_existation`` a missing file is created first.  Returns
        True on success, False after reporting the error.
        """
        try:
            if check_existation and not os.path.exists(target_path):
                open(target_path, 'w').close()
            os.chmod(target_path, self.chmod)
        except Exception:
            # Additional checks may be needed here.
            self.output.set_error('Can not chmod file: {}'.
                                  format(target_path))
            return False
        return True

    def get_file_info(self, path, info='all'):
        """Return mode and/or owner of ``path``.

        info == 'all'   -- (mode, uid, gid)
        info == 'mode'  -- mode
        info == 'owner' -- (uid, gid)
        Any other value gives None.  Raises OSError when ``path`` cannot be
        examined.
        """
        file_stat = os.stat(path)
        if info == 'all':
            return (stat.S_IMODE(file_stat.st_mode), file_stat.st_uid,
                    file_stat.st_gid)
        if info == 'mode':
            return stat.S_IMODE(file_stat.st_mode)
        if info == 'owner':
            return file_stat.st_uid, file_stat.st_gid

    def is_vfat(self, path):
        """Tell whether fstab lists ``path`` on a vfat or ntfs file system."""
        if self.mounts is None:
            self.mounts = Mounts()
        filesystem_type = self.mounts.get_from_fstab(what=self.mounts.TYPE,
                                                     where=self.mounts.DIR,
                                                     is_in=path)
        return filesystem_type in ('vfat', 'ntfs-3g', 'ntfs')

    def check_os_error(self, error, path):
        """Tell whether ``error`` raised for ``path`` may be ignored.

        True for EPERM on a vfat/ntfs file system and for EACCES on paths
        containing 'var/calculate/remote'.
        """
        # os.errno is gone since Python 3.7; the errno module is used.
        import errno

        error_code = getattr(error, 'errno', None)
        if error_code == errno.EPERM and self.is_vfat(path):
            return True
        if error_code == errno.EACCES and 'var/calculate/remote' in path:
            return True
        return False
class FormatDetector:
    """Placeholder for the future template format detection; empty so far."""
class DirectoryProcessor:
    """Walk the template directories and process the templates in them.

    Directory templates ('.calculate_directory') and file templates are
    rendered with the template engine; their parameters decide whether and
    where a template applies.  At this stage the processor only reports
    what it would do through the output module.
    """

    chmod_regex = re.compile(r'\d{3}')

    def __init__(self, action, datavars_module=None, package='',
                 output_module=None):
        """Prepare the engine and the bookkeeping of packages.

        action -- value the 'action' parameter of a template must have.
        datavars_module -- variables container; a new ``Variables`` is
            created when it is omitted.
        package -- when set, only templates of this package (and of the
            packages it merges) are processed.
        output_module -- reporting object; a new ``IOModule`` is created
            when it is omitted.
        """
        # Defaults are created per instance instead of once at definition
        # time, so instances do not share them.
        if datavars_module is None:
            datavars_module = Variables()
        if output_module is None:
            output_module = IOModule()

        self.INHERITABLE_PARAMETERS = {'chmod', 'chown', 'autoupdate',
                                       'env', 'package', 'action'}

        self.datavars_module = datavars_module
        self.output = output_module
        # NOTE(review): the chroot path is not passed on as
        # base_directory -- confirm that this is intended.
        self.template_action = TemplateAction(output_module=self.output)
        self.action = action
        self.chroot_path = datavars_module.main.cl_chroot.path

        self.parameters_set = self.template_action.get_parameter_names()
        self.template_engine = TemplateEngine(
                                        datavars_module=datavars_module,
                                        parameters_set=self.parameters_set
                                             )

        self.template_paths = (self.datavars_module.
                               main.cl_template.path.split(','))
        self.for_package = package
        self.processed_packages = []
        self.packages_to_merge = []
        self.packages_file_paths = {}

    def process_template_directories(self):
        """Process every directory found in the template paths.

        When a package was requested, the packages collected from 'merge'
        parameters are processed afterwards.
        """
        # Go through the directories from main.cl_template.path.
        for directory_path in self.template_paths:
            directory_path = directory_path.strip()
            for node in os.scandir(directory_path):
                # Only directories can be walked; stray files in a
                # template root are skipped.
                if not node.is_dir():
                    continue
                self.walk_directory_tree(node.path,
                                         self.chroot_path,
                                         {}, set())

        if self.for_package:
            self.output.set_info('Processing packages from merge parameter...')
            self.processed_packages.append(self.for_package)
            self.merge_packages()

    def merge_packages(self):
        """Process the templates of the packages queued for merging.

        Packages may queue further packages while they are processed.  A
        package is handled once; packages without recorded template paths
        are reported as not merged.
        """
        not_merged_packages = []

        while self.packages_to_merge:
            package_name = self.packages_to_merge.pop()
            # Packages that merge each other must not be processed over
            # and over again.
            if (package_name in self.processed_packages
                    or package_name in not_merged_packages):
                continue
            self.for_package = package_name

            if package_name not in self.packages_file_paths:
                self.output.set_error(
                        "Error: package '{0}' not found for action '{1}'.".
                        format(package_name, self.action)
                                     )
                not_merged_packages.append(package_name)
                continue

            # Iterate a copy: the walk may record further paths.
            for template_path in list(self.packages_file_paths[package_name]):
                base_directory, queue = self.get_directories_queue(
                                                                template_path
                                                                  )
                first_directory = queue.pop()
                first_directory_path = os.path.join(base_directory,
                                                    first_directory)
                self.walk_directory_tree(first_directory_path,
                                         self.chroot_path, {}, set(),
                                         directories_queue=queue)
            self.processed_packages.append(package_name)

        if not_merged_packages:
            self.output.set_error('Packages {} is not merged.'.
                                  format(','.join(not_merged_packages)))
        else:
            self.output.set_success('All packages are merged...')

    def get_directories_queue(self, path):
        """Split ``path`` into its template root and the names below it.

        Returns ``(base_directory, queue)``: the matching entry of
        ``self.template_paths`` (stripped) and the path components below
        it, deepest first, so that ``queue.pop()`` gives the component
        directly below the root.  Raises ValueError when ``path`` is not
        located in any template path.
        """
        normalized_path = os.path.normpath(path)

        for base_dir in self.template_paths:
            base_dir = base_dir.strip()
            if not base_dir:
                continue
            normalized_base = os.path.normpath(base_dir)
            if normalized_path == normalized_base:
                return base_dir, []

            # Compare whole components so that '/tpl' does not match
            # '/tpl2/...'.
            prefix = normalized_base
            if not prefix.endswith(os.sep):
                prefix += os.sep
            if normalized_path.startswith(prefix):
                relative_path = normalized_path[len(prefix):]
                return base_dir, relative_path.split(os.sep)[::-1]

        raise ValueError("path '{}' is not inside the template paths".
                         format(path))

    def _remember_package_path(self, package_name, template_path):
        """Record a template path that belongs to another package."""
        self.packages_file_paths.setdefault(package_name,
                                            []).append(template_path)

    def _queue_merge_packages(self, merge_value):
        """Queue the not yet processed packages of a 'merge' parameter."""
        for package_name in merge_value.split(','):
            package_name = package_name.strip()
            if package_name not in self.processed_packages:
                self.packages_to_merge.append(package_name)

    def _env_is_available(self, env_value):
        """Tell whether every module named in an 'env' parameter exists."""
        return all(name.strip() in self.datavars_module
                   for name in env_value.split(','))

    def walk_directory_tree(self, current_directory_path, target_path,
                            directory_parameters, directory_env,
                            directories_queue=None):
        """Process one template directory and descend into its children.

        current_directory_path -- template directory to process.
        target_path -- target directory computed for the parent.
        directory_parameters -- parameters inherited from the parent.
        directory_env -- environment inherited from the parent.
        directories_queue -- names to follow, nearest last; when it is not
            empty only the popped name is processed on each level.  The
            list passed by the caller is consumed.
        """
        if directories_queue is None:
            directories_queue = []

        template_files = []
        template_directories = []

        directory_name = os.path.basename(current_directory_path)
        current_env = directory_env.copy()
        current_target_path = target_path
        directory_parameters = directory_parameters.copy()

        entries = os.scandir(current_directory_path)
        self.template_engine.change_directory(current_directory_path)
        for node in entries:
            # Temporary; an ignore_files variable should replace this.
            if node.name.endswith('.swp'):
                continue
            if node.is_dir():
                template_directories.append(node.path)
            elif node.is_file():
                template_files.append(node.name)

        # The directory template is processed first.
        if '.calculate_directory' in template_files:
            template_files.remove('.calculate_directory')
            try:
                self.template_engine.process_template('.calculate_directory',
                                                      env=current_env)
            except ConditionFailed:
                self.output.set_warning('Condition failed in {}'.
                                        format(current_directory_path))
                self.output.console_output.print_default('\n')
                return
            except Exception as error:
                self.output.set_error('Template error: {}'.
                                      format(str(error)))
                return
            directory_parameters.update(self.template_engine.parameters)

        # NOTE(review): the indentation of the original file was lost.
        # Everything below is assumed to apply to every directory, with or
        # without a '.calculate_directory' file -- confirm against the
        # repository.

        # A 'name' parameter renames the current directory.
        if 'name' in directory_parameters:
            directory_name = directory_parameters['name']

        # A 'path' parameter replaces the path to the target directory.
        if 'path' in directory_parameters:
            current_target_path = directory_parameters['path']

        if directory_parameters.get('append', None) != 'skip':
            current_target_path = os.path.join(current_target_path,
                                               directory_name)

        if directory_parameters.get('action', None) != self.action:
            self.output.set_error(
                            'Action parameter not found or not relevant: {}'.
                            format(directory_name))
            self.output.console_output.print_default('\n')
            return

        # When templates are processed for a single package, check the
        # 'package' parameter and collect the directories of other
        # packages, which are needed for merge.
        if self.for_package:
            if 'package' not in directory_parameters:
                self.output.set_warning(
                                   "'package' is not defined for {}".
                                   format(current_directory_path)
                                       )
                return
            elif directory_parameters['package'] != self.for_package:
                self._remember_package_path(directory_parameters['package'],
                                            current_directory_path)
                self.output.set_warning(
                                   "'package' parameter is not actual in {}".
                                   format(current_directory_path)
                                       )
                return

        # A directory whose 'env' names a missing module is not processed.
        if 'env' in directory_parameters:
            if not self._env_is_available(directory_parameters['env']):
                return
            del directory_parameters['env']

        # Packages named in 'merge' are queued for later processing.
        if self.for_package and 'merge' in directory_parameters:
            self._queue_merge_packages(directory_parameters['merge'])
            del directory_parameters['merge']

        # Perform the action with the directory.
        self.output.set_success('Processing directory: {}'.
                                format(current_directory_path))
        self.output.console_output.print_default(
                                               'Directory parameters: {}\n'.
                                               format(directory_parameters)
                                                )
        self.output.set_info(
                            'Actions using template directory. Target: {}'.
                            format(current_target_path)
                            )

        # Keep the inheritable parameters only.
        directory_parameters = {name: value for name, value in
                                directory_parameters.items()
                                if name in self.INHERITABLE_PARAMETERS}

        # A 'chmod' value containing three digits is not inherited.
        if 'chmod' in directory_parameters:
            if self.chmod_regex.search(str(directory_parameters['chmod'])):
                del directory_parameters['chmod']

        # Following a queue: restrict this level to the queued name.
        if directories_queue:
            next_node = directories_queue.pop()
            if next_node in template_files:
                template_files = [next_node]
                template_directories = []
            else:
                next_directory_path = os.path.join(current_directory_path,
                                                   next_node)
                template_files = []
                template_directories = [next_directory_path]

        # Process the template files stored in the directory.
        for template_name in template_files:
            template_parameters = directory_parameters.copy()
            try:
                self.template_engine.process_template(template_name,
                                                      env=current_env.copy())
            except ConditionFailed:
                self.output.set_warning('Condition failed for: {}'.
                                        format(template_name))
                continue
            except Exception as error:
                self.output.set_error('Template error: {}'.
                                      format(str(error)))
                continue

            template_parameters.update(self.template_engine.parameters)
            template_text = self.template_engine.template_text

            if template_parameters.get('action', None) != self.action:
                self.output.set_warning(
                                'Action parameter not found or not relevant.'
                                       )
                continue
            template_parameters.pop('action', None)

            if self.for_package:
                if 'package' not in template_parameters:
                    self.output.set_warning("'package' is not defined for {}".
                                            format(template_name))
                    continue
                elif template_parameters['package'] != self.for_package:
                    self._remember_package_path(
                                          template_parameters['package'],
                                          os.path.join(current_directory_path,
                                                       template_name))
                    self.output.set_warning(
                                   "'package' parameter is not actual in {}".
                                   format(template_name)
                                           )
                    continue

            if 'merge' in template_parameters:
                self._queue_merge_packages(template_parameters['merge'])
                del template_parameters['merge']

            # A template whose 'env' names a missing module is skipped.
            if 'env' in template_parameters:
                if not self._env_is_available(template_parameters['env']):
                    continue
                del template_parameters['env']

            if 'name' in template_parameters:
                template_name = template_parameters['name']

            if 'path' in template_parameters:
                target_file_path = os.path.join(template_parameters['path'],
                                                template_name)
            else:
                target_file_path = os.path.join(current_target_path,
                                                template_name)

            # Perform the action using the template.
            self.output.set_success('Processing template: {}...'.
                                    format(template_name))
            self.output.console_output.print_default(
                                                   'With parameters: {}\n'.
                                                   format(template_parameters)
                                                    )
            self.output.set_info('Target file: {}.\nTemplate text:'.
                                 format(target_file_path))
            self.output.console_output.print_default(template_text + '\n\n')

        # Descend into the child directories.
        for directory_path in template_directories:
            self.walk_directory_tree(directory_path, current_target_path,
                                     directory_parameters, current_env,
                                     directories_queue=directories_queue)