You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

620 lines
24 KiB

import ast
import dis
from contextlib import contextmanager
from inspect import signature, getsource
from types import FunctionType, LambdaType
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
name = 'base'
@classmethod
def process_value(self, value, variable):
pass
class StringType(VariableType):
name = 'string'
@classmethod
def process_value(self, value, variable) -> str:
if isinstance(value, str):
return value
else:
try:
return str(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" string variable: {reason}".format(
value=value,
reason=str(error)))
class IntegerType(VariableType):
name = 'integer'
@classmethod
def process_value(self, value, variable) -> int:
if isinstance(value, int):
return value
else:
try:
return int(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" interger variable: {reason}".format(
value=value,
reason=str(error)))
class FloatType(VariableType):
name = 'float'
@classmethod
def process_value(self, value, variable) -> float:
if isinstance(value, float):
return True
else:
try:
return float(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" float variable: {reason}".format(
value=value,
reason=str(error)))
class BooleanType(VariableType):
name = 'bool'
true_values = {'True', 'true'}
false_values = {'False', 'false'}
@classmethod
def process_value(self, value, variable) -> bool:
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value in self.true_values:
return True
if value in self.false_values:
return False
try:
return bool(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" bool variable: {reason}".format(
value=value,
reason=str(error)))
class ListType(VariableType):
name = 'list'
@classmethod
def process_value(self, value, variable) -> list:
# TODO нормально все сделать.
return value
class ReadonlyType(VariableType):
# TODO Убрать его, добавить соответствующий флаг нодам переменных.
pass
class HashValue:
def __init__(self, key, value, master_variable):
self.key = key
self.value = value
self.master_variable = master_variable
@property
def subscriptions(self):
return self.master_variable.subscriptions
@property
def subscribers(self):
return self.master_variable.subscribers
def get_value(self):
self = self.master_variable.get_value()[self.key]
return self.value
class Hash:
name = 'hash'
def __init__(self, values: dict, master_variable):
self._values = dict()
for key, value in values.items():
self._values.update({key: HashValue(key, value, master_variable)})
self.name = None
self.master_variable = master_variable
def __getattr__(self, key: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if key in self._values:
return self._values[key].get_value()
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(key=key,
hash_name=self.name))
def __getitem__(self, key: str):
if key in self._values:
return self._values[key]
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(key=key,
hash_name=self.name))
def __contains__(self, key: str):
return key in self._values
class HashType(VariableType):
@classmethod
def process_value(self, value, variable) -> Hash:
print('IN PROCESS HASH VALUE')
if not isinstance(value, dict):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(value)))
return Hash(value, variable)
class TableType(VariableType):
pass
class DependenceSource:
current_namespace = None
datavars_root = None
def __init__(self, variables, depend=None):
self.error = None
self._subscriptions = variables
self.depend_function = depend
def check(self):
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._subscriptions) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._subscriptions) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self):
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
args_values = tuple(subscription.get_value() for subscription
in self._subscriptions)
print('args_values: {}'.format(args_values))
try:
if self.depend_function is None:
return args_values[-1]
return self.depend_function(*args_values)
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.fullname
for subscription
in self._subscriptions]),
str(error)))
@property
def subscriptions(self):
return self._subscriptions
def _check_function(self, function_to_check: FunctionType):
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
if not isinstance(function_to_check, LambdaType):
# Если функция не лямбда, проверяем есть ли у нее возвращаемое
# значение.
for node in ast.walk(ast.parse(getsource(function_to_check))):
if isinstance(node, ast.Return):
break
else:
raise VariableError("the depend function does not return"
" anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
function_signature = signature(function_to_check)
print('subscribers: {}'.format(self._subscriptions))
if not len(self._subscriptions) == len(function_signature.parameters):
raise VariableError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(self._subscriptions)))
def __ne__(self, other):
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other):
if not isinstance(other, DependenceSource):
return False
if not self._compare_depend_functions(self.depend_function,
other.depend_function):
return False
for l_var, r_var in zip(self._subscriptions, other._subscriptions):
if l_var != r_var:
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name, namespace, variable_type=StringType, source=None):
self.name = name
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating = False
self.namespace = namespace
self.namespace.add_variable(self)
self.subscribers = set()
# Если значение переменной None -- значит она обнулена.
self.value = None
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source = source
if source is not None:
self.update_value()
self._readonly = False
def update_value(self):
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
print('updating {}'.format(self.fullname))
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.fullname))
if isinstance(self._source, DependenceSource):
with self._start_calculate():
try:
value = self._source.calculate_value()
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
else:
value = self._source
self.value = self.variable_type.process_value(value, self)
@property
def source(self):
return self._source
@source.setter
def source(self, source):
if self._readonly:
raise VariableError("can not change the variable '{}': read only")
if self._source != source:
self._invalidate()
self._source = source
if isinstance(source, DependenceSource):
for subscription in source.subscriptions:
subscription.subscribers.add(self)
@property
def readonly(self) -> bool:
return self._readonly
@readonly.setter
def readonly(self, value: bool):
if self.value is None and self._source is not None:
# TODO выводить предупреждение если переменная инвалидирована,
# нет источника и при этом устанавливается флаг readonly.
self.update_value()
self._readonly = value
def _invalidate(self):
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
print('{} is invalidated'.format(self.fullname))
if self.value is not None:
self.value = None
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self):
'''Метод для получения значения переменной.'''
if self.value is None:
self.update_value()
return self.value
@property
def fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.fullname, self.name)
else:
return self.name
def __getitem__(self, key):
if self.variable_type is HashType:
if key in self.get_value():
return self.value[key]
else:
raise VariableError("value '{}' is not found in hash variable"
" '{}'".format(key, self.fullname))
else:
raise VariableError("'{}' variable type is not subscriptable.".
format(self.variable_type.name))
def __repr__(self):
return '<Variable: {} with value: {}>'.format(self.fullname,
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name='', parent=None):
self.name = name
self.variables = dict()
self.namespaces = dict()
self.parent = parent
def add_variable(self, variable: VariableNode):
'''Метод для добавления переменной в пространство имен.'''
self.variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace):
'''Метод для добавления пространства имен в пространство имен.'''
self.namespaces.update({namespace.name: namespace})
namespace.parent = self
@property
def fullname(self) -> str:
if self.parent is not None:
return '{}.{}'.format(self.parent.fullname, self.name)
else:
return self.name
def __getattr__(self, name: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name].get_value()
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __getitem__(self, name: str):
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name]
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __contains__(self, name):
return name in self.namespaces or name in self.variables
def __repr__(self):
return '<Namespace: {}>'.format(self.fullname)
class DependenceAPI:
def __init__(self):
self.current_namespace = None
self.datavars_root = None
def __call__(self, *variables, depend=None):
subscriptions = list()
for variable in variables:
if isinstance(variable, str):
variable = self._find_variable(variable)
if variable is None:
raise DependenceError("variable '{}' not found".format(
variable))
elif not isinstance(variable, VariableNode):
raise DependenceError("dependence variables must be 'str' or"
" 'VariableNode' not '{}'".format(
type(variable)))
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _find_variable(self, variable_name):
'''Метод для поиска переменной в пространстве'''
name_parts = variable_name.split('.')
if not name_parts[0]:
namespace = self.current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace.parent
else:
name_parts = name_parts[index:]
break
else:
namespace = self.datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class VariableAPI:
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace = None
# TODO Продумать другой способ обработки ошибок.
self.errors = []
def __call__(self, name: str, source=None, type=StringType,
readonly=False):
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace.variables:
variable = VariableNode(name, self.current_namespace,
variable_type=type)
else:
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error), variable.fullname))
else:
variable.source = source
return variable
class NamespaceAPI:
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI(),
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.namespace_stack = [datavars_root]
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self):
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_root(self, datavars_root: NamespaceNode):
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars_root
self.namespace_stack = [datavars_root]
self._variables_fabric.current_namespace = self._datavars
def reset(self):
'''Метод для сброса корневого пространства имен.'''
self._datavars = NamespaceNode('<root>')
self.namespace_stack = [self._datavars]
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
@contextmanager
def __call__(self, namespace_name):
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.namespace_stack[-1].namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.namespace_stack[-1])
else:
namespace = self.namespace_stack[-1].namespaces[namespace_name]
self.namespace_stack[-1].add_namespace(namespace)
self.namespace_stack.append(namespace)
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
print('current_namespace in DependenceAPI: {}'.format(
namespace.fullname))
try:
yield self
finally:
current_namespace = self.namespace_stack.pop()
self._variables_fabric.current_namespace = current_namespace
self._dependence_fabric.current_namespace = current_namespace
Dependence = DependenceAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)