Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

1132 righe
50 KiB

Questo file contiene caratteri Unicode ambigui!

Questo file contiene caratteri Unicode ambigui che possono essere confusi con altri nella tua localizzazione attuale. Se il tuo caso di utilizzo è intenzionale e legittimo, puoi tranquillamente ignorare questo avviso. Usa il pulsante Escape per evidenziare questi caratteri.

# vim: fileencoding=utf-8
#
import re
import inspect
from typing import Callable, Any, Union, List, Generator, Optional
from calculate.templates.template_processor import DirectoryProcessor
from calculate.variables.datavars import (
DependenceAPI,
DependenceError,
VariableNode,
NamespaceNode,
HashType,
TableType,
StringType,
)
from calculate.variables.loader import Datavars
from calculate.utils.io_module import IOModule
from collections.abc import Iterable, Mapping
class TaskInitializationError(Exception):
pass
class ScriptError(Exception):
pass
class TaskError(Exception):
def __init__(self, message, handlers=set()):
super().__init__(message)
self.handlers = handlers
class ActionError(Exception):
'''Класс исключения для создания штатных исключений в функциях действий.'''
pass
class ConditionError(Exception):
pass
class Script:
'''Класс скрипта, собирает задачи, обработчики, блоки и прочее, хранит их
в себе. Создает экземпляры лаунчеров.'''
def __init__(self, script_id: str,
args: List[Any] = [],
success_message: Optional[str] = None,
failed_message: Optional[str] = None,
interrupt_message: Optional[str] = None):
self._id: str = script_id
self.__items: List[Union['Task', 'Block', 'Handler', 'Run']] = None
self.__tasks_is_set: bool = False
self.__args: list = args
self.__success_message: Optional[str] = success_message
self.__failed_message: Optional[str] = failed_message
self.__interrupt_message: Optional[str] = interrupt_message
@property
def id(self) -> str:
return self._id
@property
def args(self) -> list:
return self.__args
@property
def items(self) -> list:
return self.__items
@property
def success_message(self) -> str:
return self.__success_message
@property
def failed_message(self) -> str:
return self.__failed_message
@property
def interrupt_message(self) -> str:
return self.__interrupt_message
def tasks(self, *items: List[Union['Task', 'Block', 'Handler', 'Run']]
) -> 'Script':
'''Метод для указания задач и контейнеров, из которых состоит скрипт.
'''
if self.__tasks_is_set:
raise ScriptError("Script object is immutable.")
self.__items = items
self.__tasks_is_set = bool(self.__items)
return self
def make_launcher(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode = None) -> 'ScriptLauncher':
'''Метод для создания экземпляра объекта, для запуска данного
скрипта.'''
return ScriptLauncher(self, output, datavars, namespace)
class ScriptLauncher:
def __init__(self, script: Script,
output: IOModule,
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode):
'''Метод для инициализации, состоящей в установке модулей вывода,
модуля переменных, текущего пространства имен, а также создании
переменных скрипта.'''
self._script = script
self._output = output
self._datavars = datavars
self._namespace = namespace
self._script_namespace, self._args_vars = self.make_script_variables(
script.id,
script.args,
datavars)
def run(self) -> None:
'''Метод для запуска задач, содержащихся в данном скрипте.'''
essential_error = None
founded_handlers = []
handlers_to_run = set()
for task_item in self._script.items:
if isinstance(task_item, Handler):
founded_handlers.append(task_item)
elif essential_error is None:
try:
output = task_item.run(self._output,
self._datavars,
self._script_namespace,
self._namespace)
handlers_to_run.update(output)
except Exception as error:
if isinstance(error, TaskError):
handlers_to_run.update(error.handlers)
essential_error = error
for handler in founded_handlers:
if handler.id not in handlers_to_run:
continue
try:
handler.run(self._output, self._datavars,
self._script_namespace, self._namespace)
except Exception as error:
# TODO Разобраться что делать с ошибками в обработчике.
self._output.set_error(f"error in handler '{task_item.id}':"
f" {str(error)}")
if essential_error is not None:
raise essential_error
@staticmethod
def make_script_variables(script_id,
args: List[Any],
datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод для создания переменных скрипта. Создает пространство имен
скрипта и переменных аргументов.'''
if 'scripts' not in datavars:
scripts = ScriptLauncher.make_scripts_namespace(datavars)
else:
scripts = datavars.scripts
if script_id not in scripts:
current_script = NamespaceNode(script_id)
scripts.add_namespace(current_script)
else:
current_script = scripts[script_id]
# current_script.clear()
args_vars = []
for arg in args:
if arg not in current_script._variables:
args_vars.append(VariableNode(arg, current_script))
else:
args_vars.append(current_script._variables[arg])
return current_script, args_vars
@staticmethod
def make_scripts_namespace(datavars: Union[Datavars, NamespaceNode]
) -> NamespaceNode:
'''Метод создающий пространства имен, необходимые для работы задач.'''
# Для тестирования скорее.
scripts = NamespaceNode('scripts')
datavars.add_namespace(scripts)
env = NamespaceNode('env')
scripts.add_namespace(env)
loop = NamespaceNode('loop')
env.add_namespace(loop)
return scripts
def __call__(self, *args) -> None:
'''Метод для запуска скрипта с аргументами.'''
if len(args) < len(self._script.args):
raise ScriptError(
(f"script '{self._script.id}' missing"
f" {len(self._script.args) - len(args)} required"
" arguments: '") +
"', '".join(self._script.args[len(args):]) + "'")
elif len(args) > len(self._script.args):
raise ScriptError(f"script '{self._script.id}' takes"
f" {len(self._script.args)} arguments, but"
f" {len(args)} were given")
args_values = self._get_args_values(args, self._datavars,
self._namespace)
for arg_variable, arg_value in zip(self._args_vars, args_values):
arg_variable.source = arg_value
self.run()
def _get_args_values(self, args: List[Any],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode) -> list:
'''Метод для получения значений аргументов.'''
args_values = []
for argument in args:
if isinstance(argument, str):
variable = DependenceAPI.find_variable(
argument, datavars,
current_namespace=namespace)
args_values.append(variable.get_value())
elif isinstance(argument, Static):
args_values.append(argument.value)
else:
args_values.append(argument)
return args_values
class RunTemplates:
'''Класс запускателя наложения шаблонов.'''
def __init__(self, id: str = '',
action: str = '',
package: str = '',
chroot_path: str = '',
root_path: str = '',
dbpkg: bool = True,
essential: bool = True,
**group_packages):
self._id: str = id
self._action: str = action
self._package: str = package or None
self._chroot_path: str = chroot_path or None
self._root_path: str = root_path or None
self._dbpkg = dbpkg
self._essential = essential
self._groups: dict = group_packages
def _initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode]):
# Проверяем наличие идентификатора.
if not self._id:
error_msg = "id is not set for templates runner"
if self._essential:
raise TaskError(error_msg)
else:
output.set_error(error_msg)
return set()
# Проверяем наличие параметра action, который обязателен.
if not self._action:
error_msg = ("action parameter is not set for templates runner"
f" '{self._id}'")
if self._essential:
raise TaskError(error_msg)
else:
output.set_error(error_msg)
return set()
# Если установлен chroot_path -- устанавливаем значение соответствующей
# переменной.
if self._chroot_path is not None:
if 'cl_chroot_path' in datavars.main:
datavars.main['cl_chroot_path'].source = self._chroot_path
else:
VariableNode("cl_chroot_path", datavars.main,
variable_type=StringType,
source=self._chroot_path)
# Если установлен root_path -- устанавливаем значение соответствующей
# переменной.
if self._root_path is not None:
if 'cl_root_path' in datavars.main:
datavars.main['cl_root_path'].source = self._root_path
else:
VariableNode("cl_root_path", datavars.main,
variable_type=StringType,
source=self._chroot_path)
if self._groups:
groups = list(self._groups.keys())
for group, atoms in groups:
if isinstance(atoms, str):
self._groups[group] = [name.strip() for name
in atoms.split(',')]
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска наложения шаблонов.'''
self._initialize(output, datavars)
template_processor = DirectoryProcessor(self._action,
datavars_module=datavars,
package=self._package,
output_module=output,
dbpkg=self._dbpkg,
namespace=namespace,
**self._groups)
changed_files = template_processor.process_template_directories()
self._create_result_var(script_namespace,
changed_files=changed_files)
return set()
def _create_result_var(self, script_namespace: NamespaceNode,
changed_files: dict = {},
skipped: list = []) -> None:
'''Метод для создания переменной задачи, в которую отправляются
результаты вычислений, имеющихся в функции action.'''
VariableNode(self._id, script_namespace, variable_type=HashType,
source={"changed": changed_files,
"skipped": skipped})
class Run:
'''Класс запускателя скриптов.'''
def __init__(self, script: Script, namespace: str = None,
args: List[Any] = [],
when: 'Var' = None,
essential: bool = True):
self._script: Script = script
self._namespace: str = namespace
self._args: List[Any] = args
self._condition: Var = when
self._essential = essential
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
parent_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска скрипта, указанного в запускателе.'''
if self._condition is None or self._condition(datavars,
namespace,
parent_namespace):
# пространство имен или наследуем, либо берем по указанному пути.
if not self._namespace:
self._namespace = namespace
elif isinstance(self._namespace, str):
parts = self._namespace.split('.')
parts.reverse()
self._namespace = datavars[parts.pop()]
while parts:
self._namespace = self._namespace[parts.pop()]
try:
# if not self._script._initialized:
launcher = self._script.make_launcher(
output, datavars,
namespace=self._namespace)
launcher(*self._args)
except (ScriptError, TaskError) as error:
if self._essential:
raise ScriptError(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
output.set_error(f"essential script '{self._script._id}'"
f" is failed. Reason: {error}")
return set()
class Static:
'''Класс, оборачивающий значение, которое не должно восприниматься как
переменная.'''
def __init__(self, value: Any):
self._value: Any = value
@property
def value(self) -> Any:
return self._value
class Task:
'''Класс реализующий задачи -- объекты, исполняющие указанные функции
действия action, и содержащий всю необходимую информацию для их работы.'''
def __init__(self, **kwargs):
if 'id' in kwargs:
self._id: str = kwargs['id']
else:
raise TaskInitializationError("id is not set for task")
# Имя не обязательно.
self._name: str = kwargs.get('name', None)
if 'action' in kwargs:
self._action = kwargs['action']
else:
raise TaskInitializationError("action is not set for task"
f" '{self._name or self._id}'")
# Список аргументов и их типы
self._args_sources: Union[tuple, list] = kwargs.get('args', [])
self._args_types: List[type] = []
self._arg_names: List[str] = []
self._args: Union[tuple, list] = []
# Переменные, которым через set будет присвоен результат работы action.
self._set: Union[list, tuple] = kwargs.get('set', None)
self._return_type: type = None
# Условие выполнения задачи.
self._condition: Var = kwargs.get('when', None)
# Существенность задачи. Если задача существенная, ошибка в ней
# приведет к остановке скрипта, если несущественная -- ошибка будет
# записана в переменную этой задачи, а скрипт будет выполняться дальше.
self._essential: bool = kwargs.get('essential', True)
# Параметр для указания обработчиков, которые нужно запустить в конце
# выполнения скрипта.
self._handlers = set()
handlers: set = kwargs.get('notify', set())
if handlers:
if isinstance(handlers, (tuple, list)):
self._handlers.update(handlers)
else:
self._handlers.add(handlers)
self._initialized: bool = False
self._namespace: NamespaceNode = None
self._script_namespace: NamespaceNode = None
self._datavars: Union[Datavars, NamespaceNode] = None
self._output: IOModule = None
@property
def essential(self) -> bool:
return self._essential
@property
def initialized(self) -> bool:
return self._initialized
def _initialize(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> 'Task':
'''Метод для инициализации задач, привязывает задачу к указанному
модулю переменных, модулю вывода, формирует список аргументов,
определяет их типы и находит переменные, к которым в последствии
нужно будет присвоить значение, возвращенное action.'''
self._datavars = datavars
self._output = output
self._namespace = namespace
self._script_namespace = script_namespace
if not self._initialized:
self._arg_names,\
self._args_types,\
self._return_type = self._get_action_info(self._action)
self._args = self._update_arguments(self._args_sources)
if self._set is not None:
self._set = self._get_set_variables(self._set, datavars,
namespace)
self._initialized = True
else:
self._args = self._update_arguments(self._args_sources)
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> set:
'''Метод для запуска данной задачи.'''
self._initialize(output, datavars, script_namespace, namespace)
if (self._condition is None or self._condition(
self._datavars,
self._namespace,
self._script_namespace)):
arguments_values = self._get_args_values(self._args,
self._args_types)
try:
result = self._action(*arguments_values)
self._create_task_var(result=result,
success=True)
if self._return_type:
self._check_return_type(result)
if self._set:
self._set_variables(result)
return self._handlers
except Exception as error:
if self._essential and not isinstance(error, ActionError):
raise TaskError(
f"essential task '{self._name or self._id}' is"
f" failed. Reason: {error}")
self._create_task_var(success=False,
error_message=str(error))
return set()
def _get_action_info(self, action: Callable):
'''Метод для получения информации о функции действия.'''
arguments = [arg.name for arg in inspect.signature(
action).parameters.values()]
annotation = action.__annotations__
action_types = []
for argument in arguments:
action_types.append(annotation.get(argument, None))
return arguments, action_types, annotation.get('return', None)
def _update_arguments(self, args_sources: List[Any]) -> tuple:
'''Метод для обновления и установки аргументов функции действия.'''
arguments = []
# Проверяем количество аргументов.
args_length = len(self._arg_names)
if 'output' in self._arg_names:
if (args_length - 1) != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length - 1} arguments"
f" but {len(args_sources)}"
" is given")
elif args_length != len(self._args_sources):
raise TaskInitializationError(f"action function takes"
f" {args_length} arguments but"
f" {len(args_sources)} is given")
# Формируем список значений аргументов.
args_iterator = iter(args_sources)
for arg_name in self._arg_names:
if arg_name == 'output':
arguments.append(self._output)
else:
argument = next(args_iterator)
if isinstance(argument, str):
try:
variable = DependenceAPI.find_variable(
argument,
self._datavars,
current_namespace=self._namespace)
arguments.append(variable)
except DependenceError as error:
raise TaskInitializationError(str(error))
elif isinstance(argument, Static):
arguments.append(argument.value)
else:
arguments.append(argument)
return tuple(arguments)
def _get_args_values(self, args: List[Any],
args_types: List[type]) -> List[Any]:
'''Метод для получения значений из списка аргументов функции action.'''
arguments_values = []
for arg, arg_type in zip(args, args_types):
if isinstance(arg, VariableNode):
value = arg.get_value()
if arg.variable_type is HashType:
value = value.get_hash()
elif arg.variable_type is TableType:
value = value.get_table()
arguments_values.append(self._check_type(value, arg_type))
else:
arguments_values.append(self._check_type(arg, arg_type))
return arguments_values
def _check_type(self, value: Any, arg_type: type) -> Any:
'''Метод для проверки типа аргумента.'''
if arg_type is None or isinstance(value, arg_type):
return value
else:
raise TaskError(f"'{arg_type}' is expected, not {type(value)}")
def _create_task_var(self,
result: Any = None,
success: bool = True,
error_message: str = None) -> None:
'''Метод для создания переменной задачи, в которую отправляются
результаты вычислений, имеющихся в функции action.'''
VariableNode(self._id, self._script_namespace, variable_type=HashType,
source={'result': result,
'success': success,
'error_message': error_message})
def _get_set_variables(self, to_set: Union[str, list, tuple, dict],
datavars: Union[NamespaceNode, Datavars],
namespace: NamespaceNode) -> Union[str, list, dict]:
'''Метод для поиска переменных, в которые нужно положить значения,
полученные в результате вычислений в action.'''
if isinstance(to_set, str):
return DependenceAPI.find_variable(to_set, datavars,
current_namespace=namespace)
elif isinstance(to_set, (list, tuple)):
vars_to_set = []
for var in to_set:
vars_to_set.append(
DependenceAPI.find_variable(
var, datavars,
current_namespace=namespace))
return vars_to_set
elif isinstance(to_set, dict):
for key in to_set:
variable = DependenceAPI.find_variable(
to_set[key], datavars,
current_namespace=namespace)
to_set[key] = variable
return to_set
else:
raise TaskError(f"set value must be {dict}, {list}, {tuple} or "
f"{str}, not {type(to_set)}")
def _set_variables(self, result: Any) -> None:
'''Метод для присвоения переменным результатов работы функции action.
'''
self._check_set_vars_types(result)
if isinstance(self._set, dict):
if not isinstance(result, dict):
raise TaskError(f"set parameter value is '{dict}' but action"
f" result is '{type(result)}'")
for key in self._set:
if key in result:
self._set[key].set(result[key])
elif isinstance(self._set, (list, tuple)):
for var in self._set:
var.set(result)
else:
self._set.set(result)
def _check_set_vars_types(self, result: Any) -> None:
'''Метод для проверки соответствия типов значений, полученных из
action, и переменных, в которые нужно положить эти значения.'''
# Теперь проверяем соответствие типа переменных из set и типа значений.
if isinstance(self._set, dict):
for key, variable in self._set.items():
if (key in result and not
isinstance(result[key],
variable.variable_type.python_type)):
raise TaskError("can not set value of"
f" '{type(result[key])}' type to the"
" variable of"
f" '{variable.variable_type.name}'"
" type")
elif isinstance(self._set, (list, tuple)):
for variable in self._set:
if not isinstance(result, variable.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{variable.variable_type.name}' type")
else:
if not isinstance(result, self._set.variable_type.python_type):
raise TaskError(f"can not set value of '{type(result)}'"
" type to the variable of"
f" '{self._set.variable_type.name}' type")
def _check_return_type(self, result: Any) -> None:
'''Метод для проверки типа значения, возвращенного функцией action.'''
if not isinstance(result, self._return_type):
raise TaskError(f"action returned value of '{type(result)}' "
f"type, but expected is '{self._return_type}'")
class Loop:
'''Базовый класс всех объектов цикла для блоков.'''
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
pass
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
yield False
class For(Loop):
'''Класс цикла блока, использующего указанное итерируемое значение.'''
def __init__(self, value: str, iter_arg: Union[str, Iterable]):
self._iter_arg = iter_arg
self._item_name = value
self._iterable_value = None
self._item_variable = None
self._initialized = False
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._iterable_value = self._get_iterable_value(self._iter_arg,
datavars,
namespace)
self._item_variable = self._get_item_variable(self._item_name,
script_namespace)
self._initialized = True
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
if not self._initialized:
yield False
else:
for item in self._iterable_value:
self._item_variable.source = item
yield True
yield False
def _get_iterable_value(self, iter_argument: Union[str, Iterable],
datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode) -> Iterable:
'''Метод для получения и проверки значения, которое будем далее
итерировать.'''
value = self._iter_arg
if isinstance(self._iter_arg, str):
variable = DependenceAPI.find_variable(self._iter_arg,
datavars,
current_namespace=namespace)
value = variable.get_value()
if variable.variable_type is HashType:
value = value.get_hash()
elif variable.variable_type is TableType:
value = value.table_hash()
if not isinstance(value, Iterable):
raise TaskError("For() argument '{value}' is not iterable.")
if isinstance(value, Mapping):
value = value.items()
return value
def _get_item_variable(self, item_name: str,
script_namespace: NamespaceNode) -> VariableNode:
'''Метод для получения или создания переменной, в которой будет
храниться текущее значение, взятое из итерируемого значения.'''
if item_name in script_namespace:
item_variable = script_namespace[item_name]
else:
item_variable = VariableNode(item_name, script_namespace)
return item_variable
class While(Loop):
'''Класс цикла блока, аналогичного while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: NamespaceNode = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
while True:
condition_result = self._condition(self._datavars,
self._namespace,
self._script_namespace)
yield condition_result
class Until(Loop):
'''Класс цикла блока, аналогичного конструкции do ... while.'''
def __init__(self, condition: 'Var'):
self._condition: Var = condition
self._datavars: Union[Datavars, NamespaceNode] = None
self._namespace: Datavars = None
def initialize_loop(self, datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для инициализации объекта цикла.'''
self._datavars: Union[Datavars, NamespaceNode] = datavars
self._namespace: Union[NamespaceNode, None] = namespace
self._script_namespace: NamespaceNode = script_namespace
def get_looper(self) -> Generator[bool, bool, None]:
'''Генератор, предназначенный для управления циклом при запуске блока
задач.'''
yield True
while True:
yield self._condition(self._datavars,
self._namespace,
self._script_namespace)
class Block:
'''Класс блока задач.'''
def __init__(self, *tasks: List[Task],
when: Union['Var', None] = None,
loop: Loop = Loop()):
self._tasks: List[Task] = tasks
self._rescue_tasks: List[Task] = None
self._condition: 'Var' = when
if not isinstance(loop, Loop):
raise TaskError('loop block parameter must be Loop type')
self._loop: Loop = loop
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в блоке.'''
handlers = set()
if (self._condition is None or self._condition(datavars, namespace,
script_namespace)):
self._loop.initialize_loop(datavars, script_namespace, namespace)
looper = self._loop.get_looper()
while next(looper):
for task in self._tasks:
try:
output = task.run(output, datavars, script_namespace,
namespace)
handlers.update(output)
except Exception as error:
if self._rescue_tasks is not None:
self._run_rescue(output, datavars,
script_namespace, namespace)
if isinstance(error, TaskError):
error.handlers.update(handlers)
raise error
return handlers
def _run_rescue(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска задач, указанных в rescue. Эти задачи выполняются,
если возникает ошибка при выполнении важной задачи, содержащейся в
блоке.'''
for task in self._rescue_tasks:
try:
task.run(output, datavars, script_namespace, namespace)
except Exception:
# TODO разобраться с тем, что делать если ошибка закралась в
# задачи из rescue.
pass
def rescue(self, *tasks: List[Task]) -> 'Block':
'''Метод для задания задач, выполняемых, если некоторая важная задача,
содержащаяся в блоке выполняется с ошибкой.'''
self._rescue_tasks = tasks
return self
class Handler:
'''Класс обработчика -- объекта скрипта, имеющего идентификатор,
запускаемого в конце выполнения скрипта в том случае, если его
идентификатор присутствовал в параметре notify хотя бы у одной успешно
выполненной задачи.'''
def __init__(self, handler_id, *tasks: List[Task]):
self._id: str = handler_id
self._tasks: List[Task] = tasks
@property
def id(self) -> str:
return self._id
def run(self, output: IOModule,
datavars: Union[Datavars, NamespaceNode],
script_namespace: NamespaceNode,
namespace: NamespaceNode) -> None:
'''Метод для запуска выполнения задач, содержащихся в обработчике.'''
handlers = set()
for task in self._tasks:
try:
handlers.update(task.run(output, datavars, script_namespace,
namespace))
except Exception as error:
output.set_error(f"error during execution handle '{self._id}'"
f" in task '{task._name or task._id}':"
f" {str(error)}")
return handlers
class ConditionValue:
'''Базовый класс для всех объектов, предназначенных для создания условий.
'''
def __init__(self, value: Any):
self.value: Any = value
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.value
class Var(ConditionValue):
'''Класс для создания условий выполнения задач.'''
def __init__(self, *variable: List[VariableNode],
value: Any = None, other: Any = None,
tasks: Union[List[str], None] = None,
function: Union[Callable, None] = None):
if variable:
self.variable: Union[VariableNode, str, None] = variable[0]
else:
self.variable = None
self.value: Any = value
self.other: Any = other
self.function: Callable = function
self.tasks: List[str] = tasks
def get(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для расчета итогового значения ноды условия выполнения задачи.
'''
if self.function is not None:
if self.tasks:
return self.function(self.tasks, script_namespace)
elif self.other is not None:
return self.function(self.get_value(datavars, namespace,
script_namespace),
self.get_other(datavars, namespace,
script_namespace))
else:
return self.function(self.get_value(datavars, namespace,
script_namespace))
return self.get_value(datavars, namespace, script_namespace)
def get_value(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения собственного значения ноды.'''
if self.value is not None:
return self.value.get(datavars, namespace, script_namespace)
elif self.variable is not None:
if isinstance(self.variable, str):
self.variable = DependenceAPI.find_variable(
self.variable, datavars,
current_namespace=namespace)
return self.variable.get_value()
else:
raise ConditionError('Can not get value for condition')
def get_other(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
'''Метод для получения значения ноды, необходимой для выполнения
бинарной операции.'''
if isinstance(self.other, ConditionValue):
return self.other.get(datavars, namespace, script_namespace)
else:
return self.other
def __call__(self, datavars: Union[Datavars, NamespaceNode],
namespace: NamespaceNode,
script_namespace: NamespaceNode) -> Any:
return self.get(datavars, namespace, script_namespace)
def __eq__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x == y)
def __ne__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x != y)
def __lt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x < y)
def __gt__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x > y)
def __le__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x <= y)
def __ge__(self, other: Any) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x >= y)
def __and__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x and y)
def __or__(self, other: bool) -> 'Var':
return Var(value=self, other=other, function=lambda x, y: x or y)
def __invert__(self) -> 'Var':
return Var(value=self, function=lambda x: not x)
def __lshift__(self, other: Any) -> 'Var':
'''Метод для переопределения операции <<, которая теперь играет роль
in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def has(self, other: Any) -> 'Var':
'''Метод аналогичный операции in.'''
return Var(value=self, other=other, function=lambda x, y: y in x)
def match(self, pattern: str) -> 'Var':
'''Метод для проверки с помощью регулярки, если в строке нечто,
соответствующее паттерну.'''
return Var(value=self, other=pattern,
function=lambda x, y: bool(re.search(y, x)))
def regex(self, pattern: str, repl: str) -> 'Var':
'''Метод для реализации sub'''
return Var(value=self, other=(pattern, repl),
function=lambda x, y: re.sub(*y, x))
def replace(self, original: str, repl: str) -> 'Var':
'''Метод для реализации replace.'''
return Var(value=self, other=(original, repl),
function=lambda x, y: x.replace(*y))
def __repr__(self) -> 'Var':
return ("<Condition value>")
def Done(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены.'''
return Var(tasks=tasks,
function=_is_done)
def DoneAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены.'''
return Var(tasks=tasks,
function=_is_done_any)
def Success(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы выполнены и притом выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success)
def SuccessAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые выполнены успешно.'''
return Var(tasks=tasks,
function=_is_success_any)
def Failed(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, все ли
указанные методы не выполнены или выполнены с ошибкой.'''
return Var(tasks=tasks,
function=_is_failed)
def FailedAny(*tasks: List[str]) -> Var:
'''Функция создающая объект Var, получающий информацию о том, есть ли
среди указанных методов те, которые не выполнены или выполнены с ошибкой.
'''
return Var(tasks=tasks,
function=_is_failed_any)
def _is_done(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены.'''
for task in tasks:
if task not in script_namespace:
return False
return True
def _is_done_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены.'''
for task in tasks:
if task in script_namespace:
return True
return False
def _is_success(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных выполнены успешно.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_success_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые выполнены успешно.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return True
return False
def _is_failed(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks все ли методы
из указанных не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task in script_namespace and
script_namespace[task].get_value().get_hash()['success']):
return False
return True
def _is_failed_any(tasks, script_namespace: NamespaceNode) -> bool:
'''Функция проверяющая по содержимому пространства имен tasks есть ли среди
указанных методов те, которые не выполнены или выполнены с ошибкой.'''
for task in tasks:
if (task not in script_namespace or
not script_namespace[task].get_value().get_hash()['success']):
return True
return False