You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

316 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import re
import ast
from contextlib import contextmanager
from inspect import signature, getsource
from types import FunctionType, LambdaType
class VariableType:
    '''Base class for variable types.

    Subclasses may override ``check`` to validate the textual form of a
    value; the base implementation accepts every value.
    '''

    @classmethod
    def check(cls, value: str) -> bool:
        '''Return True for any value: the base type imposes no format.'''
        return True
class VariableError(Exception):
    '''Error concerning a variable or its definition.'''
class VariableTypeError(Exception):
    '''Error raised when an unsuitable variable type is supplied.'''
class DependenceError(Exception):
    '''Error raised when a dependence is ill-defined or cannot be computed.'''
class CyclicVariableError(VariableError):
    '''Signals that variables depend on each other in a cycle.

    ``queue`` holds the variable names passed to the constructor.
    '''

    def __init__(self, *queue):
        self.queue = queue

    def __str__(self):
        # The last entry of the queue is deliberately left out of the message.
        listed_names = ", ".join(self.queue[:-1])
        return "Cyclic dependence in variables: {}".format(listed_names)
class StringType(VariableType):
    '''String variable type; uses the checks inherited from VariableType.'''
class IntegerType(VariableType):
    '''Variable type for integers written as an optionally negative run of
    digits.'''

    integer_pattern = re.compile(r"^-?\d+$")

    @classmethod
    def check(cls, value: str) -> bool:
        '''Return True if ``value`` matches ``integer_pattern``.

        The match object is converted to a real bool so the result agrees
        with the declared return type.
        '''
        return bool(cls.integer_pattern.match(value))
class FloatType(VariableType):
    '''Variable type for decimal numbers with an optional fractional part.'''

    float_pattern = re.compile(r"^-?\d+(\.\d+)?$")

    @classmethod
    def check(cls, value: str) -> bool:
        '''Return True if ``value`` matches ``float_pattern``.

        This class defines only ``float_pattern``; referring to
        ``integer_pattern`` here would raise AttributeError.
        '''
        return bool(cls.float_pattern.match(value))
class BooleanType(VariableType):
    '''Boolean variable type; uses the checks inherited from VariableType.'''
class ListType(VariableType):
    '''List variable type; uses the checks inherited from VariableType.'''
class ReadonlyType(VariableType):
    '''Read-only variable type; uses the checks inherited from VariableType.'''
class Dependence:
    '''Describes how a value is derived from other variables.

    The positional arguments are the variables (subscriptions) the value
    depends on.  ``depend`` is an optional callable that receives their
    values, in order, and returns the resulting value.
    '''

    def __init__(self, *variables, depend=None):
        self._subscriptions = variables
        self.depend_function = depend

    def check(self):
        '''Validate the dependence definition.

        Raises:
            DependenceError: no depend function is given and the number of
                subscriptions is not exactly one.
            VariableError: the depend function does not pass
                ``_check_function``.
        '''
        if self.depend_function is not None:
            self._check_function(self.depend_function)
            return
        if len(self._subscriptions) > 1:
            raise DependenceError('the depend function is needed if the'
                                  ' number of dependencies is more than'
                                  ' one')
        if not self._subscriptions:
            raise DependenceError('dependence is set without variables')

    def calculate_value(self):
        '''Calculate the value from the subscriptions and the depend
        function.

        Without a depend function the value of the last subscription is
        returned.

        Raises:
            CyclicVariableError: re-raised unchanged, so the variable that
                started the calculation can extend the cycle description.
            DependenceError: any other failure during the calculation.
        '''
        try:
            if self.depend_function is None:
                return self._subscriptions[-1].get_value()
            return self.depend_function(*(subscription.get_value()
                                          for subscription
                                          in self._subscriptions))
        except CyclicVariableError:
            # Wrapping this into DependenceError would hide the cycle from
            # the caller that handles CyclicVariableError.
            raise
        except Exception as error:
            # Subscriptions are variable objects, not strings, so they are
            # converted to printable names before joining.
            names = ', '.join(self._subscription_name(subscription)
                              for subscription in self._subscriptions)
            raise DependenceError('can not calculate using dependencies: {}'
                                  ' reason: {}'.format(names,
                                                       str(error))) from error

    @property
    def subscriptions(self):
        '''The variables this dependence is calculated from.'''
        return self._subscriptions

    @staticmethod
    def _subscription_name(subscription) -> str:
        '''Return a printable name of a subscription for error messages.'''
        return str(getattr(subscription, 'fullname', subscription))

    def _check_function(self, function_to_check: FunctionType):
        '''Check that the depend function returns something and that the
        number of its parameters equals the number of subscriptions.

        The return check is applied only to functions defined with ``def``
        whose source is available.  Lambdas always return a value, and
        ``types.LambdaType`` is the same object as ``types.FunctionType``,
        so lambdas are recognised by their ``__name__``.

        Raises:
            VariableError: the function has no ``return`` statement or its
                parameter count differs from the number of subscriptions.
        '''
        from textwrap import dedent

        is_plain_function = (
            isinstance(function_to_check, FunctionType)
            and function_to_check.__name__ != '<lambda>')
        if is_plain_function:
            try:
                # Nested functions and methods have indented source, which
                # ast.parse rejects unless it is dedented first.
                function_tree = ast.parse(
                    dedent(getsource(function_to_check)))
            except (OSError, TypeError, SyntaxError):
                # Source is unavailable or unparsable: skip the return check
                # rather than fail on a function that may be perfectly fine.
                function_tree = None
            if function_tree is not None and not any(
                    isinstance(node, ast.Return)
                    for node in ast.walk(function_tree)):
                raise VariableError("the depend function does not return"
                                    " anything in variable")

        # The number of function parameters must equal the number of
        # variables given to the dependence.
        function_signature = signature(function_to_check)
        if len(self._subscriptions) != len(function_signature.parameters):
            raise VariableError("the depend function takes {} arguments,"
                                " while {} is given".format(
                                    len(function_signature.parameters),
                                    len(self._subscriptions)))
class VariableNode:
    '''A named variable registered in a namespace.

    The value is taken from the source, which is either a plain value or a
    Dependence.  A value of ``None`` means the variable is invalidated; it is
    recalculated on the next ``get_value`` call.
    '''

    def __init__(self, name, namespace, variable_type=StringType, source=None):
        '''Create the variable and register it in ``namespace``.

        Raises:
            VariableTypeError: ``variable_type`` is not a VariableType
                subclass.
        '''
        self.name = name
        # isinstance guard: issubclass() itself raises TypeError when it is
        # given something that is not a class.
        if (isinstance(variable_type, type)
                and issubclass(variable_type, VariableType)):
            self.variable_type = variable_type
        else:
            raise VariableTypeError('variable_type must be VariableType'
                                    ', but not {}'.format(type(variable_type)))

        self.calculating = False

        self.namespace = namespace
        self.namespace.add_variable(self)

        self.subscribers = set()

        # A value of None means the variable is invalidated.
        self.value = None

        # The source of the value: either a plain value or a dependence.
        self._source = None
        if source is not None:
            # Assign through the property so that a Dependence source
            # registers this variable as a subscriber of its dependencies.
            self.source = source
            self.update_value()

    def update_value(self):
        '''Recalculate the value from the source.

        Raises:
            CyclicVariableError: the variable is already being calculated,
                or a variable it depends on reported a cycle.
            VariableError: the variable has no source.
        '''
        if self.calculating:
            raise CyclicVariableError(self.name)
        self._invalidate()
        if self._source is None:
            raise VariableError("No sources to update variable '{}'".
                                format(self.fullname))
        if isinstance(self._source, Dependence):
            with self._start_calculate():
                try:
                    self.value = self._source.calculate_value()
                except CyclicVariableError as error:
                    # Prepend this variable's name to the reported chain.
                    raise CyclicVariableError(self.name, *error.queue)
        else:
            self.value = self._source

    @property
    def source(self):
        '''The source of the value: a plain value or a Dependence.'''
        return self._source

    @source.setter
    def source(self, source):
        self._invalidate()
        # Drop subscriptions made for the previous source so variables this
        # one no longer depends on stop invalidating it.
        previous_source = self._source
        if isinstance(previous_source, Dependence):
            for subscription in previous_source.subscriptions:
                subscription.subscribers.discard(self)
        self._source = source
        if isinstance(source, Dependence):
            for subscription in source.subscriptions:
                subscription.subscribers.add(self)

    def _invalidate(self):
        '''Reset the value of this variable and of every variable that
        depends on it, directly or through other variables.

        The walk is iterative and remembers visited nodes, so it terminates
        even when the subscriptions form a cycle.
        '''
        visited = {self}
        pending = [self]
        while pending:
            node = pending.pop()
            node.value = None
            for subscriber in node.subscribers:
                if subscriber not in visited:
                    visited.add(subscriber)
                    pending.append(subscriber)

    @contextmanager
    def _start_calculate(self):
        '''Context manager that keeps the ``calculating`` flag set while the
        variable is being calculated.'''
        try:
            self.calculating = True
            yield self
        finally:
            self.calculating = False

    def get_value(self):
        '''Return the value, calculating it first if it is invalidated.'''
        if self.value is None:
            self.update_value()
        return self.value

    @property
    def fullname(self) -> str:
        '''The variable name prefixed with the full name of its namespace.'''
        if self.namespace:
            return "{}.{}".format(self.namespace.fullname, self.name)
        else:
            return self.name

    def __repr__(self):
        # Only None means "invalidated"; falsy values such as 0 or '' are
        # real values and are shown as they are.
        shown_value = 'INVALIDATED' if self.value is None else self.value
        return '<Variable: {} with value: {}>'.format(self.fullname,
                                                      shown_value)
class NamespaceNode:
    '''A node of the namespace tree holding variables and child namespaces.

    Child namespaces and variable values are reachable as attributes.
    '''

    def __init__(self, name='', parent=None):
        self.name = name
        self.variables = dict()
        self.namespaces = dict()
        self.parent = parent

    def add_variable(self, variable: 'VariableNode'):
        '''Register ``variable`` under its name and make this node its
        namespace.'''
        self.variables[variable.name] = variable
        variable.namespace = self

    def add_namespace(self, namespace):
        '''Register ``namespace`` as a child and make this node its parent.'''
        self.namespaces[namespace.name] = namespace
        namespace.parent = self

    @property
    def fullname(self) -> str:
        '''Dot-separated path of names from the topmost parent to this node.'''
        if self.parent is not None:
            return '{}.{}'.format(self.parent.fullname, self.name)
        else:
            return self.name

    def get_variable_node(self, name):
        '''Return the variable object (not its value) registered as ``name``.

        Raises:
            VariableError: there is no such variable in this namespace.
        '''
        if name in self.variables:
            return self.variables[name]
        else:
            raise VariableError("Variable '{variable_name}' is not found in"
                                " the namespace '{namespace_name}'".format(
                                                variable_name=name,
                                                namespace_name=self.name))

    def __getattr__(self, name: str):
        '''Return the child namespace or the variable value called ``name``.

        Python calls this only when the normal attribute lookup fails.  A
        child namespace takes precedence over a variable of the same name.

        Raises:
            AttributeError: ``name`` is a dunder name.
            VariableError: nothing is registered under ``name``.
        '''
        # Dunder names are protocol probes (hasattr, copy, pickle), not
        # variables; they must fail with AttributeError to be recognised as
        # absent.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        if name in self.namespaces:
            return self.namespaces[name]
        elif name in self.variables:
            return self.variables[name].get_value()
        else:
            raise VariableError("'{variable_name}' is not found in the"
                                " namespace '{namespace_name}'".format(
                                                variable_name=name,
                                                namespace_name=self.name))

    def __contains__(self, name):
        '''Return True if ``name`` is a child namespace or a variable.'''
        return name in self.namespaces or name in self.variables

    def __repr__(self):
        return '<Namespace: {}>'.format(self.fullname)
class VariableAPI:
    '''Callable factory that creates variables in the current namespace.

    Dependence errors found while creating variables are collected in
    ``errors`` instead of being raised.
    '''

    def __init__(self):
        self.current_namespace = None
        self.errors = []

    def __call__(self, name: str, source=None):
        '''Create a variable called ``name`` in the current namespace, give
        it ``source`` and return it.

        A Dependence source is checked first; if the check raises
        DependenceError the message is stored in ``errors`` and the variable
        is returned without that source.
        '''
        variable = VariableNode(name, self.current_namespace)

        if not isinstance(source, Dependence):
            variable.source = source
            return variable

        try:
            source.check()
            variable.source = source
        except DependenceError as error:
            message = 'Dependence error: {} in variable: {}'.format(
                str(error), variable.fullname)
            self.errors.append(message)
        return variable
class NamespaceAPI:
    '''Builds the namespace tree and keeps the variable factory pointed at
    the namespace that is currently open.

    Calling the object with a name opens a child namespace for the duration
    of a ``with`` block.
    '''

    def __init__(self, var_fabric: 'VariableAPI', datavars_root=None):
        '''Bind the API to ``var_fabric`` and to the root namespace.

        When ``datavars_root`` is omitted a fresh ``'<root>'`` namespace is
        created for this instance; a default built in the signature would be
        one object shared by every instance.
        '''
        if datavars_root is None:
            datavars_root = NamespaceNode('<root>')
        self._datavars = datavars_root
        self._variables_fabric = var_fabric
        self.namespace_stack = [datavars_root]
        self._variables_fabric.current_namespace = self._datavars

    @property
    def datavars(self):
        '''The root namespace.'''
        return self._datavars

    def set_root(self, datavars_root: 'NamespaceNode'):
        '''Replace the root namespace and reset the stack to it.'''
        self._datavars = datavars_root
        self.namespace_stack = [datavars_root]
        self._variables_fabric.current_namespace = self._datavars

    @contextmanager
    def __call__(self, namespace_name):
        '''Open a new child of the current namespace for a ``with`` block.

        Inside the block the variable factory creates variables in the new
        namespace; on exit the enclosing namespace becomes current again.
        '''
        enclosing_namespace = self.namespace_stack[-1]
        new_namespace = NamespaceNode(namespace_name,
                                      parent=enclosing_namespace)
        enclosing_namespace.add_namespace(new_namespace)
        self.namespace_stack.append(new_namespace)
        self._variables_fabric.current_namespace = new_namespace
        try:
            yield self
        finally:
            # pop() returns the namespace being left, so the current
            # namespace must be read from the top of the stack afterwards.
            self.namespace_stack.pop()
            self._variables_fabric.current_namespace = \
                self.namespace_stack[-1]
# Module-level instances: the variable factory and the namespace API that is
# bound to it.
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable)