You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

514 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import re
import ast
import dis
from contextlib import contextmanager
from inspect import signature, getsource
from types import FunctionType, LambdaType
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
@classmethod
def check(self, value: str) -> bool:
return True
class StringType(VariableType):
pass
class IntegerType(VariableType):
integer_pattern = re.compile(r"^-?\d+$")
@classmethod
def check(self, value) -> bool:
if isinstance(value, int):
return True
return self.integer_pattern.match(value)
class FloatType(VariableType):
float_pattern = re.compile(r"^-?\d+(\.\d+)?$")
@classmethod
def check(self, value) -> bool:
if isinstance(value, float):
return True
return self.integer_pattern.match(value)
class BooleanType(VariableType):
bool_available = {'True', 'False'}
@classmethod
def check(self, value) -> bool:
return isinstance(value, bool) or value in self.bool_available
class ListType(VariableType):
pass
class ReadonlyType(VariableType):
pass
class HashType(VariableType):
pass
class TableType(VariableType):
pass
class DependenceSource:
current_namespace = None
datavars_root = None
def __init__(self, variables, depend=None):
self.error = None
self._subscriptions = variables
self.depend_function = depend
def check(self):
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._subscriptions) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._subscriptions) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self):
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
args_values = tuple(subscription.get_value() for subscription
in self._subscriptions)
print('args_values: {}'.format(args_values))
try:
if self.depend_function is None:
return args_values[-1]
return self.depend_function(*args_values)
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.fullname
for subscription
in self._subscriptions]),
str(error)))
@property
def subscriptions(self):
return self._subscriptions
def _check_function(self, function_to_check: FunctionType):
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
if not isinstance(function_to_check, LambdaType):
# Если функция не лямбда, проверяем есть ли у нее возвращаемое
# значение.
for node in ast.walk(ast.parse(getsource(function_to_check))):
if isinstance(node, ast.Return):
break
else:
raise VariableError("the depend function does not return"
" anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
function_signature = signature(function_to_check)
print('subscribers: {}'.format(self._subscriptions))
if not len(self._subscriptions) == len(function_signature.parameters):
raise VariableError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(self._subscriptions)))
def __ne__(self, other):
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other):
if not isinstance(other, DependenceSource):
return False
if not self._compare_depend_functions(self.depend_function,
other.depend_function):
return False
for l_var, r_var in zip(self._subscriptions, other._subscriptions):
if l_var != r_var:
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class HashSource:
def __init__(self):
pass
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name, namespace, variable_type=StringType, source=None):
self.name = name
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating = False
self.namespace = namespace
self.namespace.add_variable(self)
self.subscribers = set()
# Если значение переменной None -- значит она обнулена.
self.value = None
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source = source
if source is not None:
self.update_value()
def update_value(self):
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.fullname))
if isinstance(self._source, DependenceSource):
with self._start_calculate():
try:
self.value = self._source.calculate_value()
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
else:
self.value = self._source
@property
def source(self):
return self._source
@source.setter
def source(self, source):
if self._source != source:
self._invalidate()
self._source = source
if isinstance(source, DependenceSource):
for subscription in source._subscriptions:
subscription.subscribers.add(self)
def _invalidate(self):
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
print('{} is invalidated'.format(self.fullname))
if self.value is not None:
self.value = None
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self):
'''Метод для получения значения переменной.'''
if self.value is None:
self.update_value()
return self.value
@property
def fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.fullname, self.name)
else:
return self.name
def __repr__(self):
return '<Variable: {} with value: {}>'.format(self.fullname,
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name='', parent=None):
self.name = name
self.variables = dict()
self.namespaces = dict()
self.parent = parent
def add_variable(self, variable: VariableNode):
'''Метод для добавления переменной в пространство имен.'''
self.variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace):
'''Метод для добавления пространства имен в пространство имен.'''
self.namespaces.update({namespace.name: namespace})
namespace.parent = self
@property
def fullname(self) -> str:
if self.parent is not None:
return '{}.{}'.format(self.parent.fullname, self.name)
else:
return self.name
def __getattr__(self, name: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name].get_value()
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __getitem__(self, name: str):
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name]
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __contains__(self, name):
return name in self.namespaces or name in self.variables
def __repr__(self):
return '<Namespace: {}>'.format(self.fullname)
class HashNode:
def __init__(self, name, namespace, sources, types=dict()):
self.name = name
self.namespace = namespace
self.calculating = False
self.namespace.add_variable(self)
self.sources = sources
self.subscribers = dict()
self.values = dict()
self.types = dict()
for key, source in sources:
self.values.update({key: None})
if key in types:
if issubclass(types[key], VariableType):
self.types.update({key: types[key]})
else:
raise VariableTypeError('variable_type must be'
' VariableType, but not {}'.format(
type(types[key])))
else:
self.types.update({key: StringType})
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self.update_values()
class DependenceAPI:
def __init__(self):
self.current_namespace = None
self.datavars_root = None
def __call__(self, *variables, depend=None):
subscriptions = list()
for variable in variables:
if isinstance(variable, str):
variable = self._find_variable(variable)
if variable is None:
raise DependenceError("variable '{}' not found".format(
variable))
elif not isinstance(variable, VariableNode):
raise DependenceError("dependence variables must be 'str' or"
" 'VariableNode' not '{}'".format(
type(variable)))
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _find_variable(self, variable_name):
'''Метод для поиска переменной в пространстве'''
name_parts = variable_name.split('.')
if not name_parts[0]:
namespace = self.current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace.parent
else:
name_parts = name_parts[index:]
break
else:
namespace = self.datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class VariableAPI:
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace = None
# TODO Продумать другой способ обработки ошибок.
self.errors = []
def __call__(self, name: str, source=None):
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace.variables:
variable = VariableNode(name, self.current_namespace)
else:
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error), variable.fullname))
else:
variable.source = source
return variable
class NamespaceAPI:
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI(),
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.namespace_stack = [datavars_root]
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self):
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_root(self, datavars_root: NamespaceNode):
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars_root
self.namespace_stack = [datavars_root]
self._variables_fabric.current_namespace = self._datavars
def reset(self):
'''Метод для сброса корневого пространства имен.'''
self._datavars = NamespaceNode('<root>')
self.namespace_stack = [self._datavars]
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
@contextmanager
def __call__(self, namespace_name):
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.namespace_stack[-1].namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.namespace_stack[-1])
else:
namespace = self.namespace_stack[-1].namespaces[namespace_name]
self.namespace_stack[-1].add_namespace(namespace)
self.namespace_stack.append(namespace)
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
print('current_namespace in DependenceAPI: {}'.format(
namespace.fullname))
try:
yield self
finally:
current_namespace = self.namespace_stack.pop()
self._variables_fabric.current_namespace = current_namespace
self._dependence_fabric.current_namespace = current_namespace
Dependence = DependenceAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)