# vim: fileencoding=utf-8 # import re # import ast import dis from contextlib import contextmanager # from inspect import signature, getsource from inspect import signature # from types import FunctionType, LambdaType from types import FunctionType from calculate.utils.tools import Singleton from typing import List, Any, Union, Generator, Callable, Optional class DependenceError(Exception): pass class VariableError(Exception): pass class VariableTypeError(VariableError): pass class VariableNotFoundError(VariableError): pass class CyclicVariableError(VariableError): def __init__(self, *queue: List[str]): self.queue: List[str] = queue def __str__(self) -> str: return "Cyclic dependence in variables: {}".format(", ".join( self.queue[:-1])) class VariableType: '''Базовый класс для типов.''' name: str = 'undefined' python_type: type = None @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> Any: return value @classmethod def readonly(cls, variable_object: "VariableNode") -> None: variable_object.variable_type = cls variable_object.readonly = True class IniType(VariableType): '''Класс, соответствующий типу переменных созданных в calculate.ini файлах. ''' name: str = 'ini' python_type: type = str @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> Any: return value class StringType(VariableType): '''Класс, соответствующий типу переменных с строковым значением.''' name: str = 'string' python_type: type = str @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> str: if isinstance(value, str): return value else: try: return str(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " string variable: {reason}".format( value=value, reason=str(error))) class IntegerType(VariableType): '''Класс, соответствующий типу переменных с целочисленным значением.''' name: str = 'integer' python_type: type = int @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> int: if isinstance(value, int): return value else: try: return int(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " interger variable: {reason}".format( value=value, reason=str(error))) class FloatType(VariableType): '''Класс, соответствующий типу переменных с вещественным значением.''' name: str = 'float' python_type: type = float @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> float: if isinstance(value, float): return value else: try: return float(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " float variable: {reason}".format( value=value, reason=str(error))) class BooleanType(VariableType): '''Класс, соответствующий типу переменных с булевым значением.''' name: str = 'bool' python_type: type = bool true_values: set = {'True', 'true'} false_values: set = {'False', 'false'} @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> bool: if isinstance(value, bool): return value elif isinstance(value, str): if value in cls.true_values: return True if value in cls.false_values: return False try: return bool(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " bool variable: {reason}".format( value=value, reason=str(error))) class ListType(VariableType): name = 'list' python_type: type = list @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> list: if isinstance(value, list): return value elif isinstance(value, str): output_list = list() values = value.split(',') for value in values: output_list.append(value.strip()) return output_list try: return list(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " list variable: {reason}".format( value=value, reason=str(error))) class HashValue: '''Класс значения хэша, передающий некоторые характеристики переменной хозяина, что позволяет инвалидировать подписки переменной хозяина при любом изменении хэша или инвалидировать весь хэш при изменении одной из зависимостей.''' def __init__(self, key: Any, value: Any, master_variable: "VariableNode", parent: "Hash"): self.key: Any = key self.value: Any = value self.master_variable: "VariableNode" = master_variable self.parent: "Hash" = parent @property def subscriptions(self) -> set: return self.master_variable.subscriptions @property def subscribers(self) -> set: return self.master_variable.subscribers def get_value(self) -> str: '''Метод для получения значения хэша. Перед возвращением значения обновляет себя на наиболее актуальную версию значения хэша.''' self = self.master_variable.get_value()[self.key] return self.value def set(self, value: Any) -> None: '''Метод для задания одного значения хэша без изменения источника значения.''' current_hash = self.master_variable.get_value().get_hash() current_hash[self.key] = value self.master_variable.set(current_hash) def reset(self): '''Метод для сброса значения, установленного поверх источника.''' self.master_variable.reset() class Hash: '''Класс реализующий контейнер для хранения хэша в переменной соответствующего типа.''' def __init__(self, values: dict, master_variable: "VariableNode"): self.fixed: bool = master_variable.fixed self._values: dict = dict() self._fields: set = set() for key, value in values.items(): self._values.update({key: HashValue(key, value, master_variable, self)}) if self.fixed: self._fields.add(key) self.master_variable: "VariableNode" = master_variable self.row_index: Union[int, None] = None def get_hash(self) -> dict: '''Метод для получения словаря из значений хэша.''' dict_value = {} for key in self._values.keys(): dict_value.update({key: self._values[key].get_value()}) return dict_value def update_hash(self, values: dict) -> None: '''Метод для обновления значения хэша.''' for key, value in values.items(): if key in self._values: self._values[key].value = value elif self.fixed: raise VariableError("key '{}' is unavailable for fixed" " hash, available keys: '{}'.". format(key, ', '.join(self._fields))) else: self._values[key] = HashValue(key, value, self.master_variable, self) return self def __getattr__(self, key: str): '''Метод возвращает ноду пространства имен или значение переменной.''' if key in self._values: return self._values[key].get_value() else: raise VariableError(("'{key}' is not found in the hash" " '{hash_name}'").format( key=key, hash_name=self.master_variable.get_fullname())) def __getitem__(self, key: str) -> HashValue: if key in self._values: return self._values[key] else: raise VariableError(("'{key}' is not found in the hash" " '{hash_name}'").format( key=key, hash_name=self.master_variable.get_fullname())) def __iter__(self) -> Generator[Any, None, None]: for key in self._values.keys(): yield key def __contains__(self, key: str) -> bool: return key in self._values def __str__(self): return str(self.get_hash()) class HashType(VariableType): '''Класс, соответствующий типу переменных хэшей.''' name: str = 'hash' python_type: type = dict @classmethod def process_value(cls, values: Any, variable: "VariableNode") -> Hash: if not isinstance(values, dict): raise VariableTypeError("can not set value with type '{_type}' to" " hash variable: value must be 'dict' type" .format(_type=type(values))) if variable.value is not None: updated_hash = variable.value.update_hash(values) else: updated_hash = Hash(values, variable) return updated_hash @classmethod def fixed(cls, variable_object: "VariableNode") -> None: '''Метод, который передается переменной вместо типа, если нужно задать тип фиксированного хэша.''' variable_object.variable_type = cls variable_object.fixed = True class Table: '''Класс, соответствующий типу переменных таблиц.''' def __init__(self, values: List[dict], master_variable: "VariableNode", fields: Union[List[str], None] = None): self._rows: List[dict] = list() self.master_variable: "VariableNode" = master_variable self.columns: set = set() if fields is not None: self.columns.update(fields) else: self.columns = set(values[0].keys()) for row in values: if isinstance(row, dict): self._check_columns(row) self._rows.append(row) else: raise VariableTypeError("can not create table using value '{}'" " with type '{}'".format(row, type(row))) def get_table(self) -> List[dict]: '''Метод для получения всего списка строк таблицы.''' return self._rows def add_row(self, row: dict) -> None: '''Метод для добавления строк в таблицу.''' self._check_columns(row) self._rows.append(row) def change_row(self, row: dict, index: int) -> None: '''Метод для замены существующей строки.''' self._check_columns(row) self._rows[index] = row def clear(self) -> None: self._rows.clear() def _check_columns(self, row: dict) -> None: '''Метод для проверки наличия в хэше только тех полей, которые соответствуют заданным для таблицы колонкам.''' for column in row: if column not in self.columns: raise VariableError("unknown column value '{}'" " available: '{}'".format( column, ', '.join(self.columns))) def __getitem__(self, index: int) -> Hash: if isinstance(index, int): if index < len(self._rows): return self._rows[index] else: return {key: None for key in self.columns} raise VariableError("'{index}' index value is out of range" .format(index=index)) else: raise VariableError("Table value is not subscriptable") def __iter__(self) -> Generator[dict, None, None]: for row in self._rows: yield row def __contains__(self, key: str) -> bool: if isinstance(key, str): return key in self._values return False def __len__(self) -> int: return len(self._rows) class TableType(VariableType): name: str = 'table' # TODO Сомнительно. python_type: type = list @classmethod def process_value(self, value: List[dict], variable: "VariableNode") -> Table: if not isinstance(value, list) and not isinstance(value, Table): raise VariableTypeError("can not set value with type '{_type}' to" " hash variable: value must be 'dict' type" .format(_type=type(value))) else: if isinstance(value, Table): return value if variable.fields: return Table(value, variable, fields=variable.fields) else: return Table(value, variable) class Static: '''Класс для указания в качестве аргументов зависимостей статичных значений, а не только переменных.''' def __init__(self, value: Any): self.value: Any = value def get_value(self) -> Any: return self.value class VariableWrapper: '''Класс обертки для переменных, с помощью которого отслеживается применение переменной в образовании значения переменной от нее зависящей. ''' def __init__(self, variable: "VariableNode", subscriptions: set): self._variable: "VariableNode" = variable self._subscriptions: set = subscriptions @property def value(self): '''Метод возвращающий значение переменной и при этом добавляющий его в подписки.''' if not isinstance(self._variable, Static): self._subscriptions.add(self._variable) value = self._variable.get_value() if isinstance(value, Hash): value = value.get_hash() elif isinstance(value, Table): value = value.get_table() return value @property def subscriptions(self): return self._subscriptions @subscriptions.setter def subscriptions(self, subscriptions): self._subscriptions = subscriptions class DependenceSource: '''Класс зависимости как источника значения переменной.''' def __init__(self, variables: tuple, depend: Optional[Callable] = None): self._args: Union[tuple, list] = variables self.depend_function: Union[Callable, None] = depend self._subscriptions: set = set() self._args_founded: bool = False def check(self) -> None: '''Метод для запуска проверки корректности функции зависимости, а также сопоставления ее числу заданных зависимостей.''' if self.depend_function is None: if len(self._args) > 1: raise DependenceError('the depend function is needed if the' ' number of dependencies is more than' ' one') elif len(self._args) == 0: raise DependenceError('dependence is set without variables') else: self._check_function(self.depend_function) def calculate_value(self) -> Any: '''Метод для расчета значения переменной с использованием зависимостей и заданной для них функции.''' self._subscriptions = set() args = tuple(VariableWrapper(arg, self._subscriptions) for arg in self._args) try: if self.depend_function is None: return args[-1].value return self.depend_function(*args) except CyclicVariableError: raise except Exception as error: raise DependenceError('can not calculate using dependencies: {}' ' reason: {}'.format(', '.join( [subscription.get_fullname() for subscription in self._args]), str(error))) def _get_args(self, namespace: "NamespaceNode") -> None: '''Метод для преобразования списка аргументов функции зависимости, содержащего переменные и строки, в список аргументов состоящий только из нод переменных и значений хэшей.''' if not self._args_founded: self._args = self.find_variables(self._args, namespace) self._args_founded = True def find_variables(self, variables: Union[tuple, list], namespace: "NamespaceNode") -> List["VariableNode"]: '''Метод для поиска переменных по заданным путям к ним.''' output = [] for variable in variables: if isinstance(variable, str): variable = Dependence._get_variable( variable, current_namespace=namespace) if variable is None: raise DependenceError("variable '{}' not found". format(variable)) output.append(variable) return output @property def subscriptions(self) -> set: return self._subscriptions def _check_function(self, function_to_check: FunctionType) -> None: '''Метод для проверки того, возращает ли функция какое-либо значение, а также того, совпадает ли число подписок с числом аргументов этой функции.''' # if not isinstance(function_to_check, LambdaType): # # Если функция не лямбда, проверяем есть ли у нее возвращаемое # # значение. # source_code = getsource(function_to_check) # for node in ast.walk(ast.parse(source_code)): # if isinstance(node, ast.Return): # break # else: # raise VariableError("the depend function does not return" # " anything in variable") # Проверяем совпадение количества аргументов функции и заданных для # функции переменных. self.check_signature(function_to_check, self._args) @staticmethod def check_signature(function_to_check: Callable, arguments: Union[tuple, list]) -> None: '''Метод для проверки соответствия сигнатуры функции и заданного для нее набора аргументов.''' function_signature = signature(function_to_check) if not len(arguments) == len(function_signature.parameters): raise DependenceError("the depend function takes {} arguments," " while {} is given".format( len(function_signature.parameters), len(arguments))) def __ne__(self, other: Any) -> bool: if not isinstance(other, DependenceSource): return True return not self == other def __eq__(self, other: Any) -> bool: if not isinstance(other, DependenceSource): return False # Сначала сравниваем аргументы. for l_var, r_var in zip(self._args, other._args): if l_var != r_var: return False if self.depend_function == other.depend_function: return True if (self.depend_function is None or other.depend_function is None or not self._compare_depend_functions(self.depend_function, other.depend_function)): return False return True def _compare_depend_functions(self, l_func: FunctionType, r_func: FunctionType) -> bool: '''Метод для сравнения функций путем сравнения инструкций, полученных в результате их дизассемблирования.''' l_instructions = list(dis.get_instructions(l_func)) r_instructions = list(dis.get_instructions(r_func)) for l_instr, r_instr in zip(l_instructions, r_instructions): if l_instr.opname != r_instr.opname: return False if l_instr.arg != r_instr.arg: return False if ((l_instr.argval != l_instr.argrepr) and (r_instr.argval != r_instr.argrepr)): if r_instr.argval != l_instr.argval: return False return True class VariableNode: '''Класс ноды соответствующей переменной в дереве переменных.''' def __init__(self, name: str, namespace: "NamespaceNode", variable_type: type = VariableType, source: Any = None, fields: list = []): self.name: str = name if issubclass(variable_type, VariableType): self.variable_type: type = variable_type else: raise VariableTypeError('variable_type must be VariableType' ', but not {}'.format(type(variable_type))) self.calculating: bool = False self.fields: list = fields self.namespace: "NamespaceNode" = namespace self.namespace.add_variable(self) self.subscribers: set = set() # Список текущих подписок, для проверки их актуальности при # динамическом связывании. self._subscriptions: set = set() self.value: Any = None self._invalidated: bool = True # Флаг имеющий значение только для переменных типа HashType. # Предназначен для включения проверки соответствия полей хэша при # установке значения. self._fixed: bool = False # Источник значения переменной, может быть значением, а может быть # зависимостью. self._source: Any = source if source is not None: self.update_value() # Флаг, указывающий, что значение было изменено в процессе работы # утилит или с помощью тега set из шаблона. self.set_by_user: bool = False self._readonly: bool = False def update_value(self) -> None: '''Метод для обновления значения переменной с помощью указанного источника ее значения.''' if self.calculating: raise CyclicVariableError(self.name) if self._source is None: raise VariableError("No sources to update variable '{}'". format(self.get_fullname())) if isinstance(self._source, DependenceSource): self._source._get_args(self.namespace) with self._start_calculate(): try: value = self._source.calculate_value() # Обновляем подписки. Сначала убираем лишние. for subscription in self._subscriptions: if subscription not in self._source.subscriptions: subscription.subscribers.remove(self) # Теперь добавляем новые. for subscription in self._source.subscriptions: subscription.subscribers.add(self) self._subscriptions = self._source.subscriptions except CyclicVariableError as error: raise CyclicVariableError(self.name, *error.queue) except DependenceError as error: raise VariableError('{}: {}'.format(self.get_fullname(), str(error))) else: value = self._source if self.variable_type is TableType: self.value = self.variable_type.process_value(value, self) else: self.value = self.variable_type.process_value(value, self) self._invalidated = False def set_variable_type(self, variable_type: VariableType, readonly: Union[bool, None] = None, fixed: Union[bool, None] = None) -> None: '''Метод для установки типа переменной.''' if readonly is not None and isinstance(readonly, bool): self._readonly = readonly elif fixed is not None and isinstance(fixed, bool): self._fixed = fixed if self.variable_type is VariableType: if isinstance(variable_type, type): if issubclass(variable_type, VariableType): self.variable_type = variable_type else: raise VariableError('variable type object must be' ' VariableType or its class method,' ' not {}'.format(type(variable_type))) elif callable(variable_type): variable_type(self) def set(self, value: Any) -> None: '''Метод для установки временного пользовательского значения переменной.''' # Сбрасываем флаги, чтобы провести инвалидацию. self._invalidated = False self.set_by_user = False self.value = self.variable_type.process_value(value, self) self._invalidate(set_by_user=True) def reset(self) -> None: '''Метод для сброса пользовательского значения.''' if self.set_by_user: self._invalidated = False self.set_by_user = False self._invalidate() @property def source(self) -> None: return self._source @source.setter def source(self, source: Any) -> None: # Если источники не совпадают или текущее значение переменной было # установлено пользователем, то инвалидируем переменную и меняем # источник. if self._source != source or self.set_by_user: self.set_by_user = False self._invalidate() self._source = source @property def readonly(self) -> bool: return self._readonly @readonly.setter def readonly(self, value: bool) -> None: # if self.value is None and self._source is not None: # # TODO выводить предупреждение если переменная инвалидирована, # # нет источника и при этом устанавливается флаг readonly. # self.update_value() self._readonly = value @property def fixed(self) -> bool: return self._fixed @fixed.setter def fixed(self, value: bool) -> bool: self._fixed = value def _invalidate(self, set_by_user: bool = False) -> None: '''Метод для инвалидации данной переменной и всех зависящих от нее переменных.''' if not self._invalidated and not self.set_by_user: self._invalidated = True self.set_by_user = set_by_user for subscriber in self.subscribers: subscriber._invalidate() @contextmanager def _start_calculate(self) -> Generator["VariableNode", None, None]: '''Менеджер контекста устанавливающий флаг, указывающий, что данная переменная в состоянии расчета. В данном случае необходима только для того, чтобы при получении значения параметра внутри метода для расчета не иницировалось ее обновление, которое может привести к рекурсии.''' try: self.calculating = True yield self finally: self.calculating = False def get_value(self) -> Any: '''Метод для получения значения переменной.''' if self._invalidated and self._source is None and self._value is None: return None if self._invalidated and not self.set_by_user: self.update_value() return self.value def get_fullname(self) -> str: if self.namespace: return "{}.{}".format(self.namespace.get_fullname(), self.name) else: return self.name def __getitem__(self, key: str) -> HashValue: if self.variable_type is HashType: if key in self.get_value(): return self.value[key] else: raise VariableNotFoundError("value '{}' is not found in hash" " variable '{}'".format( key, self.get_fullname())) else: raise VariableError("'{}' variable type is not subscriptable.". format(self.variable_type.name)) def __repr__(self) -> str: return ''.format(self.get_fullname(), self.value or 'INVALIDATED') class NamespaceNode: '''Класс ноды соответствующей пространству имен в дереве переменных.''' def __init__(self, name: str = '', parent: Optional["NamespaceNode"] = None): self._name = name self._variables = dict() self._namespaces = dict() self._parent = parent def add_variable(self, variable: VariableNode) -> None: '''Метод для добавления переменной в пространство имен.''' if variable.name in self._namespaces: raise VariableError("namespace with the name '{}' is already in" " the namespace '{}'".format( variable.name, self.get_fullname())) self._variables.update({variable.name: variable}) variable.namespace = self def add_namespace(self, namespace: "NamespaceNode") -> None: '''Метод для добавления пространства имен в пространство имен.''' if namespace._name in self._variables: raise VariableError("variable with the name '{}' is already in" " the namespace '{}'".format( namespace._name, self.get_fullname())) self._namespaces.update({namespace._name: namespace}) namespace._parent = self def clear(self) -> None: '''Метод для очистки пространства имен. Очищает и пространства имен и переменные. Предназначен только для использования в calculate.ini.''' for namespace_name in self._namespaces.keys(): self._namespaces[namespace_name].clear() self._variables.clear() self._namespaces.clear() def get_fullname(self) -> str: '''Метод для получения полного имени пространства имен.''' if self._parent is not None and self._parent._name != '': return '{}.{}'.format(self._parent.get_fullname(), self._name) else: return self._name def get_package_name(self) -> str: if self._parent._name == '': return self._name else: return self._parent.get_package_name() def __getattr__(self, name: str) -> Any: '''Метод возвращает ноду пространства имен или значение переменной.''' if name in self._namespaces: return self._namespaces[name] elif name in self._variables: variable = self._variables[name] if variable.variable_type is TableType: return variable.get_value() return variable.get_value() else: if self.get_package_name() == "custom": return None raise VariableNotFoundError("'{variable_name}' is not found in the" " namespace '{namespace_name}'".format( variable_name=name, namespace_name=self._name)) def __getitem__(self, name: str) -> None: '''Метод возвращает ноду пространства имен или ноду переменной.''' if name in self._namespaces: return self._namespaces[name] elif name in self._variables: return self._variables[name] else: if self.get_package_name() == "custom": return None raise VariableNotFoundError("'{variable_name}' is not found in the" " namespace '{namespace_name}'".format( variable_name=name, namespace_name=self._name)) def __contains__(self, name: str) -> bool: return name in self._namespaces or name in self._variables def __repr__(self) -> str: return ''.format(self.get_fullname()) def __deepcopy__(self, memo: dict) -> "NamespaceNode": '''Пространство имен не копируется даже при глубоком копировании.''' return self class DependenceAPI(metaclass=Singleton): '''Класс образующий интерфейс для создания зависимостей.''' def __init__(self): self.current_namespace: NamespaceNode = None self.datavars_root = None def __call__(self, *variables: list, depend: Union[Callable, None] = None) -> DependenceSource: subscriptions = list() for variable in variables: if not (isinstance(variable, str) or isinstance(variable, VariableNode) or isinstance(variable, Static)): variable = Static(variable) subscriptions.append(variable) return DependenceSource(subscriptions, depend=depend) def _get_variable(self, variable_name: str, current_namespace: Union[NamespaceNode, None] = None ) -> VariableNode: '''Метод для поиска переменной в пространствах имен.''' if current_namespace is None: current_namespace = self.current_namespace return self.find_variable(variable_name, self.datavars_root, current_namespace=current_namespace) @staticmethod def find_variable(variable_name: str, datavars_root, current_namespace: Union[NamespaceNode, None] = None ) -> VariableNode: '''Метод для поиска переменных по строковому пути от корня переменных или от текущего пространства имен.''' name_parts = variable_name.split('.') if not name_parts[0]: if current_namespace is not None: namespace = current_namespace for index in range(1, len(name_parts)): if not name_parts[index]: namespace = namespace._parent else: name_parts = name_parts[index:] break else: raise VariableNotFoundError( f"variable '{variable_name}' is not found.") else: namespace = datavars_root search_result = namespace for part in name_parts: search_result = search_result[part] return search_result class CopyAPI(metaclass=Singleton): '''Класс для создания зависимостей представляющих собой простое копирование значения переменной в зависимую переменную.''' def __call__(self, variable: Union[str, VariableNode]) -> "Dependence": return Dependence(variable) class FormatAPI(metaclass=Singleton): '''Класс для создания зависимостей представляющих собой форматируемую строку с указанием переменных в тех местах, где их значения должны быть подставлены в строку.''' pattern = re.compile( r'{\s*([a-zA-Z][a-zA-Z_0-9]+)?(.[a-zA-Z][a-zA-Z_0-9]+)+\s*}') def __call__(self, string: str) -> "Dependence": vars_list = [] def subfunc(matchobj: re.Match): vars_list.append(matchobj.group(0)[1:-1].strip()) return '{}' format_string = self.pattern.sub(subfunc, string) def depend_function(*args): values = [arg.value for arg in args] return format_string.format(*values) return Dependence(*vars_list, depend=depend_function) class CalculateAPI(metaclass=Singleton): '''Метод для создания зависимостей, представляющих собой функцию для вычисления значения зависимой переменной на основе значений указанных переменных.''' def __call__(self, *args) -> "Dependence": depend_function = args[0] return Dependence(*args[1:], depend=depend_function) class VariableAPI(metaclass=Singleton): '''Класс для создания переменных при задании их через python-скрипты.''' def __init__(self): self.current_namespace: Union[NamespaceNode, None] = None # TODO Продумать другой способ обработки ошибок. self.errors: list = [] def __call__(self, name: str, source: Any = None, type=VariableType, readonly: bool = False, fixed: bool = False, force: bool = False, fields: Union[list, None] = None) -> "Dependence": '''Метод для создания переменных внутри with Namespace('name').''' if name not in self.current_namespace._variables: variable = VariableNode(name, self.current_namespace) else: variable = self.current_namespace[name] if isinstance(source, DependenceSource): try: source.check() variable.source = source except DependenceError as error: raise VariableError('Dependence error: {} in variable: {}'. format(str(error), variable.get_fullname())) else: variable.source = source if fields: variable.fields = fields if readonly: variable.set_variable_type(type, readonly=True) elif fixed: variable.set_variable_type(type, fixed=True) else: variable.set_variable_type(type) return variable class NamespaceAPI(metaclass=Singleton): '''Класс для создания пространств имен при задании переменных через python-скрипты.''' def __init__(self, var_fabric: VariableAPI, dependence_fabric: DependenceAPI, datavars_root=NamespaceNode('')): self._datavars = datavars_root self.current_namespace = self._datavars # Привязываем фабрику переменных. self._variables_fabric = var_fabric self._variables_fabric.current_namespace = self._datavars # Привязываем фабрику зависимостей. self._dependence_fabric = dependence_fabric self._dependence_fabric.current_namespace = self._datavars self._dependence_fabric.datavars_root = self._datavars @property def datavars(self) -> NamespaceNode: '''Метод для получения корневого пространства имен, через которое далее можно получить доступ к переменным.''' return self._datavars def set_datavars(self, datavars) -> None: '''Метод для установки корневого пространства имен, которое пока что будет использоваться для предоставления доступа к переменным.''' self._datavars = datavars self._variables_fabric.current_namespace = self._datavars def reset(self) -> None: '''Метод для сброса корневого пространства имен.''' if isinstance(self._datavars, NamespaceNode): self._datavars = NamespaceNode('') else: self._datavars.reset() self.current_namespace = self._datavars self._variables_fabric.current_namespace = self._datavars self._dependence_fabric.current_namespace = self._datavars def set_current_namespace(self, namespace: NamespaceNode) -> None: '''Метод для установки текущего пространства имен, в которое будут добавляться далее переменные и пространства имен.''' self.current_namespace = namespace self._variables_fabric.current_namespace = namespace self._dependence_fabric.current_namespace = namespace @contextmanager def __call__(self, namespace_name: str ) -> Generator["NamespaceAPI", None, None]: '''Метод для создания пространств имен с помощью with.''' if namespace_name not in self.current_namespace._namespaces: namespace = NamespaceNode(namespace_name, parent=self.current_namespace) else: namespace = self.current_namespace._namespaces[namespace_name] self.current_namespace.add_namespace(namespace) self.current_namespace = namespace # Устанавливаем текущее пространство имен фабрике переменных. self._variables_fabric.current_namespace = self.current_namespace # Устанавливаем текущее пространство имен фабрике зависимостей. self._dependence_fabric.current_namespace = namespace self._dependence_fabric.datavars_root = self._datavars try: yield self finally: self.current_namespace = self.current_namespace._parent self._variables_fabric.current_namespace = self.current_namespace self._dependence_fabric.current_namespace = self.current_namespace Dependence = DependenceAPI() Copy = CopyAPI() Format = FormatAPI() Calculate = CalculateAPI() Variable = VariableAPI() Namespace = NamespaceAPI(Variable, Dependence)