您最多能選擇 25 個主題 主題必須以字母或數字為開頭,可包含連接號「-」且最長為 35 個字元。

1132 行
50 KiB

此檔案含有易混淆的 Unicode 字元!

此檔案含有易混淆的 Unicode 字元,這些字元的處理方式可能和下面呈現的不同。若您是有意且合理的使用,您可以放心地忽略此警告。使用 Escape 按鈕標記這些字元。

# vim: fileencoding=utf-8
#
import re
import inspect
from typing import Callable, Any, Union, List, Generator, Optional
from calculate.templates.template_processor import DirectoryProcessor
from calculate.variables.datavars import (
DependenceAPI,
DependenceError,
VariableNode,
NamespaceNode,
HashType,
TableType,
StringType,
)
from calculate.variables.loader import Datavars
from calculate.utils.io_module import IOModule
from collections.abc import Iterable, Mapping
class TaskInitializationError(Exception):
pass
class ScriptError(Exception):
pass
class TaskError(Exception):
def __init__(self, message, handlers=set()):
super().__init__(message)
self.handlers = handlers
class ActionError(Exception):
'''Класс исключения для создания штатных исключений в функциях действий.'''
pass
class ConditionError(Exception):
pass
class Script:
'''Класс скрипта, собирает задачи, обработчики, блоки и прочее, хранит их
в себе. Создает экземпляры лаунчеров.'''
def __init__(self, script_id: str,
args: List[Any] = [],
success_message: Optional[str] = None,
failed_message: Optional[str] = None,
interrupt_message: Optional[str] = None):
self._id: str = script_id
self.__items: List[Union['Task', 'Block', 'Handler', 'Run']] = None
self.__tasks_is_set: bool = False
self.__args: list = args
self.__success_message: Optional[str] = success_message
self.__failed_message: Optional[str] = failed_message
self.__interrupt_message: Optional[str] = interrupt_message
@property
def id(self) -> str:
return self._id
@property
def args(self) -> list:
return self.__args
@property
def items(self) -> list:
return self.__items
@property
def success_message(self) -> str:
return self.__success_message
@property
def failed_message(self) -> str:
return self.__failed_message
@property
def interrupt_message(self) -> str:
return self.__interrupt_message
def tasks(self, *items: List[Union['Task', 'Block', 'Handler', 'Run']]
) -> 'Script':
'''Метод для указания задач и контейнеров, из которых состоит скрипт.
'''
if self.__tasks_is_set:
raise ScriptError("Script object is immutable.")
self.__items = items
self.__tasks_is_set = bool(self.__items)
return self
def make_launcher(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode = None) -> 'ScriptLauncher':
'''Метод для создания экземпляра объекта, для запуска данного
скрипта.'''
return ScriptLauncher(self, output, datavars, namespace)
class ScriptLauncher:
def __init__(self, script: Script,
output: IOModule,
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode):
'''Метод для инициализации, состоящей в установке модулей вывода,
модуля переменных, текущего пространства имен, а также создании
переменных скрипта.'''
self._script = script
self._output = output
self._datavars = datavars
self._namespace = namespace
self._script_namespace, self._args_vars = self.make_script_variables(
script.id,
script.args,
datavars)
def run(self) -> None:
'''Метод для запуска задач, содержащихся в данном скрипте.'''
essential_error = None
founded_handlers = []
handlers_to_run = set()
for task_item in self._script.items:
if isinstance(task_item, Handler):
founded_handlers.append(task_item)
elif essential_error is None:
try:
output = task_item.run(self._output,
self._datavars,
self._script_namespace,
self._namespace)
handlers_to_run.update(output)
except Exception as error:
if isinstance(error, TaskError):
handlers_to_run.update(error.handlers)
essential_error = error
for handler in founded_handlers:
if handler.id not in handlers_to_run:
continue
try:
handler.run(self._output, self._datavars,
self._script_namespace, self._namespace)
except Exception as error:
# TODO Разобраться что делать с ошибками в обработчике.
self._output.set_error(f"error in handler '{task_item.id}':"
f" {str(error)}")
if essential_error is not None:
raise essential_error
@staticmethod
def make_script_variables(script_id,
args: List[Any],
datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод для создания переменных скрипта. Создает пространство имен
скрипта и переменных аргументов.'''
if 'scripts' not in datavars:
scripts = ScriptLauncher.make_scripts_namespace(datavars)
else:
scripts = datavars.scripts
if script_id not in scripts:
current_script = NamespaceNode(script_id)
scripts.add_namespace(current_script)
else:
current_script = scripts[script_id]
# current_script.clear()
args_vars = []
for arg in args:
if arg not in current_script._variables:
args_vars.append(VariableNode(arg, current_script))
else:
args_vars.append(current_script._variables[arg])
return current_script, args_vars
@staticmethod
def make_scripts_namespace(datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод создающий пространства имен, необходимые для работы задач.'''
# Для тестирования скорее.
scripts = NamespaceNode('scripts')
datavars.add_namespace(scripts)
env = NamespaceNode('env')
scripts.add_namespace(env)
loop = NamespaceNode('loop')
env.add_namespace(loop)
return scripts
def __call__(self, *args) -> None:
'''Метод для запуска скрипта с аргументами.'''
if len(args) < len(self._script.args):
raise ScriptError(
(f"script '{self._script.id}' missing"
f" {len(self._script.args) - len(args)} required"
" arguments: '") +
"', '".join(self._script.args[len(args):]) + "'")
elif len(args) > len(self._script.args):
raise ScriptError(f"script '{self._script.id}' takes"
f" {len(self._script.args)} arguments, but"
f" {len(args)} were given")
args_values = self._get_args_values(args, self._datavars,
self._namespace)
for arg_variable, arg_value in zip(self._args_vars, args_values):
arg_variable.source = arg_value
self.run()
def _get_args_values(self, args: List[Any],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode) -> list:
'''Метод для получения значений аргументов.'''
args_values = []
for argument in args:
if isinstance(argument, str):
variable = DependenceAPI.find_variable(
argument, datavars,
current_namespace=namespace)
args_values.append(variable.get_value())
elif isinstance(argument, Static):
args_values.append(argument.value)
else:
args_values.append(argument)
return args_values
class RunTemplates:
'''Класс запускателя наложения шаблонов.'''
def __init__(self, id: str = '',
action: str = '',
package: str = '',
chroot_path: str = '',
root_path: str = '',
dbpkg: bool = True,
essential: bool = True,
**group_packages):
self._id: str = id
self._action: str = action
self._package: str = package or None
self._chroot_path: str = chroot_path or None
self._root_path: str = root_path or None
self._dbpkg = dbpkg
self._essential = essential
self._groups: dict = group_packages
def _initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode]):
# Проверяем наличие идентификатора.
if not self._id:
error_msg = "id is not set for templates runner"
if self._essential:
raise TaskError(error_msg)
else:
output.set_error(error_msg)
return set()
# Проверяем наличие параметра action, который обязателен.
if not self._action:
error_msg = ("action parameter is not set for templates runner"
f" '{self._id}'")
if self._essential:
raise TaskError(error_msg)
else:
output.set_error(error_msg)
return set()
# Если установлен chroot_path -- устанавливаем значение соответствующей
# переменной.
if self._chroot_path is not None:
if 'cl_chroot_path' in datavars.main:
datavars.main['cl_chroot_path'].source = self._chroot_path
else:
VariableNode("cl_chroot_path", datavars.main,
variable_type=StringType,
source=self._chroot_path)
# Если установлен root_path -- устанавливаем значение соответствующей
# переменной.
if self._root_path is not None:
if 'cl_root_path' in datavars.main:
datavars.main['cl_root_path'].source = self._root_path
else:
VariableNode("cl_root_path", datavars.main,
variable_type=StringType,
source=self._chroot_path)
if self._groups:
groups = list(self._groups.keys())
for group, atoms in groups:
if isinstance(atoms, str):
self._groups[group] = [name.strip() for name
in atoms.split(',')]
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска наложения шаблонов.'''
self._initialize(output, datavars)
template_processor = DirectoryProcessor(self._action,
datavars_module=datavars,
package=self._package,
output_module=output,
dbpkg=self._dbpkg,
namespace=namespace,
**self._groups)
changed_files = template_processor.process_template_directories()
self._create_result_var(script_namespace,
changed_files=changed_files)
return set()
def _create_result_var(self, script_namespace: NamespaceNode,
changed_files: dict = {},
skipped: list = []) -> None:
'''Метод для создания переменной задачи, в которую отправляются
результаты вычислений, имеющихся в функции action.'''
VariableNode(self._id, script_namespace, variable_type=HashType,
source={"changed": changed_files,
"skipped": skipped})
class Run:
'''Класс запускателя скриптов.'''
def __init__(self, script: Script, namespace: str = None,
args: List[Any] = [],
when: 'Var' = None,
essential: bool = True):
self._script: Script = script
self._namespace: str = namespace
self._args: List[Any] = args
self._condition: Var = when
self._essential = essential
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
parent_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска скрипта, указанного в запускателе.'''
if self._condition is None or self._condition(datavars,
namespace,
parent_namespace):
# пространство имен или наследуем, либо берем по указанному пути.
if not self._namespace:
self._namespace = namespace
elif isinstance(self._namespace, str):
parts = self._namespace.split('.')
parts.reverse()
self._namespace = datavars[parts.pop()]
while parts:
self._namespace = self._namespace[parts.pop()]
try:
# if not self._script._initialized:
launcher = self._script.make_launcher(
output, datavars,
namespace=self._namespace)
launcher(*self._args)
except (ScriptError, TaskError) as error:
if self._essential:
raise ScriptError(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
output.set_error(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
return set()
class Static:
'''Класс, оборачивающий значение, которое не должно восприниматься как
переменная.'''
def __init__(self, value: Any):
self._value: Any = value
@property
def value(self) -> Any:
return self._value
class Task:
'''Класс реализующий задачи -- объекты, исполняющие указанные функции
действия action, и содержащий всю необходимую информацию для их работы.'''
def __init__(self, **kwargs):
if 'id' in kwargs:
self._id: str = kwargs['id']
else:
raise TaskInitializationError("id is not set for task")
# Имя не обязательно.
self._name: str = kwargs.get('name', None)
if 'action' in kwargs:
self._action = kwargs['action']
else:
raise TaskInitializationError("action is not set for task"
f" '{self._name or self._id}'")
# Список аргументов и их типы
self._args_sources: Union[tuple, list] = kwargs.get('args', [])
self._args_types: List[type] = []
self._arg_names: List[str] = []
self._args: Union[tuple, list] = []
# Переменные, которым через set будет присвоен результат работы action.
self._set: Union[list, tuple] = kwargs.get('set', None)
self._return_type: type = None
# Условие выполнения задачи.
self._condition: Var = kwargs.get('when', None)
# Существенность задачи. Если задача существенная, ошибка в ней
# приведет к остановке скрипта, если несущественная -- ошибка будет
# записана в переменную этой задачи, а скрипт будет выполняться дальше.
self._essential: bool = kwargs.get('essential', True)
# Параметр для указания обработчиков, которые нужно запустить в конце
# выполнения скрипта.
self._handlers = set()
handlers: set = kwargs.get('notify', set())
if handlers:
if isinstance(handlers, (tuple, list)):
self._handlers.update(handlers)
else:
self._handlers.add(handlers)
self._initialized: bool = False
self._namespace: NamespaceNode = None
self._script_namespace: NamespaceNode = None
self._datavars: Union[Datavars, NamespaceNode] = None
self._output: IOModule = None
@property
def essential(self) -> bool:
return self._essential
@property
def initialized(self) -> bool:
return self._initialized
def _initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> 'Task':
'''Метод для инициализации задач, привязывает задачу к указанному
модулю переменных, модулю вывода, формирует список аргументов,
определяет их типы и находит переменные, к которым в последствии
нужно будет присвоить значение, возвращенное action.'''
self._datavars = datavars
self._output = output
self._namespace = namespace
self._script_namespace = script_namespace
if not self._initialized:
self._arg_names,\
self._args_types,\
self._return_type = self._get_action_info(self._action)
self._args = self._update_arguments(self._args_sources)
if self._set is not None:
self._set = self._get_set_variables(self._set, datavars,
namespace)
self._initialized = True
else:
self._args = self._update_arguments(self._args_sources)
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска данной задачи.'''
self._initialize(output, datavars, script_namespace, namespace)
if (self._condition is None or self._condition(
self._datavars,
self._namespace,
self._script_namespace)):
arguments_values = self._get_args_values(self._args,
self._args_types)
try:
result = self._action(*arguments_values)
self._create_task_var(result=result,
success=True)
if self._return_type:
self._check_return_type(result)
if self._set:
self._set_variables(result)
return self._handlers
except Exception as error:
if self._essential and not isinstance(error, ActionError):
raise TaskError(
f"essential task '{self._name or self._id}' is"
f" failed. Reason: {error}")
self._create_task_var(success=False,
error_message=str(error))
return set()
def _get_action_info(self, action: Callable):
'''Метод для получения информации о функции действия.'''
arguments = [arg.name for arg in inspect.signature(
action).parameters.values()]
annotation = action.__annotations__
action_types = []
for argument in arguments:
action_types.append(annotation.get(argument, None))
return arguments, action_types, annotation.get('return', None)
def _update_arguments(self, args_sources: List[Any]) -> tuple:
'''Метод для обновления и установки аргументов функции действия.'''
arguments = []
# Проверяем количество аргументов.
args_length = len(self._arg_names)
if 'output' in self._arg_names:
if (args_length - 1) != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length - 1} arguments"
f" but {len(args_sources)}"
" is given")
elif args_length != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length} arguments but"
f" {len(args_sources)} is given")
# Формируем список значений аргументов.
args_iterator = iter(args_sources)
for arg_name in self._arg_names:
if arg_name == 'output':
arguments.append(self._output)
else:
argument = next(args_iterator)
if isinstance(argument, str):
try:
variable = DependenceAPI.find_variable(
argument,
self._datavars,
current_namespace=self._namespace)
arguments.append(variable)
except DependenceError as error:
raise TaskInitializationError(str(error))
elif isinstance(argument, Static):
arguments.append(argument.value)
else:
arguments.append(argument)
return tuple(arguments)
def _get_args_values(self, args: List[Any],
args_types: List[type]) -> List[Any]:
'''Метод для получения значений из списка аргументов функции action.'''
arguments_values = []
for arg, arg_type in zip(args, args_types):
if isinstance(arg, VariableNode):
value = arg.get_value()
if arg.variable_type is HashType:
value = value.get_hash()
elif arg.variable_type is TableType:
value = value.get_table()
arguments_values.append(self._check_type(value, arg_type))
else:
arguments_values.append(self._check_type(arg, arg_type))
return arguments_values
def _check_type(self, value: Any, arg_type: type) -> Any:
'''Метод для проверки типа аргумента.'''
if arg_type is None or isinstance(value, arg_type):
return value
else:
raise TaskError(f"'{arg_type}' is expected, not {type(value)}")
def _create_task_var(self,
result: Any = None,
success: bool = True,
error_message: str = None) -> None:
'''Метод для создания переменной задачи, в которую отправляются
результаты вычислений, имеющихся в функции action.'''
VariableNode(self._id, self._script_namespace, variable_type=HashType,
source={'result': result,
'success': success,
'error_message': error_message})
def _get_set_variables(self, to_set: Union[str, list, tuple, dict],
datavars: Union[NamespaceNode, Datavars],
namespace: NamespaceNode) -> Union[str, list, dict]:
'''Метод для поиска переменных, в которые нужно положить значения,
полученные в результате вычислений в action.'''
if isinstance(to_set, str):
return DependenceAPI.find_variable(to_set, datavars,
current_namespace=namespace)
elif isinstance(to_set, (list, tuple)):
vars_to_set = []
for var in to_set:
vars_to_set.append(
DependenceAPI.find_variable(
var, datavars,
current_namespace=namespace))
return vars_to_set
elif isinstance(to_set, dict):
for key in to_set:
variable = DependenceAPI.find_variable(
to_set[key], datavars,
current_namespace=namespace)
to_set[key] = variable
return to_set
else:
raise TaskError(f"set value must be {dict}, {list}, {tuple} or "
f"{str}, not {type(to_set)}")
def _set_variables(self, result: Any) -> None:
'''Метод для присвоения переменным результатов работы функции action.
'''
self._check_set_vars_types(result)
if isinstance(self._set, dict):
if not isinstance(result, dict):
raise TaskError(f"set parameter value is '{dict}' but action"
f" result is '{type(result)}'")
for key in self._set:
if key in result:
self._set[key].set(result[key])
elif isinstance(self._set, (list, tuple)):
for var in self._set:
var.set(result)
else:
self._set.set(result)
def _check_set_vars_types(self, result: Any) -> None:
'''Метод для проверки соответствия типов значений, полученных из
action, и переменных, в которые нужно положить эти значения.'''
# Теперь проверяем соответствие типа переменных из set и типа значений.
if isinstance(self._set, dict):
for key, variable in self._set.items():
if (key in result and not
isinstance(result[key],
variable.variable_type.python_type)):
raise TaskError("can not set value of"
f" '{type(result[key])}' type to the"
" variable of"
f" '{variable.variable_type.name}'"
" type")
elif isinstance(self._set, (list, tuple)):
for variable in self._set:
if not isinstance(result, variable.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{variable.variable_type.name}' type")
else:
if not isinstance(result, self._set.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{self._set.variable_type.name}' type")
def _check_return_type(self, result: Any) -> None:
'''Метод для проверки типа значения, возвращенного функцией action.'''
if not isinstance(result, self._return_type):
raise TaskError(f"action returned value of '{type(result)}' "
f"type, but expected is '{self._return_type}'")
class Loop:
'''Базовый класс всех объектов цикла для блоков.'''
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
pass
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
yield False
class For(Loop):
'''Класс цикла блока, использующего указанное итерируемое значение.'''
def __init__(self, value: str, iter_arg: Union[str, Iterable]):
self._iter_arg = iter_arg
self._item_name = value
self._iterable_value = None
self._item_variable = None
self._initialized = False
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._iterable_value = self._get_iterable_value(self._iter_arg,
datavars,
namespace)
self._item_variable = self._get_item_variable(self._item_name,
script_namespace)
self._initialized = True
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
if not self._initialized:
yield False
else:
for item in self._iterable_value:
self._item_variable.source = item
yield True
yield False
def _get_iterable_value(self, iter_argument: Union[str, Iterable],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode) -> Iterable:
'''Метод для получения и проверки значения, которое будем далее
итерировать.'''
value = self._iter_arg
if isinstance(self._iter_arg, str):
variable = DependenceAPI.find_variable(self._iter_arg,
datavars,
current_namespace=namespace)
value = variable.get_value()
if variable.variable_type is HashType:
value = value.get_hash()
elif variable.variable_type is TableType:
value = value.table_hash()
if not isinstance(value, Iterable):
raise TaskError("For() argument '{value}' is not iterable.")
if isinstance(value, Mapping):
value = value.items()
return value
def _get_item_variable(self, item_name: str,
script_namespace: NamespaceNode) -> VariableNode:
'''Метод для получения или создания переменной, в которой будет
храниться текущее значение, взятое из итерируемого значения.'''
if item_name in script_namespace:
item_variable = script_namespace[item_name]
else:
item_variable = VariableNode(item_name, script_namespace)
return item_variable
class While(Loop):
'''Класс цикла блока, аналогичного while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: NamespaceNode = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
while True:
condition_result = self._condition(self._datavars,
self._namespace,
self._script_namespace)
yield condition_result
class Until(Loop):
'''Класс цикла блока, аналогичного конструкции do ... while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: Datavars = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
while True:
yield self._condition(self._datavars,
self._namespace,
self._script_namespace)
class Block:
'''Класс блока задач.'''
def __init__(self, *tasks: List[Task],
when: Union['Var', None] = None,
loop: Loop = Loop()):
self._tasks: List[Task] = tasks
self._rescue_tasks: List[Task] = None
self._condition: 'Var' = when
if not isinstance(loop, Loop):
raise TaskError('loop block parameter must be Loop type')
self._loop: Loop = loop
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в блоке.'''
handlers = set()
if (self._condition is None or self._condition(datavars, namespace,
script_namespace)):
self._loop.initialize_loop(datavars, script_namespace, namespace)
looper = self._loop.get_looper()
while next(looper):
for task in self._tasks:
try:
output = task.run(output, datavars, script_namespace,
namespace)
handlers.update(output)
except Exception as error:
if self._rescue_tasks is not None:
self._run_rescue(output, datavars,
script_namespace, namespace)
if isinstance(error, TaskError):
error.handlers.update(handlers)
raise error
return handlers
def _run_rescue(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска задач, указанных в rescue. Эти задачи выполняются,
если возникает ошибка при выполнении важной задачи, содержащейся в
блоке.'''
for task in self._rescue_tasks:
try:
task.run(output, datavars, script_namespace, namespace)
except Exception:
# TODO разобраться с тем, что делать если ошибка закралась в
# задачи из rescue.
pass
def rescue(self, *tasks: List[Task]) -> 'Block':
'''Метод для задания задач, выполняемых, если некоторая важная задача,
содержащаяся в блоке выполняется с ошибкой.'''
self._rescue_tasks = tasks
return self
class Handler:
'''Класс обработчика -- объекта скрипта, имеющего идентификатор,
запускаемого в конце выполнения скрипта в том случае, если его
идентификатор присутствовал в параметре notify хотя бы у одной успешно
выполненной задачи.'''
def __init__(self, handler_id, *tasks: List[Task]):
self._id: str = handler_id
self._tasks: List[Task] = tasks
@property
def id(self) -> str:
return self._id
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в обработчике.'''
handlers = set()
for task in self._tasks:
try:
handlers.update(task.run(output, datavars, script_namespace,
namespace))
except Exception as error:
output.set_error(f"error during execution handle '{self._id}'"
f" in task '{task._name or task._id}':"
f" {str(error)}")
return handlers
class ConditionValue:
'''Базовый класс для всех объектов, предназначенных для создания условий.
'''
def __init__(self, value: Any):
self.value: Any = value
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.value
class Var(ConditionValue):
'''Класс для создания условий выполнения задач.'''
def __init__(self, *variable: List[VariableNode],
value: Any = None, other: Any = None,
tasks: Union[List[str], None] = None,
function: Union[Callable, None] = None):
if variable:
self.variable: Union[VariableNode, str, None] = variable[0]
else:
self.variable = None
self.value: Any = value
self.other: Any = other
self.function: Callable = function
self.tasks: List[str] = tasks
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для расчета итогового значения ноды условия выполнения задачи.
'''
if self.function is not None:
if self.tasks:
return self.function(self.tasks, script_namespace)
elif self.other is not None:
return self.function(self.get_value(datavars, namespace,
script_namespace),
self.get_other(datavars, namespace,
script_namespace))
else:
return self.function(self.get_value(datavars, namespace,
script_namespace))
return self.get_value(datavars, namespace, script_namespace)
def get_value(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения собственного значения ноды.'''
if self.value is not None:
return self.value.get(datavars, namespace, script_namespace)
elif self.variable is not None:
if isinstance(self.variable, str):
self.variable = DependenceAPI.find_variable(
self.variable, datavars,
current_namespace=namespace)
return self.variable.get_value()
else:
raise ConditionError('Can not get value for condition')
def get_other(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения значения ноды, необходимой для выполнения
бинарной операции.'''
if isinstance(self.other, ConditionValue):
return self.other.get(datavars, namespace, script_namespace)
else:
return self.other
def __call__(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.get(datavars, namespace, script_namespace)
def __eq__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x == y)
def __ne__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x != y)
def __lt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x < y)
def __gt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x > y)
def __le__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x <= y)
def __ge__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x >= y)
def __and__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x and y)
def __or__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x or y)
def __invert__(self) -> 'Var':
return Var(value=self, function=lambda x: not x)
def __lshift__(self, other: Any) -> 'Var':
'''Метод для переопределения операции <<, которая теперь играет роль
in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def has(self, other: Any) -> 'Var':
'''Метод аналогичный операции in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def match(self, pattern: str) -> 'Var':
'''Метод для проверки с помощью регулярки, если в строке нечто,
соответствующее паттерну.'''
return Var(value=self, other=pattern,
function=lambda x, y: bool(re.search(y, x)))
def regex(self, pattern: str, repl: str) -> 'Var':
'''Метод для реализации sub'''
return Var(value=self, other=(pattern, repl),
function=lambda x, y: re.sub(*y, x))
def replace(self, original: str, repl: str) -> 'Var':
'''Метод для реализации replace.'''
return Var(value=self, other=(original, repl),
function=lambda x, y: x.replace(*y))
def __repr__(self) -> 'Var':
return ("<Condition value>")
def Done(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены.'''
return Var(tasks=tasks,
function=_is_done)
def DoneAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены.'''
return Var(tasks=tasks,
function=_is_done_any)
def Success(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены и притом выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success)
def SuccessAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success_any)
def Failed(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы не выполнены или выполнены с ошибкой.'''
return Var(tasks=tasks,
function=_is_failed)
def FailedAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые не выполнены или выполнены с ошибкой.
'''
return Var(tasks=tasks,
function=_is_failed_any)
def _is_done(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены.'''
for task in tasks:
if task not in script_namespace:
return False
return True
def _is_done_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены.'''
for task in tasks:
if task in script_namespace:
return True
return False
def _is_success(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены успешно.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_success_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены успешно.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return True
return False
def _is_failed(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_failed_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return True
return False