You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1097 lines
47 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
import re
# import ast
import dis
from contextlib import contextmanager
# from inspect import signature, getsource
from inspect import signature
# from types import FunctionType, LambdaType
from types import FunctionType
from calculate.utils.tools import Singleton
from typing import List, Any, Union, Generator, Callable, Optional
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue: List[str]):
self.queue: List[str] = queue
def __str__(self) -> str:
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
name: str = 'undefined'
python_type: type = None
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> Any:
return value
@classmethod
def readonly(cls, variable_object: "VariableNode") -> None:
variable_object.variable_type = cls
variable_object.readonly = True
class IniType(VariableType):
'''Класс, соответствующий типу переменных созданных в calculate.ini файлах.
'''
name: str = 'ini'
python_type: type = str
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> Any:
return value
class StringType(VariableType):
'''Класс, соответствующий типу переменных с строковым значением.'''
name: str = 'string'
python_type: type = str
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> str:
if isinstance(value, str):
return value
else:
try:
return str(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" string variable: {reason}".format(
value=value,
reason=str(error)))
class IntegerType(VariableType):
'''Класс, соответствующий типу переменных с целочисленным значением.'''
name: str = 'integer'
python_type: type = int
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> int:
if isinstance(value, int):
return value
else:
try:
return int(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" interger variable: {reason}".format(
value=value,
reason=str(error)))
class FloatType(VariableType):
'''Класс, соответствующий типу переменных с вещественным значением.'''
name: str = 'float'
python_type: type = float
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> float:
if isinstance(value, float):
return value
else:
try:
return float(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" float variable: {reason}".format(
value=value,
reason=str(error)))
class BooleanType(VariableType):
'''Класс, соответствующий типу переменных с булевым значением.'''
name: str = 'bool'
python_type: type = bool
true_values: set = {'True', 'true'}
false_values: set = {'False', 'false'}
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> bool:
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value in cls.true_values:
return True
if value in cls.false_values:
return False
try:
return bool(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" bool variable: {reason}".format(
value=value,
reason=str(error)))
class ListType(VariableType):
name = 'list'
python_type: type = list
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> list:
if isinstance(value, list):
return value
elif isinstance(value, str):
output_list = list()
values = value.split(',')
for value in values:
output_list.append(value.strip())
return output_list
try:
return list(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" list variable: {reason}".format(
value=value,
reason=str(error)))
class HashValue:
'''Класс значения хэша, передающий некоторые характеристики переменной
хозяина, что позволяет инвалидировать подписки переменной хозяина при любом
изменении хэша или инвалидировать весь хэш при изменении одной из
зависимостей.'''
def __init__(self, key: Any, value: Any,
master_variable: "VariableNode",
parent: "Hash"):
self.key: Any = key
self.value: Any = value
self.master_variable: "VariableNode" = master_variable
self.parent: "Hash" = parent
@property
def subscriptions(self) -> set:
return self.master_variable.subscriptions
@property
def subscribers(self) -> set:
return self.master_variable.subscribers
def get_value(self) -> str:
'''Метод для получения значения хэша. Перед возвращением значения
обновляет себя на наиболее актуальную версию значения хэша.'''
self = self.master_variable.get_value()[self.key]
return self.value
def set(self, value: Any) -> None:
'''Метод для задания одного значения хэша без изменения источника
значения.'''
current_hash = self.master_variable.get_value().get_hash()
current_hash[self.key] = value
self.master_variable.set(current_hash)
def reset(self):
'''Метод для сброса значения, установленного поверх источника.'''
self.master_variable.reset()
class Hash:
'''Класс реализующий контейнер для хранения хэша в переменной
соответствующего типа.'''
def __init__(self, values: dict,
master_variable: "VariableNode"):
self.fixed: bool = master_variable.fixed
self._values: dict = dict()
self._fields: set = set()
for key, value in values.items():
self._values.update({key: HashValue(key, value, master_variable,
self)})
if self.fixed:
self._fields.add(key)
self.master_variable: "VariableNode" = master_variable
self.row_index: Union[int, None] = None
def get_hash(self) -> dict:
'''Метод для получения словаря из значений хэша.'''
dict_value = {}
for key in self._values.keys():
dict_value.update({key: self._values[key].get_value()})
return dict_value
def update_hash(self, values: dict) -> None:
'''Метод для обновления значения хэша.'''
for key, value in values.items():
if key in self._values:
self._values[key].value = value
elif self.fixed:
raise VariableError("key '{}' is unavailable for fixed"
" hash, available keys: '{}'.".
format(key, ', '.join(self._fields)))
else:
self._values[key] = HashValue(key, value, self.master_variable,
self)
return self
def __getattr__(self, key: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if key in self._values:
return self._values[key].get_value()
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __getitem__(self, key: str) -> HashValue:
if key in self._values:
return self._values[key]
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __iter__(self) -> Generator[Any, None, None]:
for key in self._values.keys():
yield key
def __contains__(self, key: str) -> bool:
return key in self._values
def __str__(self):
return str(self.get_hash())
class HashType(VariableType):
'''Класс, соответствующий типу переменных хэшей.'''
name: str = 'hash'
python_type: type = dict
@classmethod
def process_value(cls, values: Any, variable: "VariableNode") -> Hash:
if not isinstance(values, dict):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(values)))
if variable.value is not None:
updated_hash = variable.value.update_hash(values)
else:
updated_hash = Hash(values, variable)
return updated_hash
@classmethod
def fixed(cls, variable_object: "VariableNode") -> None:
'''Метод, который передается переменной вместо типа, если нужно задать
тип фиксированного хэша.'''
variable_object.variable_type = cls
variable_object.fixed = True
class Table:
'''Класс, соответствующий типу переменных таблиц.'''
def __init__(self, values: List[dict], master_variable: "VariableNode",
fields: Union[List[str], None] = None):
self._rows: List[dict] = list()
self.master_variable: "VariableNode" = master_variable
self.columns: set = set()
if fields is not None:
self.columns.update(fields)
else:
self.columns = set(values[0].keys())
for row in values:
if isinstance(row, dict):
self._check_columns(row)
self._rows.append(row)
else:
raise VariableTypeError("can not create table using value '{}'"
" with type '{}'".format(row,
type(row)))
def get_table(self) -> List[dict]:
'''Метод для получения всего списка строк таблицы.'''
return self._rows
def add_row(self, row: dict) -> None:
'''Метод для добавления строк в таблицу.'''
self._check_columns(row)
self._rows.append(row)
def change_row(self, row: dict, index: int) -> None:
'''Метод для замены существующей строки.'''
self._check_columns(row)
self._rows[index] = row
def clear(self) -> None:
self._rows.clear()
def _check_columns(self, row: dict) -> None:
'''Метод для проверки наличия в хэше только тех полей, которые
соответствуют заданным для таблицы колонкам.'''
for column in row:
if column not in self.columns:
raise VariableError("unknown column value '{}'"
" available: '{}'".format(
column,
', '.join(self.columns)))
def __getitem__(self, index: int) -> Hash:
if isinstance(index, int):
if index < len(self._rows):
return self._rows[index]
else:
return {key: None for key in self.columns}
raise VariableError("'{index}' index value is out of range"
.format(index=index))
else:
raise VariableError("Table value is not subscriptable")
def __iter__(self) -> Generator[dict, None, None]:
for row in self._rows:
yield row
def __contains__(self, key: str) -> bool:
if isinstance(key, str):
return key in self._values
return False
def __len__(self) -> int:
return len(self._rows)
class TableType(VariableType):
name: str = 'table'
# TODO Сомнительно.
python_type: type = list
@classmethod
def process_value(self, value: List[dict],
variable: "VariableNode") -> Table:
if not isinstance(value, list) and not isinstance(value, Table):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(value)))
else:
if isinstance(value, Table):
return value
if variable.fields:
return Table(value, variable, fields=variable.fields)
else:
return Table(value, variable)
class Static:
'''Класс для указания в качестве аргументов зависимостей статичных
значений, а не только переменных.'''
def __init__(self, value: Any):
self.value: Any = value
def get_value(self) -> Any:
return self.value
class VariableWrapper:
'''Класс обертки для переменных, с помощью которого отслеживается
применение переменной в образовании значения переменной от нее зависящей.
'''
def __init__(self, variable: "VariableNode",
subscriptions: set):
self._variable: "VariableNode" = variable
self._subscriptions: set = subscriptions
@property
def value(self):
'''Метод возвращающий значение переменной и при этом добавляющий его в
подписки.'''
if not isinstance(self._variable, Static):
self._subscriptions.add(self._variable)
value = self._variable.get_value()
if isinstance(value, Hash):
value = value.get_hash()
elif isinstance(value, Table):
value = value.get_table()
return value
@property
def subscriptions(self):
return self._subscriptions
@subscriptions.setter
def subscriptions(self, subscriptions):
self._subscriptions = subscriptions
class DependenceSource:
'''Класс зависимости как источника значения переменной.'''
def __init__(self, variables: tuple,
depend: Optional[Callable] = None):
self._args: Union[tuple, list] = variables
self.depend_function: Union[Callable, None] = depend
self._subscriptions: set = set()
self._args_founded: bool = False
def check(self) -> None:
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._args) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._args) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self) -> Any:
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
self._subscriptions = set()
args = tuple(VariableWrapper(arg, self._subscriptions)
for arg in self._args)
try:
if self.depend_function is None:
return args[-1].value
return self.depend_function(*args)
except CyclicVariableError:
raise
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
def _get_args(self, namespace: "NamespaceNode") -> None:
'''Метод для преобразования списка аргументов функции зависимости,
содержащего переменные и строки, в список аргументов состоящий только
из нод переменных и значений хэшей.'''
if not self._args_founded:
self._args = self.find_variables(self._args, namespace)
self._args_founded = True
def find_variables(self, variables: Union[tuple, list],
namespace: "NamespaceNode") -> List["VariableNode"]:
'''Метод для поиска переменных по заданным путям к ним.'''
output = []
for variable in variables:
if isinstance(variable, str):
variable = Dependence._get_variable(
variable,
current_namespace=namespace)
if variable is None:
raise DependenceError("variable '{}' not found".
format(variable))
output.append(variable)
return output
@property
def subscriptions(self) -> set:
return self._subscriptions
def _check_function(self, function_to_check: FunctionType) -> None:
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
# if not isinstance(function_to_check, LambdaType):
# # Если функция не лямбда, проверяем есть ли у нее возвращаемое
# # значение.
# source_code = getsource(function_to_check)
# for node in ast.walk(ast.parse(source_code)):
# if isinstance(node, ast.Return):
# break
# else:
# raise VariableError("the depend function does not return"
# " anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
self.check_signature(function_to_check, self._args)
@staticmethod
def check_signature(function_to_check: Callable,
arguments: Union[tuple, list]) -> None:
'''Метод для проверки соответствия сигнатуры функции и заданного для
нее набора аргументов.'''
function_signature = signature(function_to_check)
if not len(arguments) == len(function_signature.parameters):
raise DependenceError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(arguments)))
def __ne__(self, other: Any) -> bool:
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other: Any) -> bool:
if not isinstance(other, DependenceSource):
return False
# Сначала сравниваем аргументы.
for l_var, r_var in zip(self._args, other._args):
if l_var != r_var:
return False
if self.depend_function == other.depend_function:
return True
if (self.depend_function is None or other.depend_function is None
or not self._compare_depend_functions(self.depend_function,
other.depend_function)):
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name: str, namespace: "NamespaceNode",
variable_type: type = VariableType,
source: Any = None, fields: list = []):
self.name: str = name
if issubclass(variable_type, VariableType):
self.variable_type: type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating: bool = False
self.fields: list = fields
self.namespace: "NamespaceNode" = namespace
self.namespace.add_variable(self)
self.subscribers: set = set()
# Список текущих подписок, для проверки их актуальности при
# динамическом связывании.
self._subscriptions: set = set()
self.value: Any = None
self._invalidated: bool = True
# Флаг имеющий значение только для переменных типа HashType.
# Предназначен для включения проверки соответствия полей хэша при
# установке значения.
self._fixed: bool = False
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source: Any = source
if source is not None:
self.update_value()
# Флаг, указывающий, что значение было изменено в процессе работы
# утилит или с помощью тега set из шаблона.
self.set_by_user: bool = False
self._readonly: bool = False
def update_value(self) -> None:
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.get_fullname()))
if isinstance(self._source, DependenceSource):
self._source._get_args(self.namespace)
with self._start_calculate():
try:
value = self._source.calculate_value()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in self._source.subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in self._source.subscriptions:
subscription.subscribers.add(self)
self._subscriptions = self._source.subscriptions
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
except DependenceError as error:
raise VariableError('{}: {}'.format(self.get_fullname(),
str(error)))
else:
value = self._source
if self.variable_type is TableType:
self.value = self.variable_type.process_value(value, self)
else:
self.value = self.variable_type.process_value(value, self)
self._invalidated = False
def set_variable_type(self, variable_type: VariableType,
readonly: Union[bool, None] = None,
fixed: Union[bool, None] = None) -> None:
'''Метод для установки типа переменной.'''
if readonly is not None and isinstance(readonly, bool):
self._readonly = readonly
elif fixed is not None and isinstance(fixed, bool):
self._fixed = fixed
if self.variable_type is VariableType:
if isinstance(variable_type, type):
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableError('variable type object must be'
' VariableType or its class method,'
' not {}'.format(type(variable_type)))
elif callable(variable_type):
variable_type(self)
def set(self, value: Any) -> None:
'''Метод для установки временного пользовательского значения
переменной.'''
# Сбрасываем флаги, чтобы провести инвалидацию.
self._invalidated = False
self.set_by_user = False
self.value = self.variable_type.process_value(value, self)
self._invalidate(set_by_user=True)
def reset(self) -> None:
'''Метод для сброса пользовательского значения.'''
if self.set_by_user:
self._invalidated = False
self.set_by_user = False
self._invalidate()
@property
def source(self) -> None:
return self._source
@source.setter
def source(self, source: Any) -> None:
# Если источники не совпадают или текущее значение переменной было
# установлено пользователем, то инвалидируем переменную и меняем
# источник.
if self._source != source or self.set_by_user:
self.set_by_user = False
self._invalidate()
self._source = source
@property
def readonly(self) -> bool:
return self._readonly
@readonly.setter
def readonly(self, value: bool) -> None:
# if self.value is None and self._source is not None:
# # TODO выводить предупреждение если переменная инвалидирована,
# # нет источника и при этом устанавливается флаг readonly.
# self.update_value()
self._readonly = value
@property
def fixed(self) -> bool:
return self._fixed
@fixed.setter
def fixed(self, value: bool) -> bool:
self._fixed = value
def _invalidate(self, set_by_user: bool = False) -> None:
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
if not self._invalidated and not self.set_by_user:
self._invalidated = True
self.set_by_user = set_by_user
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self) -> Generator["VariableNode", None, None]:
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета. В данном случае необходима только для
того, чтобы при получении значения параметра внутри метода для расчета
не иницировалось ее обновление, которое может привести к рекурсии.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self) -> Any:
'''Метод для получения значения переменной.'''
if self._invalidated and self._source is None and self._value is None:
return None
if self._invalidated and not self.set_by_user:
self.update_value()
return self.value
def get_fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.get_fullname(), self.name)
else:
return self.name
def __getitem__(self, key: str) -> HashValue:
if self.variable_type is HashType:
if key in self.get_value():
return self.value[key]
else:
raise VariableNotFoundError("value '{}' is not found in hash"
" variable '{}'".format(
key, self.get_fullname()))
else:
raise VariableError("'{}' variable type is not subscriptable.".
format(self.variable_type.name))
def __repr__(self) -> str:
return '<Variable: {} with value: {}>'.format(self.get_fullname(),
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name: str = '',
parent: Optional["NamespaceNode"] = None):
self._name = name
self._variables = dict()
self._namespaces = dict()
self._parent = parent
def add_variable(self, variable: VariableNode) -> None:
'''Метод для добавления переменной в пространство имен.'''
if variable.name in self._namespaces:
raise VariableError("namespace with the name '{}' is already in"
" the namespace '{}'".format(
variable.name,
self.get_fullname()))
self._variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace: "NamespaceNode") -> None:
'''Метод для добавления пространства имен в пространство имен.'''
if namespace._name in self._variables:
raise VariableError("variable with the name '{}' is already in"
" the namespace '{}'".format(
namespace._name,
self.get_fullname()))
self._namespaces.update({namespace._name: namespace})
namespace._parent = self
def clear(self) -> None:
'''Метод для очистки пространства имен. Очищает и пространства имен
и переменные. Предназначен только для использования в calculate.ini.'''
for namespace_name in self._namespaces.keys():
self._namespaces[namespace_name].clear()
self._variables.clear()
self._namespaces.clear()
def get_fullname(self) -> str:
'''Метод для получения полного имени пространства имен.'''
if self._parent is not None and self._parent._name != '<root>':
return '{}.{}'.format(self._parent.get_fullname(), self._name)
else:
return self._name
def get_package_name(self) -> str:
if self._parent._name == '<root>':
return self._name
else:
return self._parent.get_package_name()
def __getattr__(self, name: str) -> Any:
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self._namespaces:
return self._namespaces[name]
elif name in self._variables:
variable = self._variables[name]
if variable.variable_type is TableType:
return variable.get_value()
return variable.get_value()
else:
if self.get_package_name() == "custom":
return None
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self._name))
def __getitem__(self, name: str) -> None:
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self._namespaces:
return self._namespaces[name]
elif name in self._variables:
return self._variables[name]
else:
if self.get_package_name() == "custom":
return None
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self._name))
def __contains__(self, name: str) -> bool:
return name in self._namespaces or name in self._variables
def __repr__(self) -> str:
return '<Namespace: {}>'.format(self.get_fullname())
def __deepcopy__(self, memo: dict) -> "NamespaceNode":
'''Пространство имен не копируется даже при глубоком копировании.'''
return self
class DependenceAPI(metaclass=Singleton):
'''Класс образующий интерфейс для создания зависимостей.'''
def __init__(self):
self.current_namespace: NamespaceNode = None
self.datavars_root = None
def __call__(self, *variables: list,
depend: Union[Callable, None] = None) -> DependenceSource:
subscriptions = list()
for variable in variables:
if not (isinstance(variable, str) or
isinstance(variable, VariableNode) or
isinstance(variable, Static)):
variable = Static(variable)
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _get_variable(self, variable_name: str,
current_namespace: Union[NamespaceNode, None] = None
) -> VariableNode:
'''Метод для поиска переменной в пространствах имен.'''
if current_namespace is None:
current_namespace = self.current_namespace
return self.find_variable(variable_name, self.datavars_root,
current_namespace=current_namespace)
@staticmethod
def find_variable(variable_name: str,
datavars_root,
current_namespace: Union[NamespaceNode, None] = None
) -> VariableNode:
'''Метод для поиска переменных по строковому пути от корня переменных
или от текущего пространства имен.'''
name_parts = variable_name.split('.')
if not name_parts[0]:
if current_namespace is not None:
namespace = current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace._parent
else:
name_parts = name_parts[index:]
break
else:
raise VariableNotFoundError(
f"variable '{variable_name}' is not found.")
else:
namespace = datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class CopyAPI(metaclass=Singleton):
'''Класс для создания зависимостей представляющих собой простое копирование
значения переменной в зависимую переменную.'''
def __call__(self, variable: Union[str, VariableNode]) -> "Dependence":
return Dependence(variable)
class FormatAPI(metaclass=Singleton):
'''Класс для создания зависимостей представляющих собой форматируемую
строку с указанием переменных в тех местах, где их значения должны быть
подставлены в строку.'''
pattern = re.compile(
r'{\s*([a-zA-Z][a-zA-Z_0-9]+)?(.[a-zA-Z][a-zA-Z_0-9]+)+\s*}')
def __call__(self, string: str) -> "Dependence":
vars_list = []
def subfunc(matchobj: re.Match):
vars_list.append(matchobj.group(0)[1:-1].strip())
return '{}'
format_string = self.pattern.sub(subfunc, string)
def depend_function(*args):
values = [arg.value for arg in args]
return format_string.format(*values)
return Dependence(*vars_list, depend=depend_function)
class CalculateAPI(metaclass=Singleton):
'''Метод для создания зависимостей, представляющих собой функцию для
вычисления значения зависимой переменной на основе значений указанных
переменных.'''
def __call__(self, *args) -> "Dependence":
depend_function = args[0]
return Dependence(*args[1:], depend=depend_function)
class VariableAPI(metaclass=Singleton):
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace: Union[NamespaceNode, None] = None
# TODO Продумать другой способ обработки ошибок.
self.errors: list = []
def __call__(self, name: str,
source: Any = None,
type=VariableType,
readonly: bool = False,
fixed: bool = False,
force: bool = False,
fields: Union[list, None] = None) -> "Dependence":
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace._variables:
variable = VariableNode(name, self.current_namespace)
else:
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error),
variable.get_fullname()))
else:
variable.source = source
if fields:
variable.fields = fields
if readonly:
variable.set_variable_type(type, readonly=True)
elif fixed:
variable.set_variable_type(type, fixed=True)
else:
variable.set_variable_type(type)
return variable
class NamespaceAPI(metaclass=Singleton):
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI,
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.current_namespace = self._datavars
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self) -> NamespaceNode:
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_datavars(self, datavars) -> None:
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars
self._variables_fabric.current_namespace = self._datavars
def reset(self) -> None:
'''Метод для сброса корневого пространства имен.'''
if isinstance(self._datavars, NamespaceNode):
self._datavars = NamespaceNode('<root>')
else:
self._datavars.reset()
self.current_namespace = self._datavars
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
def set_current_namespace(self, namespace: NamespaceNode) -> None:
'''Метод для установки текущего пространства имен, в которое будут
добавляться далее переменные и пространства имен.'''
self.current_namespace = namespace
self._variables_fabric.current_namespace = namespace
self._dependence_fabric.current_namespace = namespace
@contextmanager
def __call__(self, namespace_name: str
) -> Generator["NamespaceAPI", None, None]:
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.current_namespace._namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.current_namespace)
else:
namespace = self.current_namespace._namespaces[namespace_name]
self.current_namespace.add_namespace(namespace)
self.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = self.current_namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
try:
yield self
finally:
self.current_namespace = self.current_namespace._parent
self._variables_fabric.current_namespace = self.current_namespace
self._dependence_fabric.current_namespace = self.current_namespace
Dependence = DependenceAPI()
Copy = CopyAPI()
Format = FormatAPI()
Calculate = CalculateAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)