# vim: fileencoding=utf-8 # import re import ast import dis from contextlib import contextmanager from inspect import signature, getsource from types import FunctionType, LambdaType from calculate.utils.tools import Singleton from typing import List, Any, Union, Generator, Callable class DependenceError(Exception): pass class VariableError(Exception): pass class VariableTypeError(VariableError): pass class VariableNotFoundError(VariableError): pass class CyclicVariableError(VariableError): def __init__(self, *queue: List[str]): self.queue: List[str] = queue def __str__(self) -> str: return "Cyclic dependence in variables: {}".format(", ".join( self.queue[:-1])) class VariableType: '''Базовый класс для типов.''' name: str = 'undefined' python_type: type = None @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> Any: return value @classmethod def readonly(cls, variable_object: "VariableNode") -> None: variable_object.variable_type = cls variable_object.readonly = True class IniType(VariableType): '''Класс, соответствующий типу переменных созданных в calculate.ini файлах. ''' name: str = 'ini' python_type: type = str @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> Any: return value class StringType(VariableType): '''Класс, соответствующий типу переменных с строковым значением.''' name: str = 'string' python_type: type = str @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> str: if isinstance(value, str): return value else: try: return str(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " string variable: {reason}".format( value=value, reason=str(error))) class IntegerType(VariableType): '''Класс, соответствующий типу переменных с целочисленным значением.''' name: str = 'integer' python_type: type = int @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> int: if isinstance(value, int): return value else: try: return int(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " interger variable: {reason}".format( value=value, reason=str(error))) class FloatType(VariableType): '''Класс, соответствующий типу переменных с вещественным значением.''' name: str = 'float' python_type: type = float @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> float: if isinstance(value, float): return value else: try: return float(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " float variable: {reason}".format( value=value, reason=str(error))) class BooleanType(VariableType): '''Класс, соответствующий типу переменных с булевым значением.''' name: str = 'bool' python_type: type = bool true_values: set = {'True', 'true'} false_values: set = {'False', 'false'} @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> bool: if isinstance(value, bool): return value elif isinstance(value, str): if value in cls.true_values: return True if value in cls.false_values: return False try: return bool(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " bool variable: {reason}".format( value=value, reason=str(error))) class ListType(VariableType): name = 'list' python_type: type = list @classmethod def process_value(cls, value: Any, variable: "VariableNode") -> list: if isinstance(value, list): return value elif isinstance(value, str): output_list = list() values = value.split(',') for value in values: output_list.append(value.strip()) return output_list try: return list(value) except Exception as error: raise VariableTypeError("can not set value '{value}' to" " list variable: {reason}".format( value=value, reason=str(error))) class HashValue: '''Класс значения хэша, передающий некоторые характеристики переменной хозяина, что позволяет инвалидировать подписки переменной хозяина при любом изменении хэша или инвалидировать весь хэш при изменении одной из зависимостей.''' def __init__(self, key: Any, value: Any, master_variable: "VariableNode", parent: "Hash"): self.key: Any = key self.value: Any = value self.master_variable: "VariableNode" = master_variable self.parent: "Hash" = parent @property def subscriptions(self) -> set: return self.master_variable.subscriptions @property def subscribers(self) -> set: return self.master_variable.subscribers def get_value(self) -> str: '''Метод для получения значения хэша. Перед возвращением значения обновляет себя на наиболее актуальную версию значения хэша.''' self = self.master_variable.get_value()[self.key] return self.value def set(self, value: Any) -> None: '''Метод для задания одного значения хэша без изменения источника значения.''' current_hash = self.master_variable.get_value().get_hash() current_hash[self.key] = value self.master_variable.set(current_hash) def reset(self): '''Метод для сброса значения, установленного поверх источника.''' self.master_variable.reset() class Hash: '''Класс реализующий контейнер для хранения хэша в переменной соответствующего типа.''' def __init__(self, values: dict, master_variable: "VariableNode"): self.fixed: bool = master_variable.fixed self._values: dict = dict() self._fields: set = set() for key, value in values.items(): self._values.update({key: HashValue(key, value, master_variable, self)}) if self.fixed: self._fields.add(key) self.master_variable: "VariableNode" = master_variable self.row_index: Union[int, None] = None def get_hash(self) -> dict: '''Метод для получения словаря из значений хэша.''' dict_value = {} for key in self._values.keys(): dict_value.update({key: self._values[key].get_value()}) return dict_value def update_hash(self, values: dict) -> None: '''Метод для обновления значения хэша.''' for key, value in values.items(): if key in self._values: self._values[key].value = value elif self.fixed: raise VariableError("key '{}' is unavailable for fixed" " hash, available keys: '{}'.". format(key, ', '.join(self._fields))) else: self._values[key] = HashValue(key, value, self.master_variable, self) return self def __getattr__(self, key: str): '''Метод возвращает ноду пространства имен или значение переменной.''' if key in self._values: return self._values[key].get_value() else: raise VariableError(("'{key}' is not found in the hash" " '{hash_name}'").format( key=key, hash_name=self.master_variable.get_fullname())) def __getitem__(self, key: str) -> HashValue: if key in self._values: return self._values[key] else: raise VariableError(("'{key}' is not found in the hash" " '{hash_name}'").format( key=key, hash_name=self.master_variable.get_fullname())) def __iter__(self) -> Generator[Any, None, None]: for key in self._values.keys(): yield key def __contains__(self, key: str) -> bool: return key in self._values def __str__(self): return str(self.get_hash()) class HashType(VariableType): '''Класс, соответствующий типу переменных хэшей.''' name: str = 'hash' python_type: type = dict @classmethod def process_value(cls, values: Any, variable: "VariableNode") -> Hash: if not isinstance(values, dict): raise VariableTypeError("can not set value with type '{_type}' to" " hash variable: value must be 'dict' type" .format(_type=type(values))) if variable.value is not None: updated_hash = variable.value.update_hash(values) else: updated_hash = Hash(values, variable) return updated_hash @classmethod def fixed(cls, variable_object: "VariableNode") -> None: '''Метод, который передается переменной вместо типа, если нужно задать тип фиксированного хэша.''' variable_object.variable_type = cls variable_object.fixed = True class Table: '''Класс, соответствующий типу переменных таблиц.''' def __init__(self, values: List[dict], master_variable: "VariableNode", fields: Union[List[str], None] = None): self._rows: List[dict] = list() self.master_variable: "VariableNode" = master_variable self.columns: set = set() if fields is not None: self.columns.update(fields) else: self.columns = set(values[0].keys()) for row in values: if isinstance(row, dict): self._check_columns(row) self._rows.append(row) else: raise VariableTypeError("can not create table using value '{}'" " with type '{}'".format(row, type(row))) def get_table(self) -> List[dict]: '''Метод для получения всего списка строк таблицы.''' return self._rows def add_row(self, row: dict) -> None: '''Метод для добавления строк в таблицу.''' self._check_columns(row) self._rows.append(row) def change_row(self, row: dict, index: int) -> None: '''Метод для замены существующей строки.''' self._check_columns(row) self._rows[index] = row def clear(self) -> None: self._rows.clear() def _check_columns(self, row: dict) -> None: '''Метод для проверки наличия в хэше только тех полей, которые соответствуют заданным для таблицы колонкам.''' for column in row: if column not in self.columns: raise VariableError("unknown column value '{}'" " available: '{}'".format( column, ', '.join(self.columns))) def __getitem__(self, index: int) -> Hash: if isinstance(index, int): if index < len(self._rows): return self._rows[index] else: return {key: None for key in self.columns} raise VariableError("'{index}' index value is out of range" .format(index=index)) else: raise VariableError("Table value is not subscriptable") def __iter__(self) -> Generator[dict, None, None]: for row in self._rows: yield row def __contains__(self, key: str) -> bool: if isinstance(key, str): return key in self._values return False def __len__(self) -> int: return len(self._rows) class TableType(VariableType): name: str = 'table' # TODO Сомнительно. python_type: type = list @classmethod def process_value(self, value: List[dict], variable: "VariableNode") -> Table: if not isinstance(value, list) and not isinstance(value, Table): raise VariableTypeError("can not set value with type '{_type}' to" " hash variable: value must be 'dict' type" .format(_type=type(value))) else: if isinstance(value, Table): return value if variable.fields: return Table(value, variable, fields=variable.fields) else: return Table(value, variable) class Static: '''Класс для указания в качестве аргументов зависимостей статичных значений, а не только переменных.''' def __init__(self, value: Any): self.value: Any = value def get_value(self) -> Any: return self.value class VariableWrapper: '''Класс обертки для переменных, с помощью которого отслеживается применение переменной в образовании значения переменной от нее зависящей. ''' def __init__(self, variable: "VariableNode", subscriptions: set): self._variable: "VariableNode" = variable self._subscriptions: set = subscriptions @property def value(self): '''Метод возвращающий значение переменной и при этом добавляющий его в подписки.''' if not isinstance(self._variable, Static): self._subscriptions.add(self._variable) value = self._variable.get_value() if isinstance(value, Hash): value = value.get_hash() elif isinstance(value, Table): value = value.get_table() return value @property def subscriptions(self): return self._subscriptions @subscriptions.setter def subscriptions(self, subscriptions): self._subscriptions = subscriptions class DependenceSource: '''Класс зависимости как источника значения переменной.''' def __init__(self, variables: tuple, depend: Union[Callable, None] = None): self._args: Union[tuple, list] = variables self.depend_function: Union[Callable, None] = depend self._subscriptions: set = set() self._args_founded: bool = False def check(self) -> None: '''Метод для запуска проверки корректности функции зависимости, а также сопоставления ее числу заданных зависимостей.''' if self.depend_function is None: if len(self._args) > 1: raise DependenceError('the depend function is needed if the' ' number of dependencies is more than' ' one') elif len(self._args) == 0: raise DependenceError('dependence is set without variables') else: self._check_function(self.depend_function) def calculate_value(self) -> Any: '''Метод для расчета значения переменной с использованием зависимостей и заданной для них функции.''' self._subscriptions = set() args = tuple(VariableWrapper(arg, self._subscriptions) for arg in self._args) try: if self.depend_function is None: return args[-1].value return self.depend_function(*args) except CyclicVariableError: raise except Exception as error: raise DependenceError('can not calculate using dependencies: {}' ' reason: {}'.format(', '.join( [subscription.get_fullname() for subscription in self._args]), str(error))) def _get_args(self, namespace: "NamespaceNode") -> None: '''Метод для преобразования списка аргументов функции зависимости, содержащего переменные и строки, в список аргументов состоящий только из нод переменных и значений хэшей.''' if not self._args_founded: self._args = self.find_variables(self._args, namespace) self._args_founded = True def find_variables(self, variables: Union[tuple, list], namespace: "NamespaceNode") -> List["VariableNode"]: '''Метод для поиска переменных по заданным путям к ним.''' output = [] for variable in variables: if isinstance(variable, str): variable = Dependence._get_variable( variable, current_namespace=namespace) if variable is None: raise DependenceError("variable '{}' not found". format(variable)) output.append(variable) return output @property def subscriptions(self) -> set: return self._subscriptions def _check_function(self, function_to_check: FunctionType) -> None: '''Метод для проверки того, возращает ли функция какое-либо значение, а также того, совпадает ли число подписок с числом аргументов этой функции.''' # if not isinstance(function_to_check, LambdaType): # # Если функция не лямбда, проверяем есть ли у нее возвращаемое # # значение. # source_code = getsource(function_to_check) # for node in ast.walk(ast.parse(source_code)): # if isinstance(node, ast.Return): # break # else: # raise VariableError("the depend function does not return" # " anything in variable") # Проверяем совпадение количества аргументов функции и заданных для # функции переменных. self.check_signature(function_to_check, self._args) @staticmethod def check_signature(function_to_check: Callable, arguments: Union[tuple, list]) -> None: '''Метод для проверки соответствия сигнатуры функции и заданного для нее набора аргументов.''' function_signature = signature(function_to_check) if not len(arguments) == len(function_signature.parameters): raise DependenceError("the depend function takes {} arguments," " while {} is given".format( len(function_signature.parameters), len(arguments))) def __ne__(self, other: Any) -> bool: if not isinstance(other, DependenceSource): return True return not self == other def __eq__(self, other: Any) -> bool: if not isinstance(other, DependenceSource): return False # Сначала сравниваем аргументы. for l_var, r_var in zip(self._args, other._args): if l_var != r_var: return False if self.depend_function == other.depend_function: return True if (self.depend_function is None or other.depend_function is None or not self._compare_depend_functions(self.depend_function, other.depend_function)): return False return True def _compare_depend_functions(self, l_func: FunctionType, r_func: FunctionType) -> bool: '''Метод для сравнения функций путем сравнения инструкций, полученных в результате их дизассемблирования.''' l_instructions = list(dis.get_instructions(l_func)) r_instructions = list(dis.get_instructions(r_func)) for l_instr, r_instr in zip(l_instructions, r_instructions): if l_instr.opname != r_instr.opname: return False if l_instr.arg != r_instr.arg: return False if ((l_instr.argval != l_instr.argrepr) and (r_instr.argval != r_instr.argrepr)): if r_instr.argval != l_instr.argval: return False return True class VariableNode: '''Класс ноды соответствующей переменной в дереве переменных.''' def __init__(self, name: str, namespace: "NamespaceNode", variable_type: type = VariableType, source: Any = None, fields: list = []): self.name: str = name if issubclass(variable_type, VariableType): self.variable_type: type = variable_type else: raise VariableTypeError('variable_type must be VariableType' ', but not {}'.format(type(variable_type))) self.calculating: bool = False self.fields: list = fields self.namespace: "NamespaceNode" = namespace self.namespace.add_variable(self) self.subscribers: set = set() # Список текущих подписок, для проверки их актуальности при # динамическом связывании. self._subscriptions: set = set() self.value: Any = None self._invalidated: bool = True # Флаг имеющий значение только для переменных типа HashType. # Предназначен для включения проверки соответствия полей хэша при # установке значения. self._fixed: bool = False # Источник значения переменной, может быть значением, а может быть # зависимостью. self._source: Any = source if source is not None: self.update_value() # Флаг, указывающий, что значение было изменено в процессе работы # утилит или с помощью тега set из шаблона. self.set_by_user: bool = False self._readonly: bool = False def update_value(self) -> None: '''Метод для обновления значения переменной с помощью указанного источника ее значения.''' if self.calculating: raise CyclicVariableError(self.name) if self._source is None: raise VariableError("No sources to update variable '{}'". format(self.get_fullname())) if isinstance(self._source, DependenceSource): self._source._get_args(self.namespace) with self._start_calculate(): try: value = self._source.calculate_value() # Обновляем подписки. Сначала убираем лишние. for subscription in self._subscriptions: if subscription not in self._source.subscriptions: subscription.subscribers.remove(self) # Теперь добавляем новые. for subscription in self._source.subscriptions: subscription.subscribers.add(self) self._subscriptions = self._source.subscriptions except CyclicVariableError as error: raise CyclicVariableError(self.name, *error.queue) except DependenceError as error: raise VariableError('{}: {}'.format(self.get_fullname(), str(error))) else: value = self._source if self.variable_type is TableType: self.value = self.variable_type.process_value(value, self) else: self.value = self.variable_type.process_value(value, self) self._invalidated = False def set_variable_type(self, variable_type: VariableType, readonly: Union[bool, None] = None, fixed: Union[bool, None] = None) -> None: '''Метод для установки типа переменной.''' if readonly is not None and isinstance(readonly, bool): self._readonly = readonly elif fixed is not None and isinstance(fixed, bool): self._fixed = fixed if self.variable_type is VariableType: if isinstance(variable_type, type): if issubclass(variable_type, VariableType): self.variable_type = variable_type else: raise VariableError('variable type object must be' ' VariableType or its class method,' ' not {}'.format(type(variable_type))) elif callable(variable_type): variable_type(self) def set(self, value: Any) -> None: '''Метод для установки временного пользовательского значения переменной.''' # Сбрасываем флаги, чтобы провести инвалидацию. self._invalidated = False self.set_by_user = False self.value = self.variable_type.process_value(value, self) self._invalidate(set_by_user=True) def reset(self) -> None: '''Метод для сброса пользовательского значения.''' if self.set_by_user: self._invalidated = False self.set_by_user = False self._invalidate() @property def source(self) -> None: return self._source @source.setter def source(self, source: Any) -> None: # Если источники не совпадают или текущее значение переменной было # установлено пользователем, то инвалидируем переменную и меняем # источник. if self._source != source or self.set_by_user: self.set_by_user = False self._invalidate() self._source = source @property def readonly(self) -> bool: return self._readonly @readonly.setter def readonly(self, value: bool) -> None: # if self.value is None and self._source is not None: # # TODO выводить предупреждение если переменная инвалидирована, # # нет источника и при этом устанавливается флаг readonly. # self.update_value() self._readonly = value @property def fixed(self) -> bool: return self._fixed @fixed.setter def fixed(self, value: bool) -> bool: self._fixed = value def _invalidate(self, set_by_user: bool = False) -> None: '''Метод для инвалидации данной переменной и всех зависящих от нее переменных.''' if not self._invalidated and not self.set_by_user: self._invalidated = True self.set_by_user = set_by_user for subscriber in self.subscribers: subscriber._invalidate() @contextmanager def _start_calculate(self): '''Менеджер контекста устанавливающий флаг, указывающий, что данная переменная в состоянии расчета. В данном случае необходима только для того, чтобы при получении значения параметра внутри метода для расчета не иницировалось ее обновление, которое может привести к рекурсии.''' try: self.calculating = True yield self finally: self.calculating = False def get_value(self) -> Any: '''Метод для получения значения переменной.''' if self._invalidated and self._source is None and self._value is None: return None if self._invalidated and not self.set_by_user: self.update_value() return self.value def get_fullname(self) -> str: if self.namespace: return "{}.{}".format(self.namespace.get_fullname(), self.name) else: return self.name def __getitem__(self, key: str) -> HashValue: if self.variable_type is HashType: if key in self.get_value(): return self.value[key] else: raise VariableNotFoundError("value '{}' is not found in hash" " variable '{}'".format( key, self.get_fullname())) else: raise VariableError("'{}' variable type is not subscriptable.". format(self.variable_type.name)) def __repr__(self) -> str: return ''.format(self.get_fullname(), self.value or 'INVALIDATED') class NamespaceNode: '''Класс ноды соответствующей пространству имен в дереве переменных.''' def __init__(self, name: str = '', parent: Union["NamespaceNode", None] = None): self._name = name self._variables = dict() self._namespaces = dict() self._parent = parent def add_variable(self, variable: VariableNode) -> None: '''Метод для добавления переменной в пространство имен.''' if variable.name in self._namespaces: raise VariableError("namespace with the name '{}' is already in" " the namespace '{}'".format( variable.name, self.get_fullname())) self._variables.update({variable.name: variable}) variable.namespace = self def add_namespace(self, namespace: "NamespaceNode") -> None: '''Метод для добавления пространства имен в пространство имен.''' if namespace._name in self._variables: raise VariableError("variable with the name '{}' is already in" " the namespace '{}'".format( namespace._name, self.get_fullname())) self._namespaces.update({namespace._name: namespace}) namespace._parent = self def clear(self) -> None: '''Метод для очистки пространства имен. Очищает и пространства имен и переменные. Предназначен только для использования в calculate.ini.''' for namespace_name in self._namespaces.keys(): self._namespaces[namespace_name].clear() self._variables.clear() self._namespaces.clear() def get_fullname(self) -> str: '''Метод для получения полного имени пространства имен.''' if self._parent is not None and self._parent._name != '': return '{}.{}'.format(self._parent.get_fullname(), self._name) else: return self._name def get_package_name(self) -> str: if self._parent._name == '': return self._name else: return self._parent.get_package_name() def __getattr__(self, name: str) -> Any: '''Метод возвращает ноду пространства имен или значение переменной.''' if name in self._namespaces: return self._namespaces[name] elif name in self._variables: variable = self._variables[name] if variable.variable_type is TableType: return variable.get_value() return variable.get_value() else: if self.get_package_name() == "custom": return None raise VariableNotFoundError("'{variable_name}' is not found in the" " namespace '{namespace_name}'".format( variable_name=name, namespace_name=self._name)) def __getitem__(self, name: str) -> None: '''Метод возвращает ноду пространства имен или ноду переменной.''' if name in self._namespaces: return self._namespaces[name] elif name in self._variables: return self._variables[name] else: if self.get_package_name() == "custom": return None raise VariableNotFoundError("'{variable_name}' is not found in the" " namespace '{namespace_name}'".format( variable_name=name, namespace_name=self._name)) def __contains__(self, name: str) -> bool: return name in self._namespaces or name in self._variables def __repr__(self) -> str: return ''.format(self.get_fullname()) def __deepcopy__(self, memo: dict) -> "NamespaceNode": '''Пространство имен не копируется даже при глубоком копировании.''' return self class DependenceAPI(metaclass=Singleton): '''Класс образующий интерфейс для создания зависимостей.''' def __init__(self): self.current_namespace: NamespaceNode = None self.datavars_root = None def __call__(self, *variables: list, depend: Union[Callable, None] = None) -> DependenceSource: subscriptions = list() for variable in variables: if not (isinstance(variable, str) or isinstance(variable, VariableNode) or isinstance(variable, Static)): variable = Static(variable) subscriptions.append(variable) return DependenceSource(subscriptions, depend=depend) def _get_variable(self, variable_name: str, current_namespace: Union[NamespaceNode, None] = None ) -> VariableNode: '''Метод для поиска переменной в пространствах имен.''' if current_namespace is None: current_namespace = self.current_namespace return self.find_variable(variable_name, self.datavars_root, current_namespace=current_namespace) @staticmethod def find_variable(variable_name: str, datavars_root, current_namespace: Union[NamespaceNode, None] = None ) -> VariableNode: '''Метод для поиска переменных по строковому пути от корня переменных или от текущего пространства имен.''' name_parts = variable_name.split('.') if not name_parts[0]: if current_namespace is not None: namespace = current_namespace for index in range(1, len(name_parts)): if not name_parts[index]: namespace = namespace._parent else: name_parts = name_parts[index:] break else: raise VariableNotFoundError( f"variable '{variable_name}' is not found.") else: namespace = datavars_root search_result = namespace for part in name_parts: search_result = search_result[part] return search_result class CopyAPI(metaclass=Singleton): '''Класс для создания зависимостей представляющих собой простое копирование значения переменной в зависимую переменную.''' def __call__(self, variable: Union[str, VariableNode]) -> "Dependence": return Dependence(variable) class FormatAPI(metaclass=Singleton): '''Класс для создания зависимостей представляющих собой форматируемую строку с указанием переменных в тех местах, где их значения должны быть подставлены в строку.''' pattern = re.compile( r'{\s*([a-zA-Z][a-zA-Z_0-9]+)?(.[a-zA-Z][a-zA-Z_0-9]+)+\s*}') def __call__(self, string: str) -> "Dependence": vars_list = [] def subfunc(matchobj): vars_list.append(matchobj.group(0)[1:-1].strip()) return '{}' format_string = self.pattern.sub(subfunc, string) def depend_function(*args): values = [arg.value for arg in args] return format_string.format(*values) return Dependence(*vars_list, depend=depend_function) class CalculateAPI(metaclass=Singleton): '''Метод для создания зависимостей, представляющих собой функцию для вычисления значения зависимой переменной на основе значений указанных переменных.''' def __call__(self, *args) -> "Dependence": depend_function = args[0] return Dependence(*args[1:], depend=depend_function) class VariableAPI(metaclass=Singleton): '''Класс для создания переменных при задании их через python-скрипты.''' def __init__(self): self.current_namespace: Union[NamespaceNode, None] = None # TODO Продумать другой способ обработки ошибок. self.errors: list = [] def __call__(self, name: str, source: Any = None, type=VariableType, readonly: bool = False, fixed: bool = False, force: bool = False, fields: Union[list, None] = None) -> "Dependence": '''Метод для создания переменных внутри with Namespace('name').''' if name not in self.current_namespace._variables: variable = VariableNode(name, self.current_namespace) else: variable = self.current_namespace[name] if isinstance(source, DependenceSource): try: source.check() variable.source = source except DependenceError as error: raise VariableError('Dependence error: {} in variable: {}'. format(str(error), variable.get_fullname())) else: variable.source = source if fields: variable.fields = fields if readonly: variable.set_variable_type(type, readonly=True) elif fixed: variable.set_variable_type(type, fixed=True) else: variable.set_variable_type(type) return variable class NamespaceAPI(metaclass=Singleton): '''Класс для создания пространств имен при задании переменных через python-скрипты.''' def __init__(self, var_fabric: VariableAPI, dependence_fabric: DependenceAPI(), datavars_root=NamespaceNode('')): self._datavars = datavars_root self.current_namespace = self._datavars # Привязываем фабрику переменных. self._variables_fabric = var_fabric self._variables_fabric.current_namespace = self._datavars # Привязываем фабрику зависимостей. self._dependence_fabric = dependence_fabric self._dependence_fabric.current_namespace = self._datavars self._dependence_fabric.datavars_root = self._datavars @property def datavars(self): '''Метод для получения корневого пространства имен, через которое далее можно получить доступ к переменным.''' return self._datavars def set_datavars(self, datavars) -> None: '''Метод для установки корневого пространства имен, которое пока что будет использоваться для предоставления доступа к переменным.''' self._datavars = datavars self._variables_fabric.current_namespace = self._datavars def reset(self) -> None: '''Метод для сброса корневого пространства имен.''' if isinstance(self._datavars, NamespaceNode): self._datavars = NamespaceNode('') else: self._datavars.reset() self.current_namespace = self._datavars self._variables_fabric.current_namespace = self._datavars self._dependence_fabric.current_namespace = self._datavars def set_current_namespace(self, namespace: NamespaceNode) -> None: '''Метод для установки текущего пространства имен, в которое будут добавляться далее переменные и пространства имен.''' self.current_namespace = namespace self._variables_fabric.current_namespace = namespace self._dependence_fabric.current_namespace = namespace @contextmanager def __call__(self, namespace_name: str): '''Метод для создания пространств имен с помощью with.''' if namespace_name not in self.current_namespace._namespaces: namespace = NamespaceNode(namespace_name, parent=self.current_namespace) else: namespace = self.current_namespace._namespaces[namespace_name] self.current_namespace.add_namespace(namespace) self.current_namespace = namespace # Устанавливаем текущее пространство имен фабрике переменных. self._variables_fabric.current_namespace = self.current_namespace # Устанавливаем текущее пространство имен фабрике зависимостей. self._dependence_fabric.current_namespace = namespace self._dependence_fabric.datavars_root = self._datavars try: yield self finally: self.current_namespace = self.current_namespace._parent self._variables_fabric.current_namespace = self.current_namespace self._dependence_fabric.current_namespace = self.current_namespace Dependence = DependenceAPI() Copy = CopyAPI() Format = FormatAPI() Calculate = CalculateAPI() Variable = VariableAPI() Namespace = NamespaceAPI(Variable, Dependence)