選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

1097 行
47 KiB

このファイルには曖昧(ambiguous)なUnicode文字が含まれています!

このファイルには曖昧(ambiguous)なUnicode文字が含まれており、あなたが使用しているロケールにおいて他の文字と混同する可能性があります。 あなたのユースケースが意図的かつ正当な場合はこの警告を無視して構いません。 それらの文字をハイライトするにはエスケープボタンを使用します。

# vim: fileencoding=utf-8
#
import re
# import ast
import dis
from contextlib import contextmanager
# from inspect import signature, getsource
from inspect import signature
# from types import FunctionType, LambdaType
from types import FunctionType
from calculate.utils.tools import Singleton
from typing import List, Any, Union, Generator, Callable, Optional
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue: List[str]):
self.queue: List[str] = queue
def __str__(self) -> str:
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
name: str = 'undefined'
python_type: type = None
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> Any:
return value
@classmethod
def readonly(cls, variable_object: "VariableNode") -> None:
variable_object.variable_type = cls
variable_object.readonly = True
class IniType(VariableType):
'''Класс, соответствующий типу переменных созданных в calculate.ini файлах.
'''
name: str = 'ini'
python_type: type = str
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> Any:
return value
class StringType(VariableType):
'''Класс, соответствующий типу переменных с строковым значением.'''
name: str = 'string'
python_type: type = str
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> str:
if isinstance(value, str):
return value
else:
try:
return str(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" string variable: {reason}".format(
value=value,
reason=str(error)))
class IntegerType(VariableType):
'''Класс, соответствующий типу переменных с целочисленным значением.'''
name: str = 'integer'
python_type: type = int
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> int:
if isinstance(value, int):
return value
else:
try:
return int(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" interger variable: {reason}".format(
value=value,
reason=str(error)))
class FloatType(VariableType):
'''Класс, соответствующий типу переменных с вещественным значением.'''
name: str = 'float'
python_type: type = float
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> float:
if isinstance(value, float):
return value
else:
try:
return float(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" float variable: {reason}".format(
value=value,
reason=str(error)))
class BooleanType(VariableType):
'''Класс, соответствующий типу переменных с булевым значением.'''
name: str = 'bool'
python_type: type = bool
true_values: set = {'True', 'true'}
false_values: set = {'False', 'false'}
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> bool:
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value in cls.true_values:
return True
if value in cls.false_values:
return False
try:
return bool(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" bool variable: {reason}".format(
value=value,
reason=str(error)))
class ListType(VariableType):
name = 'list'
python_type: type = list
@classmethod
def process_value(cls, value: Any, variable: "VariableNode") -> list:
if isinstance(value, list):
return value
elif isinstance(value, str):
output_list = list()
values = value.split(',')
for value in values:
output_list.append(value.strip())
return output_list
try:
return list(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" list variable: {reason}".format(
value=value,
reason=str(error)))
class HashValue:
'''Класс значения хэша, передающий некоторые характеристики переменной
хозяина, что позволяет инвалидировать подписки переменной хозяина при любом
изменении хэша или инвалидировать весь хэш при изменении одной из
зависимостей.'''
def __init__(self, key: Any, value: Any,
master_variable: "VariableNode",
parent: "Hash"):
self.key: Any = key
self.value: Any = value
self.master_variable: "VariableNode" = master_variable
self.parent: "Hash" = parent
@property
def subscriptions(self) -> set:
return self.master_variable.subscriptions
@property
def subscribers(self) -> set:
return self.master_variable.subscribers
def get_value(self) -> str:
'''Метод для получения значения хэша. Перед возвращением значения
обновляет себя на наиболее актуальную версию значения хэша.'''
self = self.master_variable.get_value()[self.key]
return self.value
def set(self, value: Any) -> None:
'''Метод для задания одного значения хэша без изменения источника
значения.'''
current_hash = self.master_variable.get_value().get_hash()
current_hash[self.key] = value
self.master_variable.set(current_hash)
def reset(self):
'''Метод для сброса значения, установленного поверх источника.'''
self.master_variable.reset()
class Hash:
'''Класс реализующий контейнер для хранения хэша в переменной
соответствующего типа.'''
def __init__(self, values: dict,
master_variable: "VariableNode"):
self.fixed: bool = master_variable.fixed
self._values: dict = dict()
self._fields: set = set()
for key, value in values.items():
self._values.update({key: HashValue(key, value, master_variable,
self)})
if self.fixed:
self._fields.add(key)
self.master_variable: "VariableNode" = master_variable
self.row_index: Union[int, None] = None
def get_hash(self) -> dict:
'''Метод для получения словаря из значений хэша.'''
dict_value = {}
for key in self._values.keys():
dict_value.update({key: self._values[key].get_value()})
return dict_value
def update_hash(self, values: dict) -> None:
'''Метод для обновления значения хэша.'''
for key, value in values.items():
if key in self._values:
self._values[key].value = value
elif self.fixed:
raise VariableError("key '{}' is unavailable for fixed"
" hash, available keys: '{}'.".
format(key, ', '.join(self._fields)))
else:
self._values[key] = HashValue(key, value, self.master_variable,
self)
return self
def __getattr__(self, key: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if key in self._values:
return self._values[key].get_value()
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __getitem__(self, key: str) -> HashValue:
if key in self._values:
return self._values[key]
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __iter__(self) -> Generator[Any, None, None]:
for key in self._values.keys():
yield key
def __contains__(self, key: str) -> bool:
return key in self._values
def __str__(self):
return str(self.get_hash())
class HashType(VariableType):
'''Класс, соответствующий типу переменных хэшей.'''
name: str = 'hash'
python_type: type = dict
@classmethod
def process_value(cls, values: Any, variable: "VariableNode") -> Hash:
if not isinstance(values, dict):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(values)))
if variable.value is not None:
updated_hash = variable.value.update_hash(values)
else:
updated_hash = Hash(values, variable)
return updated_hash
@classmethod
def fixed(cls, variable_object: "VariableNode") -> None:
'''Метод, который передается переменной вместо типа, если нужно задать
тип фиксированного хэша.'''
variable_object.variable_type = cls
variable_object.fixed = True
class Table:
'''Класс, соответствующий типу переменных таблиц.'''
def __init__(self, values: List[dict], master_variable: "VariableNode",
fields: Union[List[str], None] = None):
self._rows: List[dict] = list()
self.master_variable: "VariableNode" = master_variable
self.columns: set = set()
if fields is not None:
self.columns.update(fields)
else:
self.columns = set(values[0].keys())
for row in values:
if isinstance(row, dict):
self._check_columns(row)
self._rows.append(row)
else:
raise VariableTypeError("can not create table using value '{}'"
" with type '{}'".format(row,
type(row)))
def get_table(self) -> List[dict]:
'''Метод для получения всего списка строк таблицы.'''
return self._rows
def add_row(self, row: dict) -> None:
'''Метод для добавления строк в таблицу.'''
self._check_columns(row)
self._rows.append(row)
def change_row(self, row: dict, index: int) -> None:
'''Метод для замены существующей строки.'''
self._check_columns(row)
self._rows[index] = row
def clear(self) -> None:
self._rows.clear()
def _check_columns(self, row: dict) -> None:
'''Метод для проверки наличия в хэше только тех полей, которые
соответствуют заданным для таблицы колонкам.'''
for column in row:
if column not in self.columns:
raise VariableError("unknown column value '{}'"
" available: '{}'".format(
column,
', '.join(self.columns)))
def __getitem__(self, index: int) -> Hash:
if isinstance(index, int):
if index < len(self._rows):
return self._rows[index]
else:
return {key: None for key in self.columns}
raise VariableError("'{index}' index value is out of range"
.format(index=index))
else:
raise VariableError("Table value is not subscriptable")
def __iter__(self) -> Generator[dict, None, None]:
for row in self._rows:
yield row
def __contains__(self, key: str) -> bool:
if isinstance(key, str):
return key in self._values
return False
def __len__(self) -> int:
return len(self._rows)
class TableType(VariableType):
name: str = 'table'
# TODO Сомнительно.
python_type: type = list
@classmethod
def process_value(self, value: List[dict],
variable: "VariableNode") -> Table:
if not isinstance(value, list) and not isinstance(value, Table):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(value)))
else:
if isinstance(value, Table):
return value
if variable.fields:
return Table(value, variable, fields=variable.fields)
else:
return Table(value, variable)
class Static:
'''Класс для указания в качестве аргументов зависимостей статичных
значений, а не только переменных.'''
def __init__(self, value: Any):
self.value: Any = value
def get_value(self) -> Any:
return self.value
class VariableWrapper:
'''Класс обертки для переменных, с помощью которого отслеживается
применение переменной в образовании значения переменной от нее зависящей.
'''
def __init__(self, variable: "VariableNode",
subscriptions: set):
self._variable: "VariableNode" = variable
self._subscriptions: set = subscriptions
@property
def value(self):
'''Метод возвращающий значение переменной и при этом добавляющий его в
подписки.'''
if not isinstance(self._variable, Static):
self._subscriptions.add(self._variable)
value = self._variable.get_value()
if isinstance(value, Hash):
value = value.get_hash()
elif isinstance(value, Table):
value = value.get_table()
return value
@property
def subscriptions(self):
return self._subscriptions
@subscriptions.setter
def subscriptions(self, subscriptions):
self._subscriptions = subscriptions
class DependenceSource:
'''Класс зависимости как источника значения переменной.'''
def __init__(self, variables: tuple,
depend: Optional[Callable] = None):
self._args: Union[tuple, list] = variables
self.depend_function: Union[Callable, None] = depend
self._subscriptions: set = set()
self._args_founded: bool = False
def check(self) -> None:
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._args) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._args) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self) -> Any:
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
self._subscriptions = set()
args = tuple(VariableWrapper(arg, self._subscriptions)
for arg in self._args)
try:
if self.depend_function is None:
return args[-1].value
return self.depend_function(*args)
except CyclicVariableError:
raise
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
def _get_args(self, namespace: "NamespaceNode") -> None:
'''Метод для преобразования списка аргументов функции зависимости,
содержащего переменные и строки, в список аргументов состоящий только
из нод переменных и значений хэшей.'''
if not self._args_founded:
self._args = self.find_variables(self._args, namespace)
self._args_founded = True
def find_variables(self, variables: Union[tuple, list],
namespace: "NamespaceNode") -> List["VariableNode"]:
'''Метод для поиска переменных по заданным путям к ним.'''
output = []
for variable in variables:
if isinstance(variable, str):
variable = Dependence._get_variable(
variable,
current_namespace=namespace)
if variable is None:
raise DependenceError("variable '{}' not found".
format(variable))
output.append(variable)
return output
@property
def subscriptions(self) -> set:
return self._subscriptions
def _check_function(self, function_to_check: FunctionType) -> None:
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
# if not isinstance(function_to_check, LambdaType):
# # Если функция не лямбда, проверяем есть ли у нее возвращаемое
# # значение.
# source_code = getsource(function_to_check)
# for node in ast.walk(ast.parse(source_code)):
# if isinstance(node, ast.Return):
# break
# else:
# raise VariableError("the depend function does not return"
# " anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
self.check_signature(function_to_check, self._args)
@staticmethod
def check_signature(function_to_check: Callable,
arguments: Union[tuple, list]) -> None:
'''Метод для проверки соответствия сигнатуры функции и заданного для
нее набора аргументов.'''
function_signature = signature(function_to_check)
if not len(arguments) == len(function_signature.parameters):
raise DependenceError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(arguments)))
def __ne__(self, other: Any) -> bool:
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other: Any) -> bool:
if not isinstance(other, DependenceSource):
return False
# Сначала сравниваем аргументы.
for l_var, r_var in zip(self._args, other._args):
if l_var != r_var:
return False
if self.depend_function == other.depend_function:
return True
if (self.depend_function is None or other.depend_function is None
or not self._compare_depend_functions(self.depend_function,
other.depend_function)):
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name: str, namespace: "NamespaceNode",
variable_type: type = VariableType,
source: Any = None, fields: list = []):
self.name: str = name
if issubclass(variable_type, VariableType):
self.variable_type: type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating: bool = False
self.fields: list = fields
self.namespace: "NamespaceNode" = namespace
self.namespace.add_variable(self)
self.subscribers: set = set()
# Список текущих подписок, для проверки их актуальности при
# динамическом связывании.
self._subscriptions: set = set()
self.value: Any = None
self._invalidated: bool = True
# Флаг имеющий значение только для переменных типа HashType.
# Предназначен для включения проверки соответствия полей хэша при
# установке значения.
self._fixed: bool = False
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source: Any = source
if source is not None:
self.update_value()
# Флаг, указывающий, что значение было изменено в процессе работы
# утилит или с помощью тега set из шаблона.
self.set_by_user: bool = False
self._readonly: bool = False
def update_value(self) -> None:
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.get_fullname()))
if isinstance(self._source, DependenceSource):
self._source._get_args(self.namespace)
with self._start_calculate():
try:
value = self._source.calculate_value()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in self._source.subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in self._source.subscriptions:
subscription.subscribers.add(self)
self._subscriptions = self._source.subscriptions
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
except DependenceError as error:
raise VariableError('{}: {}'.format(self.get_fullname(),
str(error)))
else:
value = self._source
if self.variable_type is TableType:
self.value = self.variable_type.process_value(value, self)
else:
self.value = self.variable_type.process_value(value, self)
self._invalidated = False
def set_variable_type(self, variable_type: VariableType,
readonly: Union[bool, None] = None,
fixed: Union[bool, None] = None) -> None:
'''Метод для установки типа переменной.'''
if readonly is not None and isinstance(readonly, bool):
self._readonly = readonly
elif fixed is not None and isinstance(fixed, bool):
self._fixed = fixed
if self.variable_type is VariableType:
if isinstance(variable_type, type):
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableError('variable type object must be'
' VariableType or its class method,'
' not {}'.format(type(variable_type)))
elif callable(variable_type):
variable_type(self)
def set(self, value: Any) -> None:
'''Метод для установки временного пользовательского значения
переменной.'''
# Сбрасываем флаги, чтобы провести инвалидацию.
self._invalidated = False
self.set_by_user = False
self.value = self.variable_type.process_value(value, self)
self._invalidate(set_by_user=True)
def reset(self) -> None:
'''Метод для сброса пользовательского значения.'''
if self.set_by_user:
self._invalidated = False
self.set_by_user = False
self._invalidate()
@property
def source(self) -> None:
return self._source
@source.setter
def source(self, source: Any) -> None:
# Если источники не совпадают или текущее значение переменной было
# установлено пользователем, то инвалидируем переменную и меняем
# источник.
if self._source != source or self.set_by_user:
self.set_by_user = False
self._invalidate()
self._source = source
@property
def readonly(self) -> bool:
return self._readonly
@readonly.setter
def readonly(self, value: bool) -> None:
# if self.value is None and self._source is not None:
# # TODO выводить предупреждение если переменная инвалидирована,
# # нет источника и при этом устанавливается флаг readonly.
# self.update_value()
self._readonly = value
@property
def fixed(self) -> bool:
return self._fixed
@fixed.setter
def fixed(self, value: bool) -> bool:
self._fixed = value
def _invalidate(self, set_by_user: bool = False) -> None:
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
if not self._invalidated and not self.set_by_user:
self._invalidated = True
self.set_by_user = set_by_user
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self) -> Generator["VariableNode", None, None]:
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета. В данном случае необходима только для
того, чтобы при получении значения параметра внутри метода для расчета
не иницировалось ее обновление, которое может привести к рекурсии.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self) -> Any:
'''Метод для получения значения переменной.'''
if self._invalidated and self._source is None and self._value is None:
return None
if self._invalidated and not self.set_by_user:
self.update_value()
return self.value
def get_fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.get_fullname(), self.name)
else:
return self.name
def __getitem__(self, key: str) -> HashValue:
if self.variable_type is HashType:
if key in self.get_value():
return self.value[key]
else:
raise VariableNotFoundError("value '{}' is not found in hash"
" variable '{}'".format(
key, self.get_fullname()))
else:
raise VariableError("'{}' variable type is not subscriptable.".
format(self.variable_type.name))
def __repr__(self) -> str:
return '<Variable: {} with value: {}>'.format(self.get_fullname(),
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name: str = '',
parent: Optional["NamespaceNode"] = None):
self._name = name
self._variables = dict()
self._namespaces = dict()
self._parent = parent
def add_variable(self, variable: VariableNode) -> None:
'''Метод для добавления переменной в пространство имен.'''
if variable.name in self._namespaces:
raise VariableError("namespace with the name '{}' is already in"
" the namespace '{}'".format(
variable.name,
self.get_fullname()))
self._variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace: "NamespaceNode") -> None:
'''Метод для добавления пространства имен в пространство имен.'''
if namespace._name in self._variables:
raise VariableError("variable with the name '{}' is already in"
" the namespace '{}'".format(
namespace._name,
self.get_fullname()))
self._namespaces.update({namespace._name: namespace})
namespace._parent = self
def clear(self) -> None:
'''Метод для очистки пространства имен. Очищает и пространства имен
и переменные. Предназначен только для использования в calculate.ini.'''
for namespace_name in self._namespaces.keys():
self._namespaces[namespace_name].clear()
self._variables.clear()
self._namespaces.clear()
def get_fullname(self) -> str:
'''Метод для получения полного имени пространства имен.'''
if self._parent is not None and self._parent._name != '<root>':
return '{}.{}'.format(self._parent.get_fullname(), self._name)
else:
return self._name
def get_package_name(self) -> str:
if self._parent._name == '<root>':
return self._name
else:
return self._parent.get_package_name()
def __getattr__(self, name: str) -> Any:
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self._namespaces:
return self._namespaces[name]
elif name in self._variables:
variable = self._variables[name]
if variable.variable_type is TableType:
return variable.get_value()
return variable.get_value()
else:
if self.get_package_name() == "custom":
return None
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self._name))
def __getitem__(self, name: str) -> None:
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self._namespaces:
return self._namespaces[name]
elif name in self._variables:
return self._variables[name]
else:
if self.get_package_name() == "custom":
return None
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self._name))
def __contains__(self, name: str) -> bool:
return name in self._namespaces or name in self._variables
def __repr__(self) -> str:
return '<Namespace: {}>'.format(self.get_fullname())
def __deepcopy__(self, memo: dict) -> "NamespaceNode":
'''Пространство имен не копируется даже при глубоком копировании.'''
return self
class DependenceAPI(metaclass=Singleton):
'''Класс образующий интерфейс для создания зависимостей.'''
def __init__(self):
self.current_namespace: NamespaceNode = None
self.datavars_root = None
def __call__(self, *variables: list,
depend: Union[Callable, None] = None) -> DependenceSource:
subscriptions = list()
for variable in variables:
if not (isinstance(variable, str) or
isinstance(variable, VariableNode) or
isinstance(variable, Static)):
variable = Static(variable)
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _get_variable(self, variable_name: str,
current_namespace: Union[NamespaceNode, None] = None
) -> VariableNode:
'''Метод для поиска переменной в пространствах имен.'''
if current_namespace is None:
current_namespace = self.current_namespace
return self.find_variable(variable_name, self.datavars_root,
current_namespace=current_namespace)
@staticmethod
def find_variable(variable_name: str,
datavars_root,
current_namespace: Union[NamespaceNode, None] = None
) -> VariableNode:
'''Метод для поиска переменных по строковому пути от корня переменных
или от текущего пространства имен.'''
name_parts = variable_name.split('.')
if not name_parts[0]:
if current_namespace is not None:
namespace = current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace._parent
else:
name_parts = name_parts[index:]
break
else:
raise VariableNotFoundError(
f"variable '{variable_name}' is not found.")
else:
namespace = datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class CopyAPI(metaclass=Singleton):
'''Класс для создания зависимостей представляющих собой простое копирование
значения переменной в зависимую переменную.'''
def __call__(self, variable: Union[str, VariableNode]) -> "Dependence":
return Dependence(variable)
class FormatAPI(metaclass=Singleton):
'''Класс для создания зависимостей представляющих собой форматируемую
строку с указанием переменных в тех местах, где их значения должны быть
подставлены в строку.'''
pattern = re.compile(
r'{\s*([a-zA-Z][a-zA-Z_0-9]+)?(.[a-zA-Z][a-zA-Z_0-9]+)+\s*}')
def __call__(self, string: str) -> "Dependence":
vars_list = []
def subfunc(matchobj: re.Match):
vars_list.append(matchobj.group(0)[1:-1].strip())
return '{}'
format_string = self.pattern.sub(subfunc, string)
def depend_function(*args):
values = [arg.value for arg in args]
return format_string.format(*values)
return Dependence(*vars_list, depend=depend_function)
class CalculateAPI(metaclass=Singleton):
'''Метод для создания зависимостей, представляющих собой функцию для
вычисления значения зависимой переменной на основе значений указанных
переменных.'''
def __call__(self, *args) -> "Dependence":
depend_function = args[0]
return Dependence(*args[1:], depend=depend_function)
class VariableAPI(metaclass=Singleton):
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace: Union[NamespaceNode, None] = None
# TODO Продумать другой способ обработки ошибок.
self.errors: list = []
def __call__(self, name: str,
source: Any = None,
type=VariableType,
readonly: bool = False,
fixed: bool = False,
force: bool = False,
fields: Union[list, None] = None) -> "Dependence":
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace._variables:
variable = VariableNode(name, self.current_namespace)
else:
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error),
variable.get_fullname()))
else:
variable.source = source
if fields:
variable.fields = fields
if readonly:
variable.set_variable_type(type, readonly=True)
elif fixed:
variable.set_variable_type(type, fixed=True)
else:
variable.set_variable_type(type)
return variable
class NamespaceAPI(metaclass=Singleton):
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI,
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.current_namespace = self._datavars
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self) -> NamespaceNode:
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_datavars(self, datavars) -> None:
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars
self._variables_fabric.current_namespace = self._datavars
def reset(self) -> None:
'''Метод для сброса корневого пространства имен.'''
if isinstance(self._datavars, NamespaceNode):
self._datavars = NamespaceNode('<root>')
else:
self._datavars.reset()
self.current_namespace = self._datavars
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
def set_current_namespace(self, namespace: NamespaceNode) -> None:
'''Метод для установки текущего пространства имен, в которое будут
добавляться далее переменные и пространства имен.'''
self.current_namespace = namespace
self._variables_fabric.current_namespace = namespace
self._dependence_fabric.current_namespace = namespace
@contextmanager
def __call__(self, namespace_name: str
) -> Generator["NamespaceAPI", None, None]:
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.current_namespace._namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.current_namespace)
else:
namespace = self.current_namespace._namespaces[namespace_name]
self.current_namespace.add_namespace(namespace)
self.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = self.current_namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
try:
yield self
finally:
self.current_namespace = self.current_namespace._parent
self._variables_fabric.current_namespace = self.current_namespace
self._dependence_fabric.current_namespace = self.current_namespace
Dependence = DependenceAPI()
Copy = CopyAPI()
Format = FormatAPI()
Calculate = CalculateAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)