5
0
Derivar 0
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

1817 linhas
80 KiB

Este ficheiro contém caracteres Unicode ambíguos!

Este ficheiro contém caracteres Unicode ambíguos que podem ser confundidos com outros da sua configuração regional vigente. Se o uso é intencional e legítimo, pode ignorar este aviso com segurança. Use o botão Revelar para realçar esses caracteres.

# vim: fileencoding=utf-8
#
from jinja2.ext import Extension
from jinja2.lexer import Token
from jinja2.parser import Parser
from jinja2 import (
nodes,
Template,
Environment,
pass_context,
FileSystemLoader,
TemplateSyntaxError,
)
from jinja2.utils import missing
from jinja2.runtime import Context, Undefined
from collections.abc import MutableMapping
from collections import OrderedDict
from importlib import import_module
from pprint import pprint
import copy
import re
import os
import stat
from typing import (
Optional,
Union,
List,
Any,
Dict,
Tuple,
Iterator,
)
from ..utils.package import (
PackageAtomParser,
PackageAtomError,
PackageAtomName,
NOTEXIST,
Package,
Version,
)
from ..utils.files import (
check_directory_link,
check_command,
join_paths,
FilesError,
)
from calculate.variables.datavars import (
VariableNotFoundError,
NamespaceNode,
VariableNode,
IntegerType,
FloatType,
ListType,
HashType,
IniType,
)
from calculate.utils.fs import readFile
from calculate.variables.loader import Datavars
import calculate.templates.template_filters as template_filters
# Типы шаблона: директория или файл.
DIR, FILE, LINK = range(3)
# Словарь, в котором можно регистрировать фильтры.
CALCULATE_FILTERS = {"cut": template_filters.cut}
class IncorrectParameter(Exception):
pass
class DefaultParameterError(Exception):
pass
class SaveError(Exception):
pass
class ConditionFailed(TemplateSyntaxError):
pass
class Variables(MutableMapping):
'''Класс-заглушка вместо модуля переменных для тестов.'''
def __init__(self, *args, **kwargs):
self.__attrs: dict = dict(*args, **kwargs)
self.__iter: Union[Iterator, None] = None
def __next__(self) -> Any:
if self._iter is None:
self._iter = iter(self.__attrs)
return next(self._iter)
def __getattribute__(self, name: str) -> Any:
if name == '_Variables__attrs':
return super().__getattribute__(name)
if name == 'available_packages':
return super().__getattribute__(name)
if name == '_variables':
return self.__attrs
if name == '_iter':
return self._iter
try:
return self.__attrs[name]
except KeyError:
raise AttributeError(name)
@property
def available_packages(self) -> set:
packages = set(self.__attrs.keys())
packages.update({'custom'})
return packages
def __getitem__(self, name: str) -> Any:
return self.__attrs[name]
def __setitem__(self, name: str, value: Any) -> None:
self.__attrs[name] = value
def __delitem__(self, name: str) -> None:
del self.__attrs[name]
def __iter__(self) -> Iterator:
return iter(self.__attrs)
def __len__(self) -> int:
return len(self.__attrs)
def __repr__(self) -> str:
return '<Variables {}>'.format(self.__attrs)
def __contains__(self, name: str) -> bool:
return name in self.__attrs
def __hash__(self) -> int:
return hash(id(self))
class ParametersProcessor:
'''Класс для проверки и разбора параметров шаблона.'''
available_parameters: set = {'name', 'path', 'append', 'chmod', 'chown',
'autoupdate', 'env', 'force', 'source',
'format', 'unbound', 'mirror', 'run', 'exec',
'env', 'package', 'merge', 'postmerge',
'action', 'rebuild', 'restart', 'stop',
'start', 'handler', 'notify', 'group',
'convert', 'stretch', 'execsql'}
inheritable_parameters: set = {'chmod', 'chown', 'autoupdate', 'env',
'package', 'action', 'handler', 'group'}
# Параметры по умолчанию для файлов --
# будут заполняться из __datavars__
file_default_parameters: dict = {}
# Параметры по умолчанию для директорий --
# будут заполняться из __datavars__
directory_default_parameters: dict = {}
available_appends: set = set()
available_formats: dict = dict()
format_is_inspected: bool = False
chmod_value_regular = re.compile(
r'([r-][w-][Xx-])([r-][w-][Xx-])([r-][w-][Xx-])')
def __init__(self,
parameters_container: Optional["ParametersContainer"] = None,
chroot_path: str = '/',
datavars_module: Union[Datavars,
NamespaceNode,
Variables] = Variables(),
for_package: Optional[Package] = None):
self.chroot_path: str = chroot_path
self.template_type: int = DIR
self.datavars_module: Union[Datavars,
NamespaceNode,
Variables] = datavars_module
self._parameters_container: ParametersContainer = parameters_container
self.package_atom_parser: PackageAtomParser = PackageAtomParser(
chroot_path=chroot_path)
self._groups: dict = {}
try:
groups = list(datavars_module.main.cl.groups._variables.keys())
for group in groups:
if isinstance(datavars_module, (Datavars, NamespaceNode)):
packages = datavars_module.main.cl.groups[group].get_value(
).get_table()
else:
packages = datavars_module.main.cl.groups[group]
self._groups.update({group: packages})
except Exception:
pass
self._inspect_formats_package()
self._for_package: Union[Package, None] = for_package
# Если добавляемый параметр нуждается в проверке -- добавляем сюда
# метод для проверки.
self.checkers_list = OrderedDict({
'package': self.check_package_parameter,
'group': self.check_group_parameter,
'append': self.check_append_parameter,
'rebuild': self.check_rebuild_parameter,
'restart': self.check_restart_parameter,
'run': self.check_run_parameter,
'exec': self.check_exec_parameter,
'stop': self.check_stop_parameter,
'start': self.check_start_parameter,
'chown': self.check_chown_parameter,
'chmod': self.check_chmod_parameter,
'autoupdate': self.check_autoupdate_parameter,
'source': self.check_source_parameter,
'force': self.check_force_parameter,
'env': self.check_env_parameter,
'merge': self.check_merge_parameter,
'format': self.check_format_parameter,
'handler': self.check_handler_parameter,
'notify': self.check_notify_parameter,
'convert': self.check_convert_parameter,
'stretch': self.check_stretch_parameter,
'execsql': self.check_execsql_parameter,
})
# Если добавляемый параметр должен быть проверен после того, как
# будет закончен парсинг всех других параметров -- добавляем сюда метод
# для проверки.
self.postparse_checkers_list = OrderedDict({
'package': self.check_postparse_package,
'append': self.check_postparse_append,
'source': self.check_postparse_source,
'autoupdate': self.check_postparse_autoupdate,
'run': self.check_postparse_run,
'exec': self.check_postparse_exec,
'handler': self.check_postparse_handler,
'convert': self.check_postparse_convert,
'stretch': self.check_postparse_stretch,
'execsql': self.check_postparse_execsql,
})
# Если параметр является наследуемым только при некоторых условиях --
# указываем здесь эти условия.
self.inherit_conditions = {'chmod': self.is_chmod_inheritable}
def set_parameters_container(self,
parameters_container: "ParametersContainer"
) -> None:
'''Метод для установки текущего контейнера параметров.'''
self._parameters_container = parameters_container
self._added_parameters = set()
@property
def for_package(self) -> Union[Package, None]:
return self._for_package
@for_package.setter
def for_package(self, package: Package) -> None:
self._for_package = package
def __getattr__(self, parameter_name: str) -> Any:
if parameter_name not in self.available_parameters:
raise IncorrectParameter("Unknown parameter: '{}'".
format(parameter_name))
elif parameter_name not in self._parameters_container:
return False
else:
return self._parameters_container[parameter_name]
def check_template_parameter(self, parameter_name: str,
parameter_value: Any,
template_type: int, lineno: int) -> None:
'''Метод, проверяющий указанный параметр.'''
self.lineno = lineno
self.template_type = template_type
if parameter_name not in self.available_parameters:
raise IncorrectParameter("Unknown parameter '{0}'".
format(parameter_name))
elif parameter_name in self.checkers_list:
checked_value = self.checkers_list[parameter_name](parameter_value)
else:
checked_value = parameter_value
# Способ пропустить параметр, если он корректен, но добавлять не нужно
if checked_value is None:
return
self._added_parameters.add(parameter_name)
if (parameter_name in self.inheritable_parameters and
self.template_type == DIR):
if parameter_name in self.inherit_conditions:
if self.inherit_conditions[parameter_name](
parameter_value):
self._parameters_container.set_inheritable(
{parameter_name: checked_value})
return
else:
self._parameters_container.set_inheritable(
{parameter_name: checked_value})
return
self._parameters_container.set_parameter({parameter_name:
checked_value})
def check_postparse_parameters(self) -> None:
'''Метод, запускающий проверку параметров после их разбора.'''
for parameter, parameter_checker in\
self.postparse_checkers_list.items():
if parameter not in self._added_parameters:
continue
parameter_value = self._parameters_container[parameter]
result = parameter_checker(parameter_value)
if result is not None:
self._parameters_container.change_parameter(parameter,
result)
def check_template_parameters(self, parameters: dict,
template_type: int, lineno: int) -> None:
'''Метод, запускающий проверку указанных параметров.'''
self.template_type = template_type
self.lineno = lineno
for parameter_name in parameters:
if parameter_name not in self.available_parameters:
raise IncorrectParameter("Unknown parameter '{0}'".
format(parameter_name))
elif parameter_name in self.checkers_list:
checked_value = self.checkers_list[parameter_name](
parameters[parameter_name]
)
else:
checked_value = parameters[parameter_name]
if (template_type == DIR and
parameter_name in self.inheritable_parameters):
if parameter_name in self.inherit_conditions:
if self.inherit_conditions[parameter_name](
parameters[parameter_name]):
self._parameters_container.set_inheritable(
parameter_name=checked_value)
continue
else:
self._parameters_container.set_inheritable(
parameter_name=checked_value)
continue
self._parameters_container.set_parameter(
parameter_name=checked_value)
# Методы для проверки параметров во время разбора шаблона.
def check_package_parameter(self, parameter_value: Any) -> str:
if not isinstance(parameter_value, str):
raise IncorrectParameter("'package' parameter must have value of"
" the 'str' type, not"
f" {type(parameter_value)}")
return parameter_value
def check_group_parameter(self, parameter_value: Any) -> List[str]:
if isinstance(parameter_value, str):
result = [group.strip() for group in parameter_value.split(',')]
elif isinstance(parameter_value, (list, tuple)):
result = parameter_value if isinstance(parameter_value, list) else\
list(parameter_value)
else:
raise IncorrectParameter("'package' parameter must have value of"
" the 'str', 'list' or 'tuple' type, not"
f" {type(parameter_value)}")
for group in result:
if group != "install" and group not in self._groups:
raise IncorrectParameter(f"'group' parameter value '{group}'"
" is not available. Available values:"
f" {', '.join(self._groups.keys())}")
return result
def check_append_parameter(self, parameter_value: Any) -> str:
if parameter_value not in self.available_appends:
raise IncorrectParameter("Unacceptable value '{}' of parameter"
" 'append'".format(parameter_value))
return parameter_value
def check_merge_parameter(self, parameter_value: Any
) -> List[PackageAtomName]:
packages_list = []
packages_names = parameter_value.split(',')
for package_name in packages_names:
package_name = package_name.strip()
try:
atom_object = self.package_atom_parser.\
parse_package_parameter(package_name)
packages_list.append(atom_object)
except PackageAtomError:
continue
return packages_list
def check_rebuild_parameter(self, parameter_value: Any) -> bool:
if not isinstance(parameter_value, bool):
raise IncorrectParameter("'rebuild' parameter value is not bool")
elif 'package' not in self._parameters_container:
raise IncorrectParameter(("'rebuild' parameter is set without "
"'package' parameter"))
return parameter_value
def check_restart_parameter(self, parameter_value: Any) -> str:
if parameter_value and isinstance(parameter_value, str):
return parameter_value
else:
raise IncorrectParameter(
"'restart' parameter value is not correct")
def check_stop_parameter(self, parameter_value: Any) -> str:
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'stop' parameter value is empty")
return parameter_value
def check_start_parameter(self, parameter_value: Any) -> str:
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'start' parameter value is empty")
return parameter_value
def check_run_parameter(self, parameter_value: Any) -> str:
if self.template_type == DIR:
raise IncorrectParameter("'run' parameter is not available in"
" directory templates")
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'run' parameter value is empty")
try:
interpreter_path = check_command(parameter_value)
except FilesError:
raise IncorrectParameter("interpreter from 'run' parameter not"
" found")
return interpreter_path
def check_exec_parameter(self, parameter_value: Any) -> str:
if self.template_type == DIR:
raise IncorrectParameter("'exec' parameter is not available in"
" directory templates")
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'exec' parameter value is empty")
try:
interpreter_path = check_command(parameter_value)
except FilesError:
raise IncorrectParameter("interpreter from 'exec' parameter is not"
" found")
return interpreter_path
def check_chown_parameter(self, parameter_value: Any) -> dict:
if not parameter_value or isinstance(parameter_value, bool):
raise IncorrectParameter("'chown' parameter value is empty.")
parameter_value = self.get_chown_values(parameter_value)
return parameter_value
def check_chmod_parameter(self, parameter_value: Any) -> Union[int, tuple]:
result = self.chmod_value_regular.search(parameter_value)
if result:
return self._translate_symbol_chmod(result)
elif parameter_value.isdigit():
parameter_value = int(parameter_value, 8)
return parameter_value
else:
raise IncorrectParameter("'chmod' parameter value is not correct")
def _translate_symbol_chmod(self, result) -> Tuple[int, int]:
'''Метод для перевода буквенного значения chmod в числовое.
Возвращает кортеж (chmod, x_mask):
chmod -- число, полученное из последовательности битов, где
"r", "w" и "x" -> 1, "-" и "X" -> 0;
x_mask -- маска, полученная из последовательности битов, где
"X" -> 1, "r", "w", "-" и "x" -> 0. Она необходима для получения
значения chmod для файлов.'''
chmod = ''
x_mask = ''
for group_index in range(3):
group = result.groups()[group_index]
for sym_index in range(3):
if group[sym_index] in {'-', 'X'}:
chmod = chmod + '0'
else:
chmod = chmod + '1'
if group[sym_index] == 'X':
x_mask = x_mask + "1"
else:
x_mask = x_mask + "0"
return (int(chmod, 2), int(x_mask, 2))
def check_source_parameter(self, parameter_value: Any
) -> Union[str, Tuple[bool, str]]:
if not parameter_value or isinstance(parameter_value, bool):
raise IncorrectParameter("'source' parameter value is empty")
if self.chroot_path != '/':
real_path = join_paths(self.chroot_path, parameter_value)
else:
real_path = parameter_value
# Ставим True, чтобы потом проверить этот параметр в postparse
if not os.path.exists(real_path):
return (False, real_path)
source_file_type = DIR if os.path.isdir(real_path) else FILE
# Проверяем, совпадают ли типы шаблона и файла, указанного в source
if (self.template_type != source_file_type):
raise IncorrectParameter(
"the type of the 'source' file does not match"
" the type of the template file")
# Проверяем, не является ли файл из source зацикленной ссылкой.
if (source_file_type == DIR and os.path.islink(real_path)):
try:
check_directory_link(real_path, chroot_path=self.chroot_path)
except FilesError as error:
raise IncorrectParameter(
"the link from 'source' is not correct: {}".
format(str(error)))
return os.path.normpath(real_path)
def check_env_parameter(self, parameter_value: Any) -> Union[None, set]:
env_set = set()
for env_value in parameter_value.split(','):
env_value = env_value.strip()
name_parts = env_value.split('.')
namespace = self.datavars_module
for name in name_parts:
if name not in namespace:
raise ConditionFailed(
(f"Namespace '{env_value}' from 'env' parameter"
" does not exist."),
self.lineno)
namespace = namespace[name]
env_set.add(namespace)
CalculateContext._env_set.add(namespace)
# Если шаблон файла -- не добавляем env в контейнер,
# а только используем для рендеринга шаблона.
if self.template_type is None:
return None
if self._parameters_container.env:
env_set.union(self._parameters_container.env)
return env_set
def check_force_parameter(self, parameter_value: Any) -> bool:
if isinstance(parameter_value, bool):
return parameter_value
else:
raise IncorrectParameter("'force' parameter value is not bool")
def check_autoupdate_parameter(self, parameter_value: Any) -> bool:
if isinstance(parameter_value, bool):
return parameter_value
else:
raise IncorrectParameter(
"'autoupdate' parameter value is not bool.")
def check_format_parameter(self, parameter_value: Any) -> str:
if self.template_type == DIR:
raise IncorrectParameter("'format' parameter is redundant for"
" directory templates.")
if isinstance(parameter_value, str):
if parameter_value not in self.available_formats:
raise IncorrectParameter(f"'{parameter_value}' value of the"
" 'format' parameter is not"
" available.")
return parameter_value
raise IncorrectParameter("'format' parameter must be string value not"
f" {type(parameter_value)}.")
def check_handler_parameter(self, parameter_value: Any) -> str:
if not isinstance(parameter_value, str):
raise IncorrectParameter("'handler' parameter must be string"
f" value not {type(parameter_value)}.")
return parameter_value
def check_notify_parameter(self, parameter_value: Any) -> List[str]:
if isinstance(parameter_value, list):
return parameter_value
elif isinstance(parameter_value, str):
return [parameter.strip() for parameter in
parameter_value.split(',')]
raise IncorrectParameter("'notify' parameter must be string or list"
f" value not {type(parameter_value)}.")
def check_convert_parameter(self, parameter_value: Any) -> str:
if not isinstance(parameter_value, str):
raise IncorrectParameter("'convert' parameter value must be string"
f" not '{type(parameter_value)}'.")
parameter_value = parameter_value.strip().upper()
try:
available_image_formats =\
self.datavars_module.main.cl_image_formats
except VariableNotFoundError:
# TODO возможно стоит кидать ошибку.
available_image_formats = ["JPEG", "PNG", "GIF", "JPG"]
if parameter_value not in available_image_formats:
raise IncorrectParameter(f"'{parameter_value}' image format is "
"not available. Available image formats: "
f"'{', '.join(available_image_formats)}.'"
)
return parameter_value
def check_stretch_parameter(self, parameter_value: Any) -> bool:
if not isinstance(parameter_value, bool):
raise IncorrectParameter("'stretch' parameter value should be bool"
f" value not '{type(parameter_value)}'")
return parameter_value
def check_execsql_parameter(self, parameter_value: Any) -> bool:
if not isinstance(parameter_value, str):
raise IncorrectParameter("'execsql' parameter value should be str"
f" value not '{type(parameter_value)}'")
available_values = {"rollback", "stop", "continue"}
parameter_value = parameter_value.lower().strip()
if parameter_value not in available_values:
raise IncorrectParameter("'exesql' parameter value"
f" '{parameter_value}' is not available."
" Available values: "
f"{', '.join(available_values)}")
return parameter_value
# Методы для проверки параметров после разбора всего шаблона.
def check_postparse_append(self, parameter_value: str) -> None:
if parameter_value == 'link':
if 'source' not in self._parameters_container:
raise IncorrectParameter("append = 'link' without source "
"parameter.")
if self._parameters_container.run:
raise IncorrectParameter("'append' parameter is not 'compatible' "
"with the 'run' parameter")
if self._parameters_container.exec:
raise IncorrectParameter("'append' parameter is not 'compatible' "
"with the 'exec' parameter")
def check_postparse_run(self, parameter_value: str) -> None:
if self._parameters_container.append:
raise IncorrectParameter("'run' parameter is not 'compatible' "
"with the 'append' parameter")
if self._parameters_container.exec:
raise IncorrectParameter("'run' parameter is not 'compatible' "
"with the 'exec' parameter")
def check_postparse_exec(self, parameter_value: str) -> None:
if self._parameters_container.append:
raise IncorrectParameter("'exec' parameter is not 'compatible' "
"with the 'append' parameter")
if self._parameters_container.run:
raise IncorrectParameter("'exec' parameter is not 'compatible' "
"with the 'run' parameter")
def check_postparse_source(self,
parameter_value: Union[str, Tuple[bool, str]]
) -> None:
# Если файл по пути source не существует, но присутствует параметр
# mirror -- пропускаем шаблон для того, чтобы целевой файл мог быть
# удален в исполнительном модуле.
if isinstance(parameter_value, tuple):
if ((self._parameters_container.append == "link" and
self._parameters_container.force)
or self._parameters_container.format == "backgrounds"):
self._parameters_container['source'] = parameter_value[1]
elif not self._parameters_container.mirror:
raise IncorrectParameter(
"File from 'source' parameter does not exist")
elif (self.template_type == DIR and
('append' not in self._parameters_container or
self._parameters_container['append'] != 'link')):
raise IncorrectParameter(
("'source' parameter is set without "
"append = 'link' for directory template")
)
def check_postparse_autoupdate(self, parameter_value: bool) -> None:
if self._parameters_container.unbound:
raise IncorrectParameter("'unbound' parameter is incompatible"
" with 'autoupdate' parameter")
def check_postparse_handler(self, parameter_value: bool) -> None:
if self._parameters_container.merge:
raise IncorrectParameter("'merge' parameter is not available"
" in handler templates")
elif (self._parameters_container.package and
not self._parameters_container.is_inherited('package')):
raise IncorrectParameter("'package' parameter is not available"
" in handler templates")
def check_postparse_package(self, parameter_value: str) -> None:
groups = []
package_atom = PackageAtomParser.parse_atom_name(parameter_value)
if (self._parameters_container is None
or not self._parameters_container.group):
# Если параметр group не задан или метод используется для проверки
# отдельного параметра package -- делаем только проверку install.
# Предполагающую проверку существования пакета.
groups.append('install')
else:
groups = self._parameters_container.group
for group in groups:
if group == 'install':
try:
result = self.package_atom_parser.parse_package_parameter(
package_atom)
return result
except PackageAtomError as error:
if error.errno != NOTEXIST:
raise IncorrectParameter(error.message)
elif self._check_package_group(package_atom,
self._groups[group]):
if (self._parameters_container is not None
and self._parameters_container.package):
self._parameters_container.remove_parameter('package')
return
raise ConditionFailed(f"package '{parameter_value}'"
" does not match the template condition",
self.lineno if hasattr(self, 'lineno') else 0)
def _check_package_group(self, package: dict, group_packages: list
) -> bool:
'''Метод для проверки соответствия описания пакета, заданного словарем,
какому-либо описанию пакета, заданного в переменных groups.'''
for group_package in group_packages:
for parameter in ['category', 'name', 'version', 'slot']:
if package[parameter] is not None:
if (group_package[parameter] is None
or group_package[parameter] != package[parameter]):
break
else:
if package['use_flags'] is not None:
if group_package['use_flags'] is None:
continue
else:
for use_flag in package['use_flags']:
if use_flag not in group_package['use_flags']:
continue
return True
return False
def check_postparse_convert(self, parameter_value: str) -> None:
template_format = self._parameters_container.format
if not template_format or template_format != "backgrounds":
raise IncorrectParameter("'convert' parameter available for"
" 'backgrounds' format only.")
def check_postparse_stretch(self, parameter_value: str) -> None:
template_format = self._parameters_container.format
if not template_format or template_format != "backgrounds":
raise IncorrectParameter("'stretch' parameter available for"
" 'backgrounds' format only.")
def check_postparse_execsql(self, parameter_value: str) -> None:
template_format = self._parameters_container.format
if not template_format or template_format != "sqlite":
raise IncorrectParameter("'execsql' parameter available for"
" 'sqlite' format only.")
# Методы для проверки того, являются ли параметры наследуемыми.
def is_chmod_inheritable(self, parameter_value: str) -> bool:
chmod_regex = re.compile(r'\d+')
if chmod_regex.search(parameter_value):
return False
return True
def get_chown_values(self, chown: str) -> dict:
"""Получить значения uid и gid из параметра chown."""
if chown and ':' in chown:
user_name, group_name = chown.split(':')
if user_name.isdigit():
uid = int(user_name)
else:
import pwd
try:
if self.chroot_path == '/':
uid = pwd.getpwnam(user_name).pw_uid
else:
uid = self.get_uid_from_passwd(user_name)
except (FilesError, KeyError, TypeError) as error:
raise IncorrectParameter(
"'chown = {}' parameter check is failed: {}".
format(chown, str(error)))
if group_name.isdigit():
gid = int(group_name)
else:
import grp
try:
if self.chroot_path == '/':
gid = grp.getgrnam(group_name).gr_gid
else:
gid = self.get_gid_from_group(group_name)
except (FilesError, KeyError, TypeError) as error:
raise IncorrectParameter(
"'chown = {}' parameter check is failed: {}".
format(chown, str(error)))
return {'uid': uid, 'gid': gid}
else:
raise IncorrectParameter("'chown' value '{0}' is not correct".
format(chown))
def get_uid_from_passwd(self, user_name: str) -> int:
"""Функция для получения uid из chroot passwd файла."""
passwd_file_path = os.path.join(self.chroot_path, 'etc/passwd')
passwd_dictionary = dict()
if os.path.exists(passwd_file_path):
with open(passwd_file_path, 'r') as passwd_file:
for line in passwd_file:
if line.startswith('#'):
continue
passwd_item = tuple(line.split(':')[0:3:2])
if (len(passwd_item) > 1 and passwd_item[0]):
passwd_dictionary[passwd_item[0]] = passwd_item[1]
if user_name in passwd_dictionary:
return int(passwd_dictionary[user_name])
else:
raise FilesError("'{0}' uid was not found in {1}".
format(user_name, passwd_file_path))
else:
raise FilesError("passwd file was not found in {}".
format(passwd_file_path))
def get_gid_from_group(self, group_name: str) -> int:
"""Функция для получения gid из chroot group файла."""
group_file_path = os.path.join(self.chroot_path, 'etc/group')
group_dictionary = dict()
if os.path.exists(group_file_path):
with open(group_file_path, 'r') as group_file:
for line in group_file:
if line.startswith('#'):
continue
group_item = tuple(line.split(':')[0:3:2])
if len(group_item) > 1 and group_item[0] and group_item[1]:
group_dictionary[group_item[0]] = group_item[1]
if group_name in group_dictionary:
return int(group_dictionary[group_name])
else:
raise FilesError("'{0}' gid was not found in {1}".
format(group_name, group_file_path))
else:
raise FilesError("group file was not found in {}".
format(group_file_path))
@classmethod
def _inspect_formats_package(cls) -> None:
'''Метод для определения множества доступных форматов и
предоставляемых ими параметров.'''
if cls.format_is_inspected:
return
parameters_set = set()
available_formats = dict()
format_directory_path = os.path.join(os.path.dirname(__file__),
'format')
for module_name in os.listdir(format_directory_path):
if (os.path.isdir(os.path.join(format_directory_path,
module_name)) or
module_name == '__init__.py'):
continue
if module_name.endswith('.py'):
module_name = module_name[:-3]
try:
module = import_module('calculate.templates.format.{}'.
format(module_name))
for obj in dir(module):
# TODO изменить способ выборки классов форматов.
if obj.endswith('Format') and obj != 'BaseFormat':
format_class = getattr(module, obj, False)
if format_class:
format_name = getattr(format_class,
'FORMAT', False)
if not format_name:
continue
available_formats.update(
{format_name: format_class})
format_parameters = getattr(format_class,
'FORMAT_PARAMETERS',
set())
parameters_set.update(format_parameters)
except Exception:
continue
cls.available_formats = available_formats
cls.available_parameters.update(parameters_set)
cls.formats_inspected = True
def resolve_or_missing(context: "CalculateContext",
key: str, missing=missing,
env: Optional[set] = None) -> Any:
'''Переопределение функции из для поиска значений переменных из jinja2.
Ищет переменные в datavars.'''
if env is None:
env = {}
datavars = context.parent['__datavars__']
if key in context.vars:
return context.vars[key]
if key in context.parent:
return context.parent[key]
if key in datavars:
return datavars[key]
for namespace in env:
if key in namespace:
return namespace[key]
return missing
class CalculateContext(Context):
'''Класс контекста позволяющий использовать значения datavars и
сохранять их.'''
_env_set = set()
def __init__(self, environment: Environment,
parent: Union[Context, Dict[str, Any]],
name: str, blocks: Dict[str, Any], **kwargs):
super().__init__(environment, parent, name, blocks, **kwargs)
def resolve(self, key: str) -> Any:
if self._legacy_resolve_mode:
rv = resolve_or_missing(self, key,
env=self._env_set)
else:
rv = self.resolve_or_missing(key)
if rv is missing:
return self.environment.undefined(name=key)
return rv
def resolve_or_missing(self, key: str) -> Any:
if self._legacy_resolve_mode:
rv = self.resolve(key)
if isinstance(rv, Undefined):
rv = missing
return rv
return resolve_or_missing(self, key,
env=self._env_set)
class ParametersContainer(MutableMapping):
'''Класс для хранения параметров, взятых из шаблона, и передачи
их шаблонизатору.'''
def __init__(self, parameters_dictionary: Optional[dict] = None):
# Слой ненаследуемых параметров.
self.__parameters: dict = {}
# Слой наследуемых параметров.
if parameters_dictionary is not None:
self.__inheritable: dict = parameters_dictionary
else:
self.__inheritable: dict = {}
def set_parameter(self, item_to_add: dict) -> None:
self.__parameters.update(item_to_add)
def set_inheritable(self, item_to_add: dict) -> None:
self.__inheritable.update(item_to_add)
def get_inheritables(self) -> "ParametersContainer":
return ParametersContainer(copy.deepcopy(self.__inheritable))
def remove_not_inheritable(self) -> None:
self.__parameters.clear()
def print_parameters_for_debug(self) -> None:
print('Parameters:')
pprint(self.__parameters)
print('Inherited:')
pprint(self.__inheritable)
def is_inherited(self, parameter_name: str) -> bool:
return (parameter_name not in self.__parameters
and parameter_name in self.__inheritable)
def remove_parameter(self, parameter_name: str) -> None:
if parameter_name in self.__parameters:
self.__parameters.pop(parameter_name)
elif parameter_name in self.__inheritable:
self.__inheritable.pop(parameter_name)
def change_parameter(self, parameter: str, value: Any) -> None:
if parameter in self.__parameters:
self.__parameters.update({parameter: value})
elif parameter in self.__inheritable:
self.__inheritable.update({parameter: value})
def _clear_container(self) -> None:
self.__parameters.clear()
self.__inheritable.clear()
def __getattr__(self, parameter_name: str) -> Any:
if (parameter_name not in
ParametersProcessor.available_parameters):
raise IncorrectParameter("Unknown parameter: '{}'".
format(parameter_name))
if parameter_name in self.__parameters:
return self.__parameters[parameter_name]
elif parameter_name in self.__inheritable:
return self.__inheritable[parameter_name]
else:
return False
def __getitem__(self, name: str) -> Any:
if name in self.__parameters:
return self.__parameters[name]
elif name in self.__inheritable:
return self.__inheritable[name]
else:
return False
def __setitem__(self, name: str, value: Any) -> None:
self.__parameters[name] = value
def __delitem__(self, name: str) -> None:
if name in self.__parameters:
del self.__parameters[name]
if name in self.__inheritable:
del self.__inheritable[name]
def __iter__(self) -> Iterator[str]:
return iter(set(self.__parameters).union(self.__inheritable))
def __len__(self) -> int:
return len(set(self.__parameters).union(self.__inheritable))
def __repr__(self) -> str:
return '<ParametersContainer: parameters={0}, inheritables={1}>'.\
format(self.__parameters, self.__inheritable)
def __contains__(self, name: str) -> bool:
return name in self.__parameters or name in self.__inheritable
@property
def parameters(self) -> dict:
return self.__parameters
class CalculateExtension(Extension):
'''Класс расширения для jinja2, поддерживающий теги calculate-шаблонов.'''
_parameters_set = set()
# Виды операций в теге save.
ASSIGN, APPEND, REMOVE = range(3)
def __init__(self, environment: Environment,
parameters_processor: ParametersProcessor,
datavars_module: Union[Datavars,
NamespaceNode,
Variables] = Variables(),
chroot_path: str = "/"):
super().__init__(environment)
self.environment: Environment = environment
self.package_atom_parser = PackageAtomParser(chroot_path=chroot_path)
self.environment.globals.update({'pkg': self.pkg,
'grep': self.grep,
'exists': self.exists})
self._datavars = datavars_module
self.parameters_processor = parameters_processor
self.template_type: int = DIR
# Флаг, указывающий, что тег calculate уже был разобран. Нужен для
# того, чтобы проверять единственность тега calculate.
self.calculate_parsed: bool = False
self.tags = {'calculate', 'save'}
self.CONDITION_TOKENS_TYPES = {'eq', 'ne', 'lt', 'gt', 'lteq', 'gteq'}
self.CONDITION_NAME_TOKENS = {'not'}
self.LITERAL_TOKENS_TYPES = {'string', 'integer', 'float'}
if hasattr(self._datavars, 'variables_to_save'):
self.TARGET_FILES_SET =\
set(self._datavars.variables_to_save.keys())
else:
self.TARGET_FILES_SET = set()
self.parse_methods = {'calculate': self.parse_calculate,
'save': self.parse_save}
def __call__(self, env: Environment) -> "CalculateExtension":
# Необходимо для обеспечения возможности передать готовый объект
# расширения, а не его класс.
# Через функцию-фабрику такое не получается устроить, потому что при
# добавлении расширения jinja2 ищет у его класса атрибут identifier.
return self
def parse(self, parser: Parser) -> List[nodes.Output]:
self.parser = parser
self.stream = parser.stream
tag_token = self.stream.current.value
return [self.parse_methods[tag_token]()]
def parse_save(self) -> nodes.Output:
'''Метод для разбора тега save, сохраняющего значение указанной
переменной datavars.'''
lineno = next(self.stream).lineno
target_file = nodes.Const('', lineno=lineno)
# Получаем имя целевого файла.
if self.stream.skip_if('dot'):
target_file_name = self.stream.expect('name').value
if target_file_name in self.TARGET_FILES_SET:
target_file = nodes.Const(target_file_name)
else:
raise TemplateSyntaxError("Unknown target file '{}'".
format(target_file_name),
lineno=lineno)
# получаем список из имени переменной.
module_name = self.stream.expect('name').value
variable_name = [nodes.Const(module_name, lineno=lineno)]
while self.stream.skip_if('dot'):
name = self.stream.expect('name').value
variable_name.append(nodes.Const(name, lineno=lineno))
variable_name = nodes.List(variable_name, lineno=lineno)
if self.stream.skip_if('assign'):
return self._make_save_node(variable_name,
target_file,
self.ASSIGN,
lineno)
elif self.stream.skip_if('add') and self.stream.skip_if('assign'):
return self._make_save_node(variable_name,
target_file,
self.APPEND,
lineno)
elif self.stream.skip_if('sub') and self.stream.skip_if('assign'):
return self._make_save_node(variable_name,
target_file,
self.REMOVE,
lineno)
raise TemplateSyntaxError("'=' is expected in 'save' tag",
lineno=lineno)
def parse_calculate(self) -> nodes.Output:
'''Метод для разбора тега calculate, содержащего значения параметров и
условия выполнения шаблона.'''
lineno = next(self.stream).lineno
if self.calculate_parsed:
raise TemplateSyntaxError(
"template can only have one calculate tag.",
lineno=lineno)
expect_comma_flag = False
conditions = []
while self.stream.current.type != 'block_end':
if expect_comma_flag:
self.stream.expect('comma')
if (self.stream.current.type == 'name'
and self.stream.current.value in self._parameters_set
and self.stream.look().type != 'dot'
and self.stream.look().type not in
self.CONDITION_TOKENS_TYPES
and self.stream.current.value not in
self.CONDITION_NAME_TOKENS):
# разбираем параметр.
# pairs_list.append(self.get_parameter_node())
name_node, value_node = self._get_parameter()
check_node = self.call_method('check_parameter',
[name_node,
value_node,
nodes.ContextReference()],
lineno=lineno)
check_template_node = nodes.Template(
[nodes.Output([check_node])])
check_template_node = check_template_node.set_environment(
self.environment)
check_template = self.environment.from_string(
check_template_node)
check_template.render(__datavars__=self._datavars)
elif (self._is_variable_name(self.stream.current)
or self.stream.current.type in {'lparen', 'integer',
'float', 'string'}
or self.stream.current.value in self.CONDITION_NAME_TOKENS):
# разбираем условие. Если условие False -- кидаем исключение.
# condition_result = self.get_condition_result()
# if not condition_result:
# raise ConditionFailed('Condition is failed',
# lineno=self.stream.current.lineno)
conditions.append(self.parse_condition())
elif self.stream.current.type == 'name':
raise TemplateSyntaxError(
f"Unknown identifier '{self.stream.current.value}'"
" in calculate tag.",
lineno=self.stream.current.lineno)
else:
raise TemplateSyntaxError(
f"Can not parse token '{self.stream.current.value}'"
" in caluculate tag.",
lineno=self.stream.current.lineno)
expect_comma_flag = True
self.parameters_processor.check_postparse_parameters()
self.check_conditions(conditions)
self.calculate_parsed = True
return nodes.Output([nodes.Const('')], lineno=lineno)
def _is_variable_name(self, token: Token) -> bool:
'''Метод для проверки токена на предмет того, что он является частью
имени переменной.'''
if not token.type == 'name':
return False
if (token.value in self._datavars.available_packages or
token.value in self.environment.globals):
return True
for namespace in self.environment.context_class._env_set:
if token.value in namespace:
return True
return False
def check_parameter(self, parameter_name: str, parameter_value: Any,
context: CalculateContext) -> str:
self.parameters_processor.check_template_parameter(
parameter_name,
parameter_value,
self.template_type,
self.stream.current.lineno)
return ''
def parse_condition(self) -> Template:
try:
condition_node = self.parser.parse_expression(with_condexpr=True)
condition_node = self.call_method(
'set_condition_result',
[condition_node],
lineno=self.stream.current.lineno)
condition_template = nodes.Template(
[nodes.Output([condition_node])])
condition_template = condition_template.set_environment(
self.environment)
template = self.environment.from_string(condition_template)
return template
except Exception as error:
raise ConditionFailed('Error during parsing condition:{}'
.format(str(error)),
lineno=self.stream.current.lineno)
def check_conditions(self, conditions: List[Template]) -> None:
for condition in conditions:
# TODO Добавить различие провала условие из синтаксической ошибки
# или при отрицательном условии.
self.condition_result = False
try:
condition.render(__datavars__=self._datavars)
except Exception as error:
raise ConditionFailed('Error during handling condition: {}'
.format(str(error)),
lineno=self.stream.current.lineno)
if not self.condition_result:
raise ConditionFailed('Condition is failed',
lineno=self.stream.current.lineno)
# DEPRECATED
def get_condition_result(self) -> bool:
'''Метод для разбора условий из тега calculate.'''
self.condition_result = False
try:
condition_node = self.parser.parse_expression(with_condexpr=True)
condition_node = self.call_method(
'set_condition_result',
[condition_node],
lineno=self.stream.current.lineno)
condition_template = nodes.Template(
[nodes.Output([condition_node])])
condition_template = condition_template.set_environment(
self.environment)
template = self.environment.from_string(condition_template)
template.render(__datavars__=self._datavars)
except Exception:
return False
return self.condition_result
def set_condition_result(self, condition_result: Any) -> str:
'''Метод для сохранения результата вычисления условия.'''
self.condition_result = condition_result
return ''
def _make_save_node(self, variable_name_node: nodes.List,
target_file_node: nodes.Const, optype: int,
lineno: int) -> nodes.Output:
'''Метод для создания ноды, сохраняющей переменные.'''
right_value = self.parser.parse_expression(with_condexpr=True)
optype_node = nodes.Const(optype, lineno=lineno)
save_variable_node = self.call_method('save_variable',
[variable_name_node,
right_value,
target_file_node,
optype_node,
nodes.ContextReference()],
lineno=lineno)
return nodes.Output([save_variable_node], lineno=lineno)
def save_variable(self, variable: List[str], right_value: Any,
target_file: str, optype: int,
context: CalculateContext) -> str:
'''Метод для сохранения значений переменных указанных в теге save.'''
datavars = context.parent['__datavars__']
if variable[0] not in datavars:
raise SaveError("can not save variable '{}'. The variable's"
" package '{}' is not found".format(
'.'.join(variable),
variable[0]))
modify_only = (variable[0] != 'custom')
package = datavars[variable[0]]
variable_name = variable[-1]
value_container = self._find_value_container(variable, package,
modify_only=modify_only)
# Теперь меняем знaчение переменной.
if isinstance(value_container, NamespaceNode):
self._modify_variables(variable, value_container, right_value,
optype, target_file=target_file,
modify_only=modify_only)
elif isinstance(value_container, VariableNode):
hash_value = value_container.get_value().get_hash()
if variable_name in hash_value:
if optype == self.ASSIGN:
new_value = right_value
elif optype == self.APPEND:
new_value = new_value + right_value
elif optype == self.REMOVE:
new_value = new_value - right_value
else:
new_value = right_value
hash_value.update({variable_name: new_value})
value_container.set(hash_value)
if target_file:
self._save_to_target(variable[:-1], variable_name,
new_value, target_file)
return ''
def _find_value_container(self, variable: List[str],
vars_package: NamespaceNode,
modify_only: bool = True
) -> Union[NamespaceNode, VariableNode]:
'''Метод для поиска контейнера, путь к которому указан в аргументе.
Этим контейнером может быть пространство имен или хэш.'''
current_container = vars_package
variable_path = variable[1:-1]
for section in variable_path:
if section in current_container._namespaces:
current_container = current_container._namespaces[section]
elif (section in current_container._variables
and current_container._variables[section].variable_type
is HashType):
current_container = current_container._variables[section]
if section != variable_path[-1]:
# Если обнаружен хэш, но в пути к переменной кроме ключа
# хэша есть еще что-то далее -- значит путь к переменной
# ошибочен.
raise SaveError("can not save variable '{}'. Other"
" variable '{}' on the path".format(
'.'.join(variable),
current_container.get_fullname()))
elif not modify_only:
new_namespace = NamespaceNode(section)
current_container.add_namespace(new_namespace)
current_container = new_namespace
else:
raise SaveError("can not save variable '{}'. Namespace '{}'"
" is not found in '{}'".format(
'.'.join(variable),
section,
current_container.get_fullname()))
return current_container
def _modify_variables(self, variable: List[str], namespace: NamespaceNode,
new_value: Any, optype: int,
target_file: Optional[str] = None,
modify_only: bool = True) -> None:
'''Метод для модификации значения переменной.'''
variable_name = variable[-1]
if variable_name in namespace._variables:
variable_node = namespace[variable_name]
if optype == self.ASSIGN:
variable_node.set(new_value)
elif optype == self.APPEND:
new_value = self._append_variable_value(variable_node,
new_value)
variable_node.set(new_value)
elif optype == self.REMOVE:
new_value = self._remove_variable_value(variable_node,
new_value)
variable_node.set(new_value)
elif not modify_only:
VariableNode(variable_name, namespace,
variable_type=IniType, source=str(new_value))
else:
raise SaveError("can not create variable '{}' in not 'custom'"
" namespace".format('.'.join(variable)))
if target_file:
if namespace._variables[variable_name].variable_type is HashType:
for key, value in new_value.items():
self._save_to_target(variable, key, value, target_file)
else:
self._save_to_target(variable[:-1], variable_name,
new_value, target_file)
# DEPRECATED
def _modify_hash(self, variable_name: List[str],
hash_variable: VariableNode, new_value, optype,
target_file: Optional[str] = None) -> None:
'''Метод для модификации значения в переменной-хэше.'''
value_name = variable_name[-1]
hash_value = hash_variable.get_value().get_hash()
if value_name in hash_value:
if optype == self.APPEND:
new_value = hash_value[value_name] + new_value
elif optype == self.REMOVE:
new_value = hash_value[value_name] - new_value
hash_value.update({value_name: new_value})
hash_variable.set(hash_value)
if target_file:
self._save_to_target(variable_name[:-1], value_name,
new_value, target_file)
def _save_to_target(self, namespace_name: List[str],
variable_name: str, value: Any, target_file: str
) -> None:
'''Метод для добавления переменной в список переменных, значение
которых было установлено через тег save и при этом должно быть
сохранено в указанном файле: save.target_file.'''
namespace_name = tuple(namespace_name)
target_file_dict = self._datavars.variables_to_save[target_file]
if namespace_name not in target_file_dict:
target_file_dict.update({namespace_name: dict()})
target_file_dict[namespace_name].update(
{variable_name: ('=', str(value))})
def _append_variable_value(self, variable: VariableNode,
value: Any) -> Any:
'''Метод описывающий операцию += в теге save.'''
variable_value = variable.get_value()
if (variable.variable_type is IntegerType or
variable.variable_type is FloatType):
variable_value += value
return variable_value
elif variable.variable_type is ListType:
if isinstance(value, str):
value = value.split(',')
if isinstance(value, list):
for item in value:
if item not in variable_value:
variable_value.append(item)
else:
variable_value.append(value)
return variable_value
elif variable.variable_type is IniType:
if not isinstance(variable_value, str):
variable_value = str(variable_value)
variable_value = variable_value.split(',')
if not isinstance(value, list):
if isinstance(value, str):
value = value.split(',')
else:
value = str(value).split(',')
for item in value:
if item not in variable_value:
variable_value.append(item)
return ','.join(variable_value)
# Пока что во всех остальных случаях будем просто возвращать исходное
# значение.
return variable_value
def _remove_variable_value(self, variable: VariableNode, value: Any
) -> Any:
'''Метод описывающий операцию -= в теге save.'''
variable_value = variable.get_value()
if (variable.variable_type is IntegerType or
variable.variable_type is FloatType):
variable_value -= value
return variable_value
elif variable.variable_type is ListType:
if isinstance(value, str):
value = value.split(',')
if isinstance(value, list):
for item in value:
if item in variable_value:
variable_value.remove(item)
elif value in variable_value:
variable_value.remove(value)
return variable_value
elif variable.variable_type is IniType:
if not isinstance(variable_value, list):
if not isinstance(variable_value, str):
variable_value = str(variable_value)
variable_value = variable_value.split(',')
if not isinstance(value, list):
if isinstance(value, str):
value = value.split(',')
else:
value = str(value).split(',')
for item in value:
if item in variable_value:
variable_value.remove(item)
return ','.join(variable_value)
# Пока что во всех остальных случаях будем просто возвращать исходное
# значение.
return variable_value
def _get_parameter(self) -> Tuple[nodes.Const, nodes.Node]:
'''Метод для разбора параметров, содержащихся в теге calculate.'''
lineno = self.stream.current.lineno
parameter_name = self.stream.expect('name').value
parameter_name_node = nodes.Const(parameter_name, lineno=lineno)
if self.stream.skip_if('assign'):
# parameter_value = self.stream.current.value
parameter_rvalue = self.parser.parse_expression(with_condexpr=True)
# if parameter_name == 'env':
# # если параметр env -- обновляем множество значений env
# # контекста вo время парсинга.
# env_names = parameter_value.split(',')
# for name in env_names:
# self.environment.context_class._env_set.add(name.strip())
else:
parameter_rvalue = nodes.Const(True, lineno=lineno)
# parameter_value = True
return (parameter_name_node, parameter_rvalue)
def save_parameters(cls, parameters_dictionary: dict,
context: CalculateContext) -> str:
'''Метод для сохранения значений параметров.'''
context.parent['__parameters__'].set_parameter(parameters_dictionary)
return ''
@pass_context
def pkg(self, context: CalculateContext, *args: dict) -> Version:
'''Метод, реализующий функцию pkg() шаблонов. Функция предназначена для
получения версии пакета, к которому уже привязан шаблон, если
аргументов нет, или версию пакета в аргументе функции. Если аргументов
нет, а шаблон не привязан к какому-либо пакету, или если указанного в
аргументе пакета нет -- функция возвращает пустой объект Version().'''
if args:
package_atom = args[0]
try:
atom_name = self.package_atom_parser.parse_package_parameter(
package_atom)
return atom_name.version
except PackageAtomError:
return Version()
else:
# package = context.parent['__parameters__'].package
package = self.parameters_processor._parameters_container.package
if not package:
return Version()
return package.version
def get_full_filepath(self, fname: str) -> str:
# TODO: добавить получение домашней директории пользователя
# if fname[0] == "~":
# # Получаем директорию пользователя
# fname = os.path.join(self.homeDir,
# fname.partition("/")[2], "")[:-1]
# TODO: учитывать также root_path
fname = os.path.join(
self.parameters_processor.chroot_path,
fname.lstrip("/"))
return fname
@pass_context
def grep(self, context: CalculateContext, fname: str,
regpattern: str) -> str:
'''Метод реализующий функцию grep.'''
fname = self.get_full_filepath(fname)
try:
reg = re.compile(regpattern, re.MULTILINE)
except re.error:
raise TemplateSyntaxError("Wrong regular expression")
fileContent = readFile(fname)
if not fileContent:
return ""
match_data = reg.search(fileContent)
if match_data:
md_groups = match_data.groups()
if md_groups:
return md_groups[0] or ""
else:
return match_data.group()
else:
return ""
@pass_context
def exists(self, context: CalculateContext, fname: str) -> str:
'''Метод реализующий функцию exists.'''
fname = self.get_full_filepath(fname)
try:
check_map = (
('f', stat.S_ISREG),
('d', stat.S_ISDIR),
('l', stat.S_ISLNK),
('b', stat.S_ISBLK),
('c', stat.S_ISCHR),
('p', stat.S_ISFIFO),
('s', stat.S_ISSOCK))
fmode = os.lstat(fname)
for t, func in check_map:
if func(fmode.st_mode):
return t
else:
return 'f'
except OSError:
return ""
class TemplateEngine:
def __init__(self, directory_path: Optional[str] = None,
datavars_module: Union[Datavars,
NamespaceNode,
Variables] = Variables(),
appends_set: set = set(),
chroot_path: str = '/',
for_package: Union[Package, None] = None,
pkg_autosave: bool = False):
ParametersProcessor._inspect_formats_package()
CalculateExtension._parameters_set =\
ParametersProcessor.available_parameters
ParametersProcessor.available_appends = appends_set
self._datavars_module = datavars_module
self._template_text = ''
self._pkg_autosave: bool = pkg_autosave
self._parameters_object = ParametersContainer()
self.parameters_processor = ParametersProcessor(
chroot_path=chroot_path,
datavars_module=datavars_module,
for_package=for_package)
if directory_path is not None:
self.environment = Environment(
loader=FileSystemLoader(directory_path),
trim_blocks=True,
lstrip_blocks=True)
else:
self.environment = Environment(trim_blocks=True,
lstrip_blocks=True)
self.environment.filters.update(CALCULATE_FILTERS)
self.calculate_extension = CalculateExtension(
self.environment,
self.parameters_processor,
datavars_module=datavars_module,
chroot_path=chroot_path)
self.environment.add_extension(self.calculate_extension)
self.environment.context_class = CalculateContext
@property
def for_package(self) -> Package:
return self.parameters_processor.for_package
@for_package.setter
def for_package(self, package: Package) -> None:
self.parameters_processor.for_package = package
def change_directory(self, directory_path: str) -> None:
'''Метод для смены директории в загрузчике.'''
self.environment.loader = FileSystemLoader(directory_path)
def process_template(self, template_path: str, template_type: str,
parameters: Optional[ParametersContainer] = None
) -> None:
'''Метод для обработки файла шаблона, расположенного по указанному
пути.'''
if parameters is not None:
self._parameters_object = parameters
else:
self._parameters_object = ParametersContainer()
if self._parameters_object.env:
CalculateContext._env_set = self._parameters_object.env.copy()
else:
CalculateContext._env_set = set()
self.parameters_processor.set_parameters_container(
self._parameters_object)
self.calculate_extension.template_type = template_type
self.calculate_extension.calculate_parsed = False
template = self.environment.get_template(template_path)
self._template_text = template.render(
__datavars__=self._datavars_module,
__parameters__=self._parameters_object,
Version=Version
)
def process_template_from_string(
self, string: str, template_type: int,
parameters: Optional[ParametersContainer] = None
) -> None:
'''Метод для обработки текста шаблона.'''
if parameters is not None:
self._parameters_object = parameters
else:
self._parameters_object = ParametersContainer()
if self._parameters_object.env:
CalculateContext._env_set = self._parameters_object.env.copy()
else:
CalculateContext._env_set = set()
self.parameters_processor.set_parameters_container(
self._parameters_object)
self.calculate_extension.template_type = template_type
self.calculate_extension.calculate_parsed = False
template = self.environment.from_string(string)
self._template_text = template.render(
__datavars__=self._datavars_module,
__parameters__=self._parameters_object,
Version=Version
)
@property
def parameters(self) -> ParametersContainer:
return self._parameters_object
@property
def template_text(self) -> str:
text, self._template_text = self._template_text, ''
return text