Browse Source

Implemented commands and scripts modules and parameter's container too.

packages
Иванов Денис 2 years ago
parent
commit
a36aa05bae
  1. 0
      calculate/commands/__init__.py
  2. 175
      calculate/commands/commands.py
  3. 0
      calculate/scripts/__init__.py
  4. 986
      calculate/scripts/scripts.py
  5. 30
      calculate/templates/template_engine.py
  6. 203
      calculate/templates/template_processor.py
  7. 2
      calculate/utils/files.py
  8. 5
      calculate/utils/io_module.py
  9. 19
      calculate/utils/package.py
  10. 209
      calculate/variables/datavars.py
  11. 78
      calculate/variables/loader.py
  12. 575
      calculate/variables/old_vars/datavars.py
  13. 5
      calculate/variables/old_vars/main/__init__.py
  14. 88
      calculate/variables/old_vars/os/gentoo/__init__.py
  15. 335
      calculate/variables/old_vars/vars_loader.py
  16. 727
      calculate/variables/parameters.py
  17. 19
      calculate/vars/main/__init__.py
  18. 5
      calculate/vars/os/gentoo/__init__.py
  19. 2
      calculate/vars/system/__init__.py
  20. 8
      pytest.ini
  21. 164
      run.py
  22. 38
      run_templates.py
  23. 0
      tests/commands/__init__.py
  24. 194
      tests/commands/test_commands.py
  25. 0
      tests/commands/testfiles/calculate.ini
  26. 0
      tests/commands/testfiles/gentoo.backup/ini_vars_0.backup
  27. 0
      tests/commands/testfiles/gentoo.backup/portage/calculate.ini
  28. 0
      tests/commands/testfiles/gentoo.backup/portage/profiles/calculate.ini
  29. 0
      tests/commands/testfiles/gentoo.backup/portage/profiles/main/calculate.ini
  30. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/calculate.ini
  31. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/20/calculate.ini
  32. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/20/calculate.ini
  33. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/20/desktop/calculate.ini
  34. 3
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/20/desktop/parent
  35. 2
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/20/parent
  36. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/calculate.ini
  37. 1
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/amd64/parent
  38. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/calculate.ini
  39. 0
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/desktop/calculate.ini
  40. 1
      tests/commands/testfiles/gentoo.backup/repos/calculate/profiles/default/parent
  41. 2
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLD/amd64/calculate.ini
  42. 2
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLD/amd64/parent
  43. 21
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLD/calculate.ini
  44. 1
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLD/parent
  45. 0
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLDX/amd64/calculate.ini
  46. 2
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLDX/amd64/parent
  47. 0
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLDX/calculate.ini
  48. 1
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/CLDX/parent
  49. 0
      tests/commands/testfiles/gentoo.backup/repos/distros/profiles/calculate.ini
  50. 4
      tests/commands/testfiles/variables/main/__init__.py
  51. 66
      tests/commands/testfiles/variables/os/__init__.py
  52. 57
      tests/commands/testfiles/variables/os/gentoo/__init__.py
  53. 18
      tests/commands/testfiles/variables/system/__init__.py
  54. 0
      tests/scripts/__init__.py
  55. 1854
      tests/scripts/test_scripts.py
  56. 48
      tests/templates/test_directory_processor.py
  57. 49
      tests/templates/test_template_engine.py
  58. 11
      tests/templates/test_template_executor.py
  59. 3
      tests/templates/testfiles/test_dir_processor_root/templates_32/install/.calculate_directory
  60. 1
      tests/templates/testfiles/test_dir_processor_root/templates_32/install/dir_32/.calculate_directory
  61. 5
      tests/templates/testfiles/test_dir_processor_root/templates_32/install/dir_32/file_0
  62. 3
      tests/templates/testfiles/test_dir_processor_root/templates_32/update/.calculate_directory
  63. 1
      tests/templates/testfiles/test_dir_processor_root/templates_32/update/dir_33/.calculate_directory
  64. 5
      tests/templates/testfiles/test_dir_processor_root/templates_32/update/dir_33/file_0
  65. 2
      tests/templates/testfiles/test_dir_processor_root/templates_33/install/.calculate_directory
  66. 1
      tests/templates/testfiles/test_dir_processor_root/templates_33/install/dir_34/.calculate_directory
  67. 1
      tests/templates/testfiles/test_dir_processor_root/templates_33/install/dir_34/file_0
  68. 3
      tests/templates/testfiles/test_dir_processor_root/templates_33/update/.calculate_directory
  69. 1
      tests/templates/testfiles/test_dir_processor_root/templates_33/update/dir_35/.calculate_directory
  70. 5
      tests/templates/testfiles/test_dir_processor_root/templates_33/update/dir_35/file_0
  71. 2
      tests/templates/testfiles/test_dir_processor_root/templates_34/install/.calculate_directory
  72. 2
      tests/templates/testfiles/test_dir_processor_root/templates_34/install/dir_36/.calculate_directory
  73. 1
      tests/templates/testfiles/test_dir_processor_root/templates_34/install/dir_36/file_0
  74. 2
      tests/templates/testfiles/test_runner/install/.calculate_directory
  75. 1
      tests/templates/testfiles/test_runner/install/dir_2/.calculate_directory
  76. 5
      tests/templates/testfiles/test_runner/install/dir_2/file_0
  77. 1
      tests/templates/testfiles/test_runner/install/dir_3/.calculate_directory
  78. 1
      tests/templates/testfiles/test_runner/install/dir_3/file_0
  79. 2
      tests/templates/testfiles/test_runner/remove/.calculate_directory
  80. 1
      tests/templates/testfiles/test_runner/remove/test_dir/.calculate_directory
  81. 2
      tests/templates/testfiles/test_runner/update/.calculate_directory
  82. 1
      tests/templates/testfiles/test_runner/update/dir_1/.calculate_directory
  83. 4
      tests/templates/testfiles/test_runner/update/dir_1/file_0
  84. 1
      tests/templates/testfiles/test_runner/update/dir_2/.calculate_directory
  85. 4
      tests/templates/testfiles/test_runner/update/dir_2/file_0
  86. 4
      tests/utils/test_files.py
  87. 793
      tests/variables/old_vars/old_vars.py
  88. 119
      tests/variables/old_vars/variables/level/__init__.py
  89. 6
      tests/variables/old_vars/variables/level/level2/__init__.py
  90. 6
      tests/variables/old_vars/variables/main/__init__.py
  91. 65
      tests/variables/old_vars/variables/os/__init__.py
  92. 5
      tests/variables/test_calculateini.py
  93. 270
      tests/variables/test_datavars.py
  94. 1090
      tests/variables/test_parameters.py
  95. 4
      tests/variables/testfiles/gentoo.backup/repos/distros/profiles/CLD/calculate.ini
  96. 0
      tests/variables/testfiles/parameters_ini/calculate.ini
  97. 7
      tests/variables/testfiles/variables_1/level/level_2/__init__.py
  98. 4
      tests/variables/testfiles/variables_13/main/__init__.py
  99. 50
      tests/variables/testfiles/variables_13/os/__init__.py
  100. 57
      tests/variables/testfiles/variables_13/os/gentoo/__init__.py

0
calculate/variables/old_vars/os/__init__.py → calculate/commands/__init__.py

175
calculate/commands/commands.py

@ -0,0 +1,175 @@
from types import FunctionType
from typing import Union, Callable, List
from calculate.scripts.scripts import Script
from calculate.variables.parameters import Parameters
from calculate.variables.datavars import NamespaceNode, DependenceAPI,\
DependenceError, VariableNotFoundError
from calculate.variables.loader import Datavars
from calculate.utils.io_module import IOModule
class CommandCreationError(Exception):
'''Исключение кидаемое при наличии ошибок во время создания объекта
команды.'''
def __init__(self, message: str, command_id: str = '', title: str = ''):
self.message: str = message
self.command_id: str = command_id
self.command_title: str = title
def __str__(self) -> str:
if self.command_title:
return (f'can not create command {self.command_title}:'
f' {self.message}')
if self.command_id:
return f'can not create command {self.command_id}: {self.message}'
return f'can not create command: {self.message}'
class CommandInitializationError(Exception):
'''Исключение кидаемое при наличии ошибок во время инициализации.'''
def __init__(self, message: str, title: str):
self.message: str = message
self.command_title: str = title
def __str__(self) -> str:
if self.command_title:
return (f'can not initialize command {self.command_title}:'
f' {self.message}')
class Command:
'''Класс команды, предназначен для хранения информации о команде и привязки ее к модулю ввода/вывода'''
def __init__(self, command_id: str = '', category: str = '',
title: str = '',
script: Union[Callable, Script, None] = None,
parameters: Union[Parameters, None] = None,
namespace: Union[str, NamespaceNode, None] = None,
command: str = '', gui: bool = False,
icon: Union[str, List[str]] = '',
setvars: dict = {}, rights: List[str] = []):
self._datavars: Union[Datavars, NamespaceNode, None] = None
if not command_id:
raise CommandCreationError('command ID is not set')
self._id: str = command_id
# Если собственно команда не была указана, получаем ее из ID.
if command:
self._command: str = command
else:
self._command: str = f'cl_{self._id}'
if not title:
raise CommandCreationError("title is not set",
command_id=command_id)
self._title: str = title
if not category:
raise CommandCreationError("category is not set",
command_title=title)
self._category: str = category
# Пространство имен относительно которого, будет выполняться скрипт.
self._namespace: Union[str, NamespaceNode, None] = namespace
# Параметры, указываемые при вызове этой команды.
if parameters is None:
raise CommandCreationError("parameters is not set",
command_title=title)
self._parameters: Parameters = parameters
# Скрипт выполняемый при вызове этой команды.
if not script:
raise CommandCreationError("script is not set",
command_title=title)
elif isinstance(script, FunctionType):
# Поддержка способа описания скрипта в функции.
self._script: Script = script()
else:
self._script: Script = script
# Параметр указывающий, должена ли данная команда отображаться в
# графическом интерфейсе.
self._gui: bool = gui
# Устанавливаем название иконки.
if not icon and self._gui:
raise CommandCreationError("icon is not set",
command_title=title)
self._icon: Union[str, List[str]] = icon
# Словарь с переменными и значениями, которые нужно им присвоить.
self._setvars = setvars
# Права, необходимые данной команде.
if not rights:
raise CommandCreationError("rights is not set",
title=title)
self._rights = rights
@property
def script(self) -> Script:
return self._script
def initialize(self, datavars: Union[Datavars, NamespaceNode],
output: IOModule) -> 'Command':
'''Метод для инициализации всего, что нужно инициализировать в команде.
'''
self._datavars = datavars
self._output = output
# Сначала найдем пространство имен.
if self._namespace is not None:
if isinstance(self._namespace, str):
self._namespace = self._find_namespace(self._namespace,
datavars)
# Инициализируем параметры.
self._parameters.initialize(datavars)
# Инициализируем скрипт.
self._script.initialize(output, datavars, self._namespace)
# Устанавливаем переменные, если нужно.
if self._setvars:
self._set_variables(self._setvars, datavars, self._namespace)
return self
def _find_namespace(self, namespace_path: str,
datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод для поиска пространств имен.'''
# TODO сделать где-нибудь единственный статический вариант.
parts = namespace_path.split('.')
namespace = datavars
for part in parts:
namespace = namespace[part]
return namespace
def _set_variables(self, setvars: dict,
datavars: Union[Datavars, NamespaceNode],
namespace: Union[NamespaceNode, None]) -> None:
'''Метод для установки указанных значений, указанным переменным.'''
force = False
for varname, value in setvars.items():
if varname.endswith('!'):
varname = varname[:-1]
force = True
else:
force = False
try:
variable = DependenceAPI.find_variable(
varname, datavars,
current_namespace=namespace)
except VariableNotFoundError:
self._output.set_error(f"Can not set variable '{varname}':"
" variable does not exist.")
if not variable.readonly or force:
variable.set(value)
else:
self._output.set_error("Can not set readonly variable "
f"'{variable.get_fullname}'.")

0
tests/variables/old_vars/variables/__init__.py → calculate/scripts/__init__.py

986
calculate/scripts/scripts.py

@ -0,0 +1,986 @@
# vim: fileencoding=utf-8
#
import re
import inspect
from typing import Callable, Any, Union, List, Generator
from calculate.variables.datavars import DependenceAPI, DependenceError,\
VariableNode, NamespaceNode,\
HashType, TableType
from calculate.variables.loader import Datavars
from calculate.utils.io_module import IOModule
from collections.abc import Iterable, Mapping
class TaskInitializationError(Exception):
pass
class ScriptError(Exception):
pass
class TaskError(Exception):
def __init__(self, message, handlers=set()):
super().__init__(message)
self.handlers = handlers
class ActionError(Exception):
'''Класс исключения для создания штатных исключений в функциях действий.'''
pass
class ConditionError(Exception):
pass
class Script:
'''Класс скрипта, собирает задачи, обработчики, блоки и прочее. Принимает
аргументы.'''
def __init__(self, script_id: str, args: List[Any] = [],
success_message: str = None,
failed_message: str = None,
interrupt_message: str = None):
self._id: str = script_id
self._items: List[Union['Task', 'Block', 'Handler', 'Run']] = None
self._args_names: list = args
self._args_vars: list = []
self._success_message: str = success_message
self._failed_message: str = failed_message
self._interrupt_message: str = interrupt_message
self._output: IOModule = None
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: Union[NamespaceNode, None] = None
self._handlers: set = set()
self._initialized: bool = False
def tasks(self, *items: List[Union['Task', 'Block', 'Handler', 'Run']]
) -> 'Script':
'''Метод для указания задач и контейнеров, из которых состоит скрипт.
'''
self._items = items
return self
def initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode = None) -> 'Script':
'''Метод для инициализации, состоящей в установке модулей вывода,
модуля переменных, текущего пространства имен, а также создании
переменных скрипта.'''
self._output = output
self._datavars = datavars
self._namespace = namespace
if not self._initialized:
self._script_namespace, self._args_vars =\
self.make_script_variables(self._id,
self._args_names,
self._datavars)
self._initialized = True
return self
def run(self) -> None:
'''Метод для запуска задач, содержащихся в данном скрипте.'''
essential_error = None
handlers_to_run = []
for task_item in self._items:
if isinstance(task_item, Handler):
if task_item.id in self._handlers:
handlers_to_run.append(task_item)
elif essential_error is None:
try:
output = task_item.run(self._output,
self._datavars,
self._script_namespace,
self._namespace)
self._handlers.update(output)
except Exception as error:
if isinstance(error, TaskError):
self._handlers.update(error.handlers)
essential_error = error
for handler in handlers_to_run:
try:
task_item.run(self._output, self._datavars,
self._script_namespace, self._namespace)
except Exception as error:
# TODO Разобраться что делать с ошибками в обработчике.
self._output.set_error(f"error in handler '{task_item.id}':"
f" {str(error)}")
if essential_error is not None:
raise essential_error
@staticmethod
def make_script_variables(script_id,
args: List[Any],
datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод для создания переменных скрипта. Создает пространство имен
скрипта и переменных аргументов.'''
if 'scripts' not in datavars:
scripts = Script.make_scripts_namespace(datavars)
else:
scripts = datavars.scripts
if script_id not in scripts:
current_script = NamespaceNode(script_id)
scripts.add_namespace(current_script)
else:
current_script = scripts[script_id]
current_script.clear()
args_vars = []
for arg in args:
args_vars.append(VariableNode(arg, current_script))
return current_script, args_vars
@staticmethod
def make_scripts_namespace(datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод создающий пространства имен, необходимые для работы задач.'''
# Для тестирования скорее.
scripts = NamespaceNode('scripts')
datavars.add_namespace(scripts)
env = NamespaceNode('env')
scripts.add_namespace(env)
loop = NamespaceNode('loop')
env.add_namespace(loop)
return scripts
def __call__(self, *args):
'''Метод для запуска скрипта с аргументами.'''
if len(args) < len(self._args_names):
raise ScriptError((f"script '{self._id}' missing"
f" {len(self._args_names) - len(args)} required"
" arguments: '") +
"', '".join(self._args_names[len(args):]) + "'")
elif len(args) > len(self._args_names):
raise ScriptError(f"script '{self._id}' takes"
f" {len(self._args_names)} arguments, but"
f" {len(args)} were given")
args_values = self._get_args_values(args, self._datavars,
self._namespace)
for arg_variable, arg_value in zip(self._args_vars, args_values):
arg_variable.source = arg_value
self.run()
def _get_args_values(self, args: List[Any],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode):
'''Метод для получения значений аргументов.'''
args_values = []
for argument in args:
if isinstance(argument, str):
variable = DependenceAPI.find_variable(
argument, datavars,
current_namespace=namespace)
args_values.append(variable.get_value())
elif isinstance(argument, Static):
args_values.append(argument.value)
else:
args_values.append(argument)
return args_values
class Run:
'''Класс запускателя скриптов.'''
def __init__(self, script: Script, namespace: str = None,
args: List[Any] = [], when: 'Var' = None,
essential: bool = True):
self._script: Script = script
self._namespace: str = namespace
self._args: List[Any] = args
self._condition: Var = when
self._essential = essential
def run(self, output: IOModule, datavars: Union[Datavars, NamespaceNode],
parent_namespace: NamespaceNode, namespace: NamespaceNode) -> set:
'''Метод для запуска скрипта, указанного имеющегося в запускателе.'''
if self._condition is None or self._condition(datavars,
namespace,
parent_namespace):
# пространство имен или наследуем, либо берем по указанному пути.
if not self._namespace:
self._namespace = namespace
elif isinstance(self._namespace, str):
parts = self._namespace.split('.')
parts.reverse()
self._namespace = datavars[parts.pop()]
while parts:
self._namespace = self._namespace[parts.pop()]
try:
# if not self._script._initialized:
self._script.initialize(output, datavars, self._namespace)
self._script(*self._args)
except (ScriptError, TaskError) as error:
if self._essential:
raise ScriptError(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
output.set_error(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
return set()
class Static:
'''Класс, оборачивающий значение, которое не должно восприниматься как
переменная.'''
def __init__(self, value: Any):
self._value: Any = value
@property
def value(self) -> Any:
return self._value
class Task:
'''Класс реализующий задачи -- объекты, исполняющие указанные функции
действия action, и содержащий всю необходимую информацию для их работы.'''
def __init__(self, **kwargs):
if 'id' in kwargs:
self._id: str = kwargs['id']
else:
raise TaskInitializationError("id is not set for task")
# Имя не обязательно.
self._name: str = kwargs.get('name', None)
if 'action' in kwargs:
self._action = kwargs['action']
else:
raise TaskInitializationError("action is not set for task"
f" '{self._name or self._id}'")
# Список аргументов и их типы
self._args_sources: Union[tuple, list] = kwargs.get('args', [])
self._args_types: List[type] = []
self._arg_names: List[str] = []
self._args: Union[tuple, list] = []
# Переменные, которым через set будет присвоен результат работы action.
self._set: Union[list, tuple] = kwargs.get('set', None)
self._return_type: type = None
# Условие выполнения задачи.
self._condition: Var = kwargs.get('when', None)
# Существенность задачи. Если задача существенная, ошибка в ней
# приведет к остановке скрипта, если несущественная -- ошибка будет
# записана в переменную этой задачи, а скрипт будет выполняться дальше.
self._essential: bool = kwargs.get('essential', True)
# Параметр для указания обработчиков, которые нужно запустить в конце
# выполнения скрипта.
self._handlers = set()
handlers: set = kwargs.get('notify', set())
if handlers:
if isinstance(handlers, (tuple, list)):
self._handlers.update(handlers)
else:
self._handlers.add(handlers)
self._initialized: bool = False
self._namespace: NamespaceNode = None
self._script_namespace: NamespaceNode = None
self._datavars: Union[Datavars, NamespaceNode] = None
self._output: IOModule = None
@property
def essential(self) -> bool:
return self._essential
@property
def initialized(self) -> bool:
return self._initialized
def _initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> 'Task':
'''Метод для инициализации задач, привязывает задачу к указанному
модулю переменных, модулю вывода, формирует список аргументов,
определяет их типы и находит переменные, к которым в последствии
нужно будет присвоить значение, возвращенное action.'''
self._datavars = datavars
self._output = output
self._namespace = namespace
self._script_namespace = script_namespace
if not self._initialized:
self._arg_names,\
self._args_types,\
self._return_type = self._get_action_info(self._action)
self._args = self._update_arguments(self._args_sources)
if self._set is not None:
self._set = self._get_set_variables(self._set, datavars,
namespace)
self._initialized = True
else:
self._args = self._update_arguments(self._args_sources)
return self
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска данной задачи.'''
self._initialize(output, datavars, script_namespace, namespace)
if (self._condition is None or self._condition(
self._datavars,
self._namespace,
self._script_namespace)):
arguments_values = self._get_args_values(self._args,
self._args_types)
try:
result = self._action(*arguments_values)
self._create_task_var(result=result,
success=True)
if self._return_type:
self._check_return_type(result)
if self._set:
self._set_variables(result)
return self._handlers
except Exception as error:
if self._essential and not isinstance(error, ActionError):
raise TaskError(
f"essential task '{self._name or self._id}' is"
f" failed. Reason: {error}")
self._create_task_var(success=False,
error_message=str(error))
return set()
def _get_action_info(self, action: Callable):
'''Метод для получения информации о функции действия.'''
arguments = [arg.name for arg in inspect.signature(
action).parameters.values()]
annotation = action.__annotations__
action_types = []
for argument in arguments:
action_types.append(annotation.get(argument, None))
return arguments, action_types, annotation.get('return', None)
def _update_arguments(self, args_sources: List[Any]) -> tuple:
'''Метод для обновления и установки аргументов функции действия.'''
arguments = []
# Проверяем количество аргументов.
args_length = len(self._arg_names)
if 'output' in self._arg_names:
if (args_length - 1) != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length - 1} arguments"
f" but {len(args_sources)}"
" is given")
elif args_length != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length} arguments but"
f" {len(args_sources)} is given")
# Формируем список значений аргументов.
args_iterator = iter(args_sources)
for arg_name in self._arg_names:
if arg_name == 'output':
arguments.append(self._output)
else:
argument = next(args_iterator)
if isinstance(argument, str):
try:
variable = DependenceAPI.find_variable(
argument,
self._datavars,
current_namespace=self._namespace)
arguments.append(variable)
except DependenceError as error:
raise TaskInitializationError(str(error))
elif isinstance(argument, Static):
arguments.append(argument.value)
else:
arguments.append(argument)
return tuple(arguments)
def _get_args_values(self, args: List[Any],
args_types: List[type]) -> List[Any]:
'''Метод для получения значений из списка аргументов функции action.'''
arguments_values = []
for arg, arg_type in zip(args, args_types):
if isinstance(arg, VariableNode):
value = arg.get_value()
if arg.variable_type is HashType:
value = value.get_hash()
elif arg.variable_type is TableType:
value = value.get_table()
arguments_values.append(self._check_type(value, arg_type))
else:
arguments_values.append(self._check_type(arg, arg_type))
return arguments_values
def _check_type(self, value: Any, arg_type: type) -> Any:
'''Метод для проверки типа аргумента.'''
if arg_type is None or isinstance(value, arg_type):
return value
else:
raise TaskError(f"'{arg_type}' is expected, not {type(value)}")
def _create_task_var(self,
result: Any = None,
success: bool = True,
error_message: str = None) -> None:
'''Метод для создания переменной задачи, в которую отправляются
результаты вычислений, имеющихся в функции action.'''
VariableNode(self._id, self._script_namespace, variable_type=HashType,
source={'result': result,
'success': success,
'error_message': error_message})
def _get_set_variables(self, to_set: Union[str, list, tuple, dict],
datavars: Union[NamespaceNode, Datavars],
namespace: NamespaceNode) -> Union[str, list, dict]:
'''Метод для поиска переменных, в которые нужно положить значения,
полученные в результате вычислений в action.'''
if isinstance(to_set, str):
return DependenceAPI.find_variable(to_set, datavars,
current_namespace=namespace)
elif isinstance(to_set, (list, tuple)):
vars_to_set = []
for var in to_set:
vars_to_set.append(
DependenceAPI.find_variable(
var, datavars,
current_namespace=namespace))
return vars_to_set
elif isinstance(to_set, dict):
for key in to_set:
variable = DependenceAPI.find_variable(
to_set[key], datavars,
current_namespace=namespace)
to_set[key] = variable
return to_set
else:
raise TaskError(f"set value must be {dict}, {list}, {tuple} or "
f"{str}, not {type(to_set)}")
def _set_variables(self, result: Any) -> None:
'''Метод для присвоения переменным результатов работы функции action.
'''
self._check_set_vars_types(result)
if isinstance(self._set, dict):
if not isinstance(result, dict):
raise TaskError(f"set parameter value is '{dict}' but action"
f" result is '{type(result)}'")
for key in self._set:
if key in result:
self._set[key].set(result[key])
elif isinstance(self._set, (list, tuple)):
for var in self._set:
var.set(result)
else:
self._set.set(result)
def _check_set_vars_types(self, result: Any) -> None:
'''Метод для проверки соответствия типов значений, полученных из
action, и переменных, в которые нужно положить эти значения.'''
# Теперь проверяем соответствие типа переменных из set и типа значений.
if isinstance(self._set, dict):
for key, variable in self._set.items():
if (key in result and not
isinstance(result[key],
variable.variable_type.python_type)):
raise TaskError("can not set value of"
f" '{type(result[key])}' type to the"
" variable of"
f" '{variable.variable_type.name}'"
" type")
elif isinstance(self._set, (list, tuple)):
for variable in self._set:
if not isinstance(result, variable.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{variable.variable_type.name}' type")
else:
if not isinstance(result, self._set.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{self._set.variable_type.name}' type")
def _check_return_type(self, result: Any) -> None:
'''Метод для проверки типа значения, возвращенного функцией action.'''
if not isinstance(result, self._return_type):
raise TaskError(f"action returned value of '{type(result)}' "
f"type, but expected is '{self._return_type}'")
class Loop:
'''Базовый класс всех объектов цикла для блоков.'''
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
pass
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
yield False
class For(Loop):
'''Класс цикла блока, использующего указанное итерируемое значение.'''
def __init__(self, value: str, iter_arg: Union[str, Iterable]):
self._iter_arg = iter_arg
self._item_name = value
self._iterable_value = None
self._item_variable = None
self._initialized = False
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._iterable_value = self._get_iterable_value(self._iter_arg,
datavars,
namespace)
self._item_variable = self._get_item_variable(self._item_name,
script_namespace)
self._initialized = True
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
if not self._initialized:
yield False
else:
for item in self._iterable_value:
self._item_variable.source = item
yield True
yield False
def _get_iterable_value(self, iter_argument: Union[str, Iterable],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode) -> Iterable:
'''Метод для получения и проверки значения, которое будем далее
итерировать.'''
value = self._iter_arg
if isinstance(self._iter_arg, str):
variable = DependenceAPI.find_variable(self._iter_arg,
datavars,
current_namespace=namespace)
value = variable.get_value()
if variable.variable_type is HashType:
value = value.get_hash()
elif variable.variable_type is TableType:
value = value.table_hash()
if not isinstance(value, Iterable):
raise TaskError("For() argument '{value}' is not iterable.")
if isinstance(value, Mapping):
value = value.items()
return value
def _get_item_variable(self, item_name: str,
script_namespace: NamespaceNode) -> VariableNode:
'''Метод для получения или создания переменной, в которой будет
храниться текущее значение, взятое из итерируемого значения.'''
if item_name in script_namespace:
item_variable = script_namespace[item_name]
else:
item_variable = VariableNode(item_name, script_namespace)
return item_variable
class While(Loop):
'''Класс цикла блока, аналогичного while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: NamespaceNode = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
while True:
condition_result = self._condition(self._datavars,
self._namespace,
self._script_namespace)
yield condition_result
class Until(Loop):
'''Класс цикла блока, аналогичного конструкции do ... while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: Datavars = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
while True:
yield self._condition(self._datavars,
self._namespace,
self._script_namespace)
class Block:
'''Класс блока задач.'''
def __init__(self, *tasks: List[Task], when: Union['Var', None] = None,
loop: Loop = Loop()):
self._tasks: List[Task] = tasks
self._rescue_tasks: List[Task] = None
self._condition: 'Var' = when
if not isinstance(loop, Loop):
raise TaskError('loop block parameter must be Loop type')
self._loop: Loop = loop
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в блоке.'''
handlers = set()
if (self._condition is None or self._condition(datavars, namespace,
script_namespace)):
self._loop.initialize_loop(datavars, script_namespace, namespace)
looper = self._loop.get_looper()
while next(looper):
for task in self._tasks:
try:
output = task.run(output, datavars, script_namespace,
namespace)
handlers.update(output)
except Exception as error:
if self._rescue_tasks is not None:
self._run_rescue(output, datavars,
script_namespace, namespace)
if isinstance(error, TaskError):
error.handlers.update(handlers)
raise error
return handlers
def _run_rescue(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска задач, указанных в rescue. Эти задачи выполняются,
если возникает ошибка при выполнении важной задачи, содержащейся в
блоке.'''
for task in self._rescue_tasks:
try:
task.run(output, datavars, script_namespace, namespace)
except Exception:
# TODO разобраться с тем, что делать если ошибка закралась в
# задачи из rescue.
pass
def rescue(self, *tasks: List[Task]) -> 'Block':
'''Метод для задания задач, выполняемых, если некоторая важная задача,
содержащаяся в блоке выполняется с ошибкой.'''
self._rescue_tasks = tasks
return self
class Handler:
'''Класс обработчика -- объекта скрипта, имеющего идентификатор,
запускаемого в конце выполнения скрипта в том случае, если его
идентификатор присутствовал в параметре notify хотя бы у одной успешно
выполненной задачи.'''
def __init__(self, handler_id, *tasks: List[Task]):
self._id: str = handler_id
self._tasks: List[Task] = tasks
@property
def id(self) -> str:
return self._id
def run(self, output: IOModule, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в обработчике.'''
handlers = set()
for task in self._tasks:
try:
handlers.update(task.run(output, datavars, script_namespace,
namespace))
except Exception as error:
output.set_error(f"error during execution handle '{self._id}'"
f" in task '{task._name or task._id}':"
f" {str(error)}")
return handlers
class ConditionValue:
'''Базовый класс для всех объектов, предназначенных для создания условий.
'''
def __init__(self, value: Any):
self.value: Any = value
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.value
class Var(ConditionValue):
'''Класс для создания условий выполнения задач.'''
def __init__(self, *variable: List[VariableNode],
value: Any = None, other: Any = None,
tasks: Union[List[str], None] = None,
function: Union[Callable, None] = None):
if variable:
self.variable: Union[VariableNode, str, None] = variable[0]
else:
self.variable = None
self.value: Any = value
self.other: Any = other
self.function: Callable = function
self.tasks: List[str] = tasks
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для расчета итогового значения ноды условия выполнения задачи.
'''
if self.function is not None:
if self.tasks:
return self.function(self.tasks, script_namespace)
elif self.other is not None:
return self.function(self.get_value(datavars, namespace,
script_namespace),
self.get_other(datavars, namespace,
script_namespace))
else:
return self.function(self.get_value(datavars, namespace,
script_namespace))
return self.get_value(datavars, namespace, script_namespace)
def get_value(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения собственного значения ноды.'''
if self.value is not None:
return self.value.get(datavars, namespace, script_namespace)
elif self.variable is not None:
if isinstance(self.variable, str):
self.variable = DependenceAPI.find_variable(
self.variable, datavars,
current_namespace=namespace)
return self.variable.get_value()
else:
raise ConditionError('Can not get value for condition')
def get_other(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения значения ноды, необходимой для выполнения
бинарной операции.'''
if isinstance(self.other, ConditionValue):
return self.other.get(datavars, namespace, script_namespace)
else:
return self.other
def __call__(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.get(datavars, namespace, script_namespace)
def __eq__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x == y)
def __ne__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x != y)
def __lt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x < y)
def __gt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x > y)
def __le__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x <= y)
def __ge__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x >= y)
def __and__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x and y)
def __or__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x or y)
def __invert__(self) -> 'Var':
return Var(value=self, function=lambda x: not x)
def __lshift__(self, other: Any) -> 'Var':
'''Метод для переопределения операции <<, которая теперь играет роль
in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def has(self, other: Any) -> 'Var':
'''Метод аналогичный операции in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def match(self, pattern: str) -> 'Var':
'''Метод для проверки с помощью регулярки, если в строке нечто,
соответствующее паттерну.'''
return Var(value=self, other=pattern,
function=lambda x, y: bool(re.search(y, x)))
def regex(self, pattern: str, repl: str) -> 'Var':
'''Метод для реализации sub'''
return Var(value=self, other=(pattern, repl),
function=lambda x, y: re.sub(*y, x))
def replace(self, original: str, repl: str) -> 'Var':
'''Метод для реализации replace.'''
return Var(value=self, other=(original, repl),
function=lambda x, y: x.replace(*y))
def __repr__(self) -> 'Var':
return ("<Condition value>")
def Done(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены.'''
return Var(tasks=tasks,
function=_is_done)
def DoneAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены.'''
return Var(tasks=tasks,
function=_is_done_any)
def Success(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены и притом выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success)
def SuccessAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success_any)
def Failed(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы не выполнены или выполнены с ошибкой.'''
return Var(tasks=tasks,
function=_is_failed)
def FailedAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые не выполнены или выполнены с ошибкой.
'''
return Var(tasks=tasks,
function=_is_failed_any)
def _is_done(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены.'''
for task in tasks:
if task not in script_namespace:
return False
return True
def _is_done_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены.'''
for task in tasks:
if task in script_namespace:
return True
return False
def _is_success(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены успешно.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_success_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены успешно.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return True
return False
def _is_failed(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_failed_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return True
return False

30
calculate/templates/template_engine.py

@ -244,15 +244,24 @@ class ParametersProcessor:
def check_package_parameter(self, parameter_value):
try:
atom_object = self.package_atom_parser.parse_package_parameter(
if isinstance(parameter_value, str):
result = self.package_atom_parser.parse_package_parameter(
parameter_value)
elif isinstance(parameter_value, list):
result = []
for atom in parameter_value:
result.append(
self.package_atom_parser.parse_package_parameter(
atom)
)
except PackageAtomError as error:
if error.errno == NOTEXIST:
raise ConditionFailed(error.message, self.lineno)
else:
raise IncorrectParameter(error.message)
return atom_object
return result
def check_append_parameter(self, parameter_value):
if parameter_value not in self.available_appends:
@ -754,18 +763,18 @@ class CalculateExtension(Extension):
'''Класс расширения для jinja2, поддерживающий теги calculate-шаблонов.'''
_parameters_set = set()
parameters_processor = None
# Виды операций в теге save.
ASSIGN, APPEND, REMOVE = range(3)
def __init__(self, environment, datavars_module=Variables()):
def __init__(self, environment, parameters_processor: ParametersProcessor,
datavars_module=Variables()):
super().__init__(environment)
self.environment = environment
self.environment.globals.update({'pkg': self.pkg})
self._datavars = datavars_module
self.parameters_processor = parameters_processor
self.template_type = DIR
self.tags = {'calculate', 'save', 'set_var'}
@ -782,6 +791,8 @@ class CalculateExtension(Extension):
'save': self.parse_save}
def __call__(self, env):
# Необходимо для обеспечения возможности передать готовый объект
# расширения, а не его класс.
return self
def parse(self, parser):
@ -1245,9 +1256,6 @@ class TemplateEngine:
CalculateExtension._parameters_set =\
ParametersProcessor.available_parameters
CalculateExtension._datavars = datavars_module
self.available_appends = appends_set
ParametersProcessor.available_appends = appends_set
@ -1259,8 +1267,6 @@ class TemplateEngine:
chroot_path=chroot_path,
datavars_module=datavars_module)
CalculateExtension.parameters_processor = self.parameters_processor
if directory_path is not None:
self.environment = Environment(
loader=FileSystemLoader(directory_path))
@ -1269,6 +1275,7 @@ class TemplateEngine:
self.calculate_extension = CalculateExtension(
self.environment,
self.parameters_processor,
datavars_module=datavars_module)
self.environment.add_extension(self.calculate_extension)
@ -1313,8 +1320,7 @@ class TemplateEngine:
if parameters is not None:
self._parameters_object = parameters
else:
self._parameters_object = ParametersContainer(
parameters_dictionary={})
self._parameters_object = ParametersContainer()
if self._parameters_object.env:
CalculateContext._env_set = self._parameters_object.env.copy()

203
calculate/templates/template_processor.py

@ -2,13 +2,15 @@
#
from pprint import pprint
from ..utils.package import PackageAtomParser, Package, PackageNotFound,\
PackageAtomName, Version
PackageAtomName, Version, NonePackage
from ..utils.files import join_paths, write_file, read_file_lines, FilesError,\
check_directory_link, read_link, Process,\
get_target_from_link
from .template_engine import TemplateEngine, Variables, ConditionFailed,\
ParametersProcessor, DIR, FILE,\
ParametersContainer
from calculate.variables.datavars import StringType, ListType, NamespaceNode
from calculate.variables.loader import Datavars
from .format.base_format import Format
from ..utils.io_module import IOModule
from collections import OrderedDict, abc
@ -153,7 +155,8 @@ class TemplateWrapper:
template_text='',
target_package=None,
chroot_path='/',
config_archive_path='/var/lib/calculate/config-archive'):
config_archive_path='/var/lib/calculate/config-archive',
dbpkg=True):
self.target_path = target_file_path
self.template_path = template_path
self.chroot_path = chroot_path
@ -188,6 +191,10 @@ class TemplateWrapper:
# Пакет, к которому относится файл.
self.target_package = target_package
# Флаг, разрешающий работу с CONTENTS. Если False, то выключает
# protected для всех файлов блокирует все операции с CONTENTS и ._cfg.
self.dbpkg = dbpkg
# Флаг, указывающий, что файл является PROTECTED.
self.protected = False
@ -349,10 +356,12 @@ class TemplateWrapper:
# Если для шаблона и целевого файла никаким образом не удается
# определить пакет и есть параметр append -- шаблон пропускаем.
if parameter_package is None and file_package is None:
if self.parameters.append:
if self.parameters.append and self.parameters.append != 'skip':
raise TemplateCollisionError(
"'package' parameter is not defined for"
" template with 'append' parameter.")
else:
return
elif parameter_package is None:
self.target_package_name = file_package
@ -383,16 +392,19 @@ class TemplateWrapper:
# Проверим, является ли файл защищенным.
# Сначала проверяем по переменной CONFIG_PROTECT.
for protected_path in self._protected_set:
if self.target_path.startswith(protected_path):
self.protected = True
break
if self.dbpkg:
for protected_path in self._protected_set:
if self.target_path.startswith(protected_path):
self.protected = True
break
# Затем по переменной CONFIG_PROTECT_MASK.
for unprotected_path in self._unprotected_set:
if self.target_path.startswith(unprotected_path):
self.protected = False
break
# Затем по переменной CONFIG_PROTECT_MASK.
for unprotected_path in self._unprotected_set:
if self.target_path.startswith(unprotected_path):
self.protected = False
break
else:
self.protected = False
# Собираем список имеющихся ._cfg файлов.
cfg_pattern = os.path.join(os.path.dirname(self.target_path),
@ -586,7 +598,9 @@ class TemplateExecutor:
def __init__(self, datavars_module=Variables(), chroot_path='/',
cl_config_archive='/var/lib/calculate/config-archive',
cl_config_path='/var/lib/calculate/config',
execute_archive_path='/var/lib/calculate/.execute/'):
execute_archive_path='/var/lib/calculate/.execute/',
dbpkg=True):
# TODO добавить список измененных файлов.
self.datavars_module = datavars_module
self.chroot_path = chroot_path
@ -598,6 +612,8 @@ class TemplateExecutor:
self.execute_archive_path = execute_archive_path
self.execute_files = OrderedDict()
self.dbpkg = dbpkg
# Список целевых путей измененных файлов. Нужен для корректиного
# формирования calculate-заголовка.
self.processed_targets = []
@ -661,7 +677,8 @@ class TemplateExecutor:
template_text=template_text,
target_package=target_package,
chroot_path=self.chroot_path,
config_archive_path=self.cl_config_archive_path)
config_archive_path=self.cl_config_archive_path,
dbpkg=self.<