|
|
import re
|
|
|
import ast
|
|
|
import dis
|
|
|
from contextlib import contextmanager
|
|
|
from inspect import signature, getsource
|
|
|
from types import FunctionType, LambdaType
|
|
|
|
|
|
|
|
|
class VariableType:
|
|
|
'''Базовый класс для типов.'''
|
|
|
@classmethod
|
|
|
def check(self, value: str) -> bool:
|
|
|
return True
|
|
|
|
|
|
|
|
|
class DependenceError(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class VariableError(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class VariableTypeError(VariableError):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class VariableNotFoundError(VariableError):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class CyclicVariableError(VariableError):
|
|
|
def __init__(self, *queue):
|
|
|
self.queue = queue
|
|
|
|
|
|
def __str__(self):
|
|
|
return "Cyclic dependence in variables: {}".format(", ".join(
|
|
|
self.queue[:-1]))
|
|
|
|
|
|
|
|
|
class StringType(VariableType):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class IntegerType(VariableType):
|
|
|
integer_pattern = re.compile(r"^-?\d+$")
|
|
|
|
|
|
@classmethod
|
|
|
def check(self, value) -> bool:
|
|
|
if isinstance(value, int):
|
|
|
return True
|
|
|
return self.integer_pattern.match(value)
|
|
|
|
|
|
|
|
|
class FloatType(VariableType):
|
|
|
float_pattern = re.compile(r"^-?\d+(\.\d+)?$")
|
|
|
|
|
|
@classmethod
|
|
|
def check(self, value) -> bool:
|
|
|
if isinstance(value, float):
|
|
|
return True
|
|
|
return self.integer_pattern.match(value)
|
|
|
|
|
|
|
|
|
class BooleanType(VariableType):
|
|
|
bool_available = {'True', 'False'}
|
|
|
|
|
|
@classmethod
|
|
|
def check(self, value) -> bool:
|
|
|
return isinstance(value, bool) or value in self.bool_available
|
|
|
|
|
|
|
|
|
class ListType(VariableType):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class ReadonlyType(VariableType):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class DependenceSource:
|
|
|
current_namespace = None
|
|
|
datavars_root = None
|
|
|
|
|
|
def __init__(self, variables, depend=None):
|
|
|
self.error = None
|
|
|
self._subscriptions = variables
|
|
|
self.depend_function = depend
|
|
|
|
|
|
def check(self):
|
|
|
'''Метод для запуска проверки корректности функции зависимости, а также
|
|
|
сопоставления ее числу заданных зависимостей.'''
|
|
|
if self.depend_function is None:
|
|
|
if len(self._subscriptions) > 1:
|
|
|
raise DependenceError('the depend function is needed if the'
|
|
|
' number of dependencies is more than'
|
|
|
' one')
|
|
|
elif len(self._subscriptions) == 0:
|
|
|
raise DependenceError('dependence is set without variables')
|
|
|
else:
|
|
|
self._check_function(self.depend_function)
|
|
|
|
|
|
def calculate_value(self):
|
|
|
'''Метод для расчета значения переменной с использованием зависимостей
|
|
|
и заданной для них функции.'''
|
|
|
args_values = tuple(subscription.get_value() for subscription
|
|
|
in self._subscriptions)
|
|
|
print('args_values: {}'.format(args_values))
|
|
|
try:
|
|
|
if self.depend_function is None:
|
|
|
return args_values[-1]
|
|
|
return self.depend_function(*args_values)
|
|
|
except Exception as error:
|
|
|
raise DependenceError('can not calculate using dependencies: {}'
|
|
|
' reason: {}'.format(', '.join(
|
|
|
[subscription.fullname
|
|
|
for subscription
|
|
|
in self._subscriptions]),
|
|
|
str(error)))
|
|
|
|
|
|
@property
|
|
|
def subscriptions(self):
|
|
|
return self._subscriptions
|
|
|
|
|
|
def _check_function(self, function_to_check: FunctionType):
|
|
|
'''Метод для проверки того, возращает ли функция какое-либо значение, а
|
|
|
также того, совпадает ли число подписок с числом аргументов этой
|
|
|
функции.'''
|
|
|
if not isinstance(function_to_check, LambdaType):
|
|
|
# Если функция не лямбда, проверяем есть ли у нее возвращаемое
|
|
|
# значение.
|
|
|
for node in ast.walk(ast.parse(getsource(function_to_check))):
|
|
|
if isinstance(node, ast.Return):
|
|
|
break
|
|
|
else:
|
|
|
raise VariableError("the depend function does not return"
|
|
|
" anything in variable")
|
|
|
|
|
|
# Проверяем совпадение количества аргументов функции и заданных для
|
|
|
# функции переменных.
|
|
|
function_signature = signature(function_to_check)
|
|
|
print('subscribers: {}'.format(self._subscriptions))
|
|
|
if not len(self._subscriptions) == len(function_signature.parameters):
|
|
|
raise VariableError("the depend function takes {} arguments,"
|
|
|
" while {} is given".format(
|
|
|
len(function_signature.parameters),
|
|
|
len(self._subscriptions)))
|
|
|
|
|
|
def __ne__(self, other):
|
|
|
if not isinstance(other, DependenceSource):
|
|
|
return True
|
|
|
return not self == other
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
if not isinstance(other, DependenceSource):
|
|
|
return False
|
|
|
|
|
|
if not self._compare_depend_functions(self.depend_function,
|
|
|
other.depend_function):
|
|
|
return False
|
|
|
for l_var, r_var in zip(self._subscriptions, other._subscriptions):
|
|
|
if l_var != r_var:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
def _compare_depend_functions(self, l_func: FunctionType,
|
|
|
r_func: FunctionType) -> bool:
|
|
|
'''Метод для сравнения функций путем сравнения инструкций, полученных
|
|
|
в результате их дизассемблирования.'''
|
|
|
l_instructions = list(dis.get_instructions(l_func))
|
|
|
r_instructions = list(dis.get_instructions(r_func))
|
|
|
|
|
|
for l_instr, r_instr in zip(l_instructions, r_instructions):
|
|
|
if l_instr.opname != r_instr.opname:
|
|
|
return False
|
|
|
if l_instr.arg != r_instr.arg:
|
|
|
return False
|
|
|
if ((l_instr.argval != l_instr.argrepr) and
|
|
|
(r_instr.argval != r_instr.argrepr)):
|
|
|
if r_instr.argval != l_instr.argval:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
|
|
|
class VariableNode:
|
|
|
'''Класс ноды соответствующей переменной в дереве переменных.'''
|
|
|
def __init__(self, name, namespace, variable_type=StringType, source=None):
|
|
|
self.name = name
|
|
|
if issubclass(variable_type, VariableType):
|
|
|
self.variable_type = variable_type
|
|
|
else:
|
|
|
raise VariableTypeError('variable_type must be VariableType'
|
|
|
', but not {}'.format(type(variable_type)))
|
|
|
self.calculating = False
|
|
|
|
|
|
self.namespace = namespace
|
|
|
self.namespace.add_variable(self)
|
|
|
|
|
|
self.subscribers = set()
|
|
|
|
|
|
# Если значение переменной None -- значит она обнулена.
|
|
|
self.value = None
|
|
|
# Источник значения переменной, может быть значением, а может быть
|
|
|
# зависимостью.
|
|
|
self._source = source
|
|
|
if source is not None:
|
|
|
self.update_value()
|
|
|
|
|
|
def update_value(self):
|
|
|
'''Метод для обновления значения переменной с помощью указанного
|
|
|
источника ее значения.'''
|
|
|
if self.calculating:
|
|
|
raise CyclicVariableError(self.name)
|
|
|
|
|
|
if self._source is None:
|
|
|
raise VariableError("No sources to update variable '{}'".
|
|
|
format(self.fullname))
|
|
|
|
|
|
if isinstance(self._source, DependenceSource):
|
|
|
with self._start_calculate():
|
|
|
try:
|
|
|
self.value = self._source.calculate_value()
|
|
|
except CyclicVariableError as error:
|
|
|
raise CyclicVariableError(self.name, *error.queue)
|
|
|
else:
|
|
|
self.value = self._source
|
|
|
|
|
|
@property
|
|
|
def source(self):
|
|
|
return self._source
|
|
|
|
|
|
@source.setter
|
|
|
def source(self, source):
|
|
|
if self._source != source:
|
|
|
self._invalidate()
|
|
|
self._source = source
|
|
|
if isinstance(source, DependenceSource):
|
|
|
for subscription in source._subscriptions:
|
|
|
subscription.subscribers.add(self)
|
|
|
|
|
|
def _invalidate(self):
|
|
|
if self.value is not None:
|
|
|
self.value = None
|
|
|
for subscriber in self.subscribers:
|
|
|
subscriber._invalidate()
|
|
|
|
|
|
@contextmanager
|
|
|
def _start_calculate(self):
|
|
|
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
|
|
|
переменная в состоянии расчета.'''
|
|
|
try:
|
|
|
self.calculating = True
|
|
|
yield self
|
|
|
finally:
|
|
|
self.calculating = False
|
|
|
|
|
|
def get_value(self):
|
|
|
'''Метод для получения значения переменной.'''
|
|
|
if self.value is None:
|
|
|
self.update_value()
|
|
|
return self.value
|
|
|
|
|
|
@property
|
|
|
def fullname(self) -> str:
|
|
|
if self.namespace:
|
|
|
return "{}.{}".format(self.namespace.fullname, self.name)
|
|
|
else:
|
|
|
return self.name
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Variable: {} with value: {}>'.format(self.fullname,
|
|
|
self.value or
|
|
|
'INVALIDATED')
|
|
|
|
|
|
|
|
|
class NamespaceNode:
|
|
|
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
|
|
|
def __init__(self, name='', parent=None):
|
|
|
self.name = name
|
|
|
self.variables = dict()
|
|
|
self.namespaces = dict()
|
|
|
self.parent = parent
|
|
|
|
|
|
def add_variable(self, variable: VariableNode):
|
|
|
'''Метод для добавления переменной в пространство имен.'''
|
|
|
self.variables.update({variable.name: variable})
|
|
|
variable.namespace = self
|
|
|
|
|
|
def add_namespace(self, namespace):
|
|
|
'''Метод для добавления пространства имен в пространство имен.'''
|
|
|
self.namespaces.update({namespace.name: namespace})
|
|
|
namespace.parent = self
|
|
|
|
|
|
@property
|
|
|
def fullname(self) -> str:
|
|
|
if self.parent is not None:
|
|
|
return '{}.{}'.format(self.parent.fullname, self.name)
|
|
|
else:
|
|
|
return self.name
|
|
|
|
|
|
def __getattr__(self, name: str):
|
|
|
'''Метод возвращает ноду пространства имен или значение переменной.'''
|
|
|
if name in self.namespaces:
|
|
|
return self.namespaces[name]
|
|
|
elif name in self.variables:
|
|
|
return self.variables[name].get_value()
|
|
|
else:
|
|
|
raise VariableError("'{variable_name}' is not found in the"
|
|
|
" namespace '{namespace_name}'".format(
|
|
|
variable_name=name,
|
|
|
namespace_name=self.name))
|
|
|
|
|
|
def __getitem__(self, name: str):
|
|
|
'''Метод возвращает ноду пространства имен или ноду переменной.'''
|
|
|
if name in self.namespaces:
|
|
|
return self.namespaces[name]
|
|
|
elif name in self.variables:
|
|
|
return self.variables[name]
|
|
|
else:
|
|
|
raise VariableError("'{variable_name}' is not found in the"
|
|
|
" namespace '{namespace_name}'".format(
|
|
|
variable_name=name,
|
|
|
namespace_name=self.name))
|
|
|
|
|
|
def __contains__(self, name):
|
|
|
return name in self.namespaces or name in self.variables
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Namespace: {}>'.format(self.fullname)
|
|
|
|
|
|
|
|
|
class DependenceAPI:
|
|
|
def __init__(self):
|
|
|
self.current_namespace = None
|
|
|
self.datavars_root = None
|
|
|
|
|
|
def __call__(self, *variables, depend=None):
|
|
|
subscriptions = list()
|
|
|
for variable in variables:
|
|
|
if isinstance(variable, str):
|
|
|
variable = self._find_variable(variable)
|
|
|
if variable is None:
|
|
|
raise DependenceError("variable '{}' not found".format(
|
|
|
variable))
|
|
|
elif not isinstance(variable, VariableNode):
|
|
|
raise DependenceError("dependence variables must be 'str' or"
|
|
|
" 'VariableNode' not '{}'".format(
|
|
|
type(variable)))
|
|
|
subscriptions.append(variable)
|
|
|
return DependenceSource(subscriptions, depend=depend)
|
|
|
|
|
|
def _find_variable(self, variable_name):
|
|
|
'''Метод для поиска переменной в пространстве'''
|
|
|
variable = None
|
|
|
name_parts = variable_name.split('.')
|
|
|
|
|
|
if not name_parts[0]:
|
|
|
namespace = self.current_namespace
|
|
|
index = 1
|
|
|
while not name_parts[index]:
|
|
|
namespace = namespace.parent
|
|
|
index += 1
|
|
|
name_parts = name_parts[index:]
|
|
|
else:
|
|
|
namespace = self.datavars_root
|
|
|
|
|
|
variable = namespace
|
|
|
for part in name_parts:
|
|
|
variable = variable[part]
|
|
|
|
|
|
return variable
|
|
|
|
|
|
|
|
|
class VariableAPI:
|
|
|
'''Класс для создания переменных при задании их через
|
|
|
python-скрипты.'''
|
|
|
def __init__(self):
|
|
|
self.current_namespace = None
|
|
|
# TODO Продумать другой способ обработки ошибок.
|
|
|
self.errors = []
|
|
|
|
|
|
def __call__(self, name: str, source=None):
|
|
|
'''Метод для создания переменных внутри with Namespace('name').'''
|
|
|
if name not in self.current_namespace.variables:
|
|
|
variable = VariableNode(name, self.current_namespace)
|
|
|
else:
|
|
|
variable = self.current_namespace[name]
|
|
|
|
|
|
if isinstance(source, DependenceSource):
|
|
|
try:
|
|
|
source.check()
|
|
|
variable.source = source
|
|
|
except DependenceError as error:
|
|
|
raise VariableError('Dependence error: {} in variable: {}'.
|
|
|
format(str(error), variable.fullname))
|
|
|
else:
|
|
|
variable.source = source
|
|
|
|
|
|
return variable
|
|
|
|
|
|
|
|
|
class NamespaceAPI:
|
|
|
'''Класс для создания пространств имен при задании переменных через
|
|
|
python-скрипты.'''
|
|
|
def __init__(self, var_fabric: VariableAPI,
|
|
|
dependence_fabric: DependenceAPI(),
|
|
|
datavars_root=NamespaceNode('<root>')):
|
|
|
self._datavars = datavars_root
|
|
|
self.namespace_stack = [datavars_root]
|
|
|
|
|
|
# Привязываем фабрику переменных.
|
|
|
self._variables_fabric = var_fabric
|
|
|
self._variables_fabric.current_namespace = self._datavars
|
|
|
|
|
|
# Привязываем фабрику зависимостей.
|
|
|
self._dependence_fabric = dependence_fabric
|
|
|
self._dependence_fabric.current_namespace = self._datavars
|
|
|
self._dependence_fabric.datavars_root = self._datavars
|
|
|
|
|
|
@property
|
|
|
def datavars(self):
|
|
|
'''Метод для получения корневого пространства имен, через которое далее
|
|
|
можно получить доступ к переменным.'''
|
|
|
return self._datavars
|
|
|
|
|
|
def set_root(self, datavars_root: NamespaceNode):
|
|
|
'''Метод для установки корневого пространства имен, которое пока что
|
|
|
будет использоваться для предоставления доступа к переменным.'''
|
|
|
self._datavars = datavars_root
|
|
|
self.namespace_stack = [datavars_root]
|
|
|
self._variables_fabric.current_namespace = self._datavars
|
|
|
|
|
|
def reset(self):
|
|
|
'''Метод для сброса корневого пространства имен.'''
|
|
|
self._datavars = NamespaceNode('<root>')
|
|
|
self.namespace_stack = [self._datavars]
|
|
|
|
|
|
self._variables_fabric.current_namespace = self._datavars
|
|
|
self._dependence_fabric.current_namespace = self._datavars
|
|
|
|
|
|
@contextmanager
|
|
|
def __call__(self, namespace_name):
|
|
|
'''Метод для создания пространств имен с помощью with.'''
|
|
|
if namespace_name not in self.namespace_stack[-1].namespaces:
|
|
|
namespace = NamespaceNode(namespace_name,
|
|
|
parent=self.namespace_stack[-1])
|
|
|
else:
|
|
|
namespace = self.namespace_stack[-1].namespaces[namespace_name]
|
|
|
|
|
|
self.namespace_stack[-1].add_namespace(namespace)
|
|
|
self.namespace_stack.append(namespace)
|
|
|
# Устанавливаем текущее пространство имен фабрике переменных.
|
|
|
self._variables_fabric.current_namespace = namespace
|
|
|
|
|
|
# Устанавливаем текущее пространство имен фабрике зависимостей.
|
|
|
self._dependence_fabric.current_namespace = namespace
|
|
|
print('current_namespace in DependenceAPI: {}'.format(
|
|
|
namespace.fullname))
|
|
|
try:
|
|
|
yield self
|
|
|
finally:
|
|
|
current_namespace = self.namespace_stack.pop()
|
|
|
self._variables_fabric.current_namespace = current_namespace
|
|
|
self._dependence_fabric.current_namespace = current_namespace
|
|
|
|
|
|
|
|
|
Dependence = DependenceAPI()
|
|
|
Variable = VariableAPI()
|
|
|
Namespace = NamespaceAPI(Variable, Dependence)
|