You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

921 lines
38 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import ast
import dis
from typing import List, Any
from contextlib import contextmanager
from inspect import signature, getsource
from types import FunctionType, LambdaType
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
name = 'undefined'
@classmethod
def process_value(cls, value, variable):
return value
@classmethod
def readonly(cls, variable_object) -> None:
variable_object.variable_type = cls
variable_object.readonly = True
class IniType(VariableType):
'''Класс, соответствующий типу переменных созданных в calculate.ini файлах.
'''
name = 'ini'
@classmethod
def process_value(cls, value, variable):
return value
class StringType(VariableType):
'''Класс, соответствующий типу переменных с строковым значением.'''
name = 'string'
@classmethod
def process_value(cls, value, variable) -> str:
if isinstance(value, str):
return value
else:
try:
return str(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" string variable: {reason}".format(
value=value,
reason=str(error)))
class IntegerType(VariableType):
'''Класс, соответствующий типу переменных с целочисленным значением.'''
name = 'integer'
@classmethod
def process_value(cls, value, variable) -> int:
if isinstance(value, int):
return value
else:
try:
return int(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" interger variable: {reason}".format(
value=value,
reason=str(error)))
class FloatType(VariableType):
'''Класс, соответствующий типу переменных с вещественным значением.'''
name = 'float'
@classmethod
def process_value(cls, value, variable) -> float:
if isinstance(value, float):
return value
else:
try:
return float(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" float variable: {reason}".format(
value=value,
reason=str(error)))
class BooleanType(VariableType):
'''Класс, соответствующий типу переменных с булевым значением.'''
name = 'bool'
true_values = {'True', 'true'}
false_values = {'False', 'false'}
@classmethod
def process_value(cls, value, variable) -> bool:
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value in cls.true_values:
return True
if value in cls.false_values:
return False
try:
return bool(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" bool variable: {reason}".format(
value=value,
reason=str(error)))
class ListType(VariableType):
name = 'list'
@classmethod
def process_value(cls, value, variable) -> list:
# TODO нормально все сделать.
if isinstance(value, list):
return value
elif isinstance(value, str):
output_list = list()
values = value.split(',')
for value in values:
output_list.append(value.strip())
return output_list
try:
return list(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" list variable: {reason}".format(
value=value,
reason=str(error)))
class HashValue:
'''Класс значения хэша, передающий некоторые характеристики переменной
хозяина, что позволяет инвалидировать подписки переменной хозяина при любом
изменении хэша или инвалидировать весь хэш при изменении одной из
зависимостей.'''
def __init__(self, key: str, value, master_variable, parent):
self.key = key
self.value = value
self.master_variable = master_variable
self.parent = parent
@property
def subscriptions(self):
return self.master_variable.subscriptions
@property
def subscribers(self):
return self.master_variable.subscribers
def get_value(self) -> str:
'''Метод для получения значения хэша. Перед возвращением значения
обновляет себя на наиболее актуальную версию значения хэша.'''
self = self.master_variable.get_value()[self.key]
return self.value
class Hash:
'''Класс реализующий контейнер для хранения хэша в переменной
соответствующего типа.'''
def __init__(self, values: dict, master_variable, parent=None):
self.fixed = master_variable.fixed
self._values = dict()
for key, value in values.items():
self._values.update({key: HashValue(key, value, master_variable,
self)})
self.master_variable = master_variable
self.row_index = None
def get_hash(self) -> dict:
'''Метод для получения словаря из значений хэша.'''
dict_value = {}
for key in self._values.keys():
dict_value.update({key: self._values[key].get_value()})
return dict_value
def update_hash(self, values: dict) -> None:
'''Метод для обновления значения хэша.'''
print('UPDATE HASH')
for key, value in values.items():
if key in self._values:
self._values[key].value = value
elif self.fixed:
raise VariableError("key '{}' is unavailable for fixed"
" hash, available keys: '{}'".
format(key, ', '.join(self._fields)))
else:
self._values[key] = HashValue(key, value, self.master_variable,
self)
return self
def __getattr__(self, key: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if key in self._values:
return self._values[key].get_value()
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __getitem__(self, key: str) -> HashValue:
if key in self._values:
return self._values[key]
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __iter__(self):
for key in self._values.keys():
yield key
def __contains__(self, key: str) -> bool:
return key in self._values
class HashType(VariableType):
'''Класс, соответствующий типу переменных хэшей.'''
name = 'hash'
@classmethod
def process_value(cls, values, variable) -> Hash:
if not isinstance(values, dict):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(values)))
if variable.value is not None:
updated_hash = variable.value.update_hash(values)
else:
updated_hash = Hash(values, variable)
return updated_hash
@classmethod
def fixed(cls, variable_object) -> None:
variable_object.variable_type = cls
variable_object.fixed = True
class Table:
'''Класс, соответствующий типу переменных таблиц.'''
def __init__(self, values: List[dict], master_variable, fields=None):
self._rows = list()
self.master_variable = master_variable
self.columns = set()
if fields is not None:
self.columns.update(self.fields)
else:
self.columns = set(values[0].keys())
for row in values:
if isinstance(row, dict):
self._check_columns(row)
self._rows.append(row)
else:
raise VariableTypeError("can not create table using value '{}'"
" with type '{}'".format(row,
type(row)))
def get_table(self) -> List[dict]:
return self._rows
def add_row(self, row: dict):
self._check_columns(row)
self._rows.append(row)
def change_row(self, row: dict, index: int) -> None:
self._check_columns(row)
self._rows[index] = row
def clear(self) -> None:
self._rows.clear()
def _check_columns(self, row: dict) -> None:
'''Метод для проверки наличия в хэше только тех полей, которые
соответствуют заданным для таблицы колонкам.'''
for column in row:
if column not in self.columns:
raise VariableError("unknown column value '{}'"
" available: '{}'".format(
column,
', '.join(self.columns)))
def __getitem__(self, index: int) -> Hash:
if isinstance(index, int):
if index < len(self._rows):
return self._rows[index]
else:
raise VariableError("'{index}' index value is out of range"
.format(index=index))
else:
raise VariableError("Table value is not subscriptable")
def __iter__(self):
for row in self._rows:
yield row
def __contains__(self, key: str) -> bool:
if isinstance(key, str):
return key in self._values
return False
def __len__(self):
return len(self._rows)
class TableType(VariableType):
name = 'table'
@classmethod
def process_value(self, value: List[dict], variable) -> Table:
print('PROCESS TABLE')
if not isinstance(value, list) and not isinstance(value, Table):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(value)))
else:
if isinstance(value, Table):
return value
return Table(value, variable)
class VariableWrapper:
'''Класс обертки для переменных, с помощью которого отслеживается
применение переменной в образовании значения переменной от нее зависящей.
'''
def __init__(self, variable, subscriptions):
self._variable = variable
self._subscriptions = subscriptions
@property
def value(self):
'''Метод возвращающий значение переменной и при этом добавляющий его в
подписки.'''
self._subscriptions.add(self._variable)
value = self._variable.get_value()
if isinstance(value, Hash):
value = value.get_hash()
elif isinstance(value, Table):
value = value.get_table()
return value
@property
def subscriptions(self):
return self._subscriptions
@subscriptions.setter
def subscriptions(self, subscriptions):
self._subscriptions = subscriptions
class DependenceSource:
'''Класс зависимости как источника значения переменной.'''
def __init__(self, variables: tuple, depend=None):
self.error = None
self._args = variables
self.depend_function = depend
self._subscriptions = set()
self._args_founded = False
def check(self) -> None:
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._args) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._args) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self) -> Any:
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
self._subscriptions = set()
args = tuple(VariableWrapper(arg, self._subscriptions)
for arg in self._args)
try:
if self.depend_function is None:
return args[-1].value
return self.depend_function(*args)
except CyclicVariableError:
raise
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
def get_args(self, namespace):
if not self._args_founded:
for index in range(0, len(self._args)):
if isinstance(self._args[index], str):
variable = Dependence._find_variable(
self._args[index],
current_namespace=namespace)
if variable is None:
raise DependenceError("variable '{}' not found".
format(variable))
self._args[index] = variable
self._args_founded = True
@property
def subscriptions(self) -> set:
return self._subscriptions
def _check_function(self, function_to_check: FunctionType) -> None:
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
if not isinstance(function_to_check, LambdaType):
# Если функция не лямбда, проверяем есть ли у нее возвращаемое
# значение.
for node in ast.walk(ast.parse(getsource(function_to_check))):
if isinstance(node, ast.Return):
break
else:
raise VariableError("the depend function does not return"
" anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
function_signature = signature(function_to_check)
if not len(self._args) == len(function_signature.parameters):
raise VariableError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(self._args)))
def __ne__(self, other) -> bool:
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other) -> bool:
if not isinstance(other, DependenceSource):
return False
# Сначала сравниваем аргументы.
for l_var, r_var in zip(self._args, other._args):
if l_var != r_var:
return False
if not self._compare_depend_functions(self.depend_function,
other.depend_function):
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name, namespace, variable_type=VariableType,
source=None):
self.name = name
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating = False
self.namespace = namespace
self.namespace.add_variable(self)
self.subscribers = set()
# Список текущих подписок, для проверки их актуальности при
# динамическом связывании.
self._subscriptions = set()
self.value = None
self._invalidated = True
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source = source
if source is not None:
self.update_value()
# Флаг, указывающий, что значение было изменено в процессе работы
# утилит или с помощью тега set из шаблона.
self.set_by_user = False
self._readonly = False
# Флаг имеющий значение только для переменных типа HashType.
# Предназначен для включения проверки соответствия полей хэша при
# установке значения.
self._fixed = False
def update_value(self) -> None:
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
print('updating {}'.format(self.get_fullname()))
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.get_fullname()))
if isinstance(self._source, DependenceSource):
print('NAMESPACE: {}'.format(self.namespace))
self._source.get_args(self.namespace)
with self._start_calculate():
try:
value = self._source.calculate_value()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in self._source.subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in self._source.subscriptions:
subscription.subscribers.add(self)
self._subscriptions = self._source.subscriptions
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
except DependenceError as error:
raise VariableError('{}: {}'.format(self.get_fullname(),
str(error)))
else:
value = self._source
self.value = self.variable_type.process_value(value, self)
self._invalidated = False
def set_variable_type(self, variable_type, readonly=None, fixed=None):
'''Метод для установки типа переменной.'''
if readonly is not None and isinstance(readonly, bool):
self._readonly = readonly
elif fixed is not None and isinstance(fixed, bool):
self._fixed = fixed
if self.variable_type is VariableType:
if isinstance(variable_type, type):
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableError('variable type object must be'
' VariableType or its class method,'
' not {}'.format(type(variable_type)))
elif callable(variable_type):
variable_type(self)
def set(self, value):
self._invalidate()
self.set_by_user = True
self.value = self.variable_type.process_value(value, self)
@property
def source(self):
return self._source
@source.setter
def source(self, source) -> None:
if self._readonly:
raise VariableError("can not change the variable '{}': read only".
format(self.get_fullname()))
# Если источники не совпадают или текущее значение переменной было
# установлено пользователем, то инвалидируем переменную и меняем
# источник.
if self._source != source or self.set_by_user:
self.set_by_user = False
self._invalidate()
self._source = source
@property
def readonly(self) -> bool:
return self._readonly
@readonly.setter
def readonly(self, value: bool) -> None:
# if self.value is None and self._source is not None:
# # TODO выводить предупреждение если переменная инвалидирована,
# # нет источника и при этом устанавливается флаг readonly.
# self.update_value()
self._readonly = value
@property
def fixed(self) -> bool:
return self._fixed
@fixed.setter
def fixed(self, value) -> bool:
self._fixed = value
def _invalidate(self) -> None:
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
# print('{} is invalidated'.format(self.get_fullname()))
if not self._invalidated and not self.set_by_user:
if self.variable_type is not HashType:
self.value = None
self._invalidated = True
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self) -> Any:
'''Метод для получения значения переменной.'''
if self._invalidated and not self.set_by_user:
self.update_value()
return self.value
def get_fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.get_fullname(), self.name)
else:
return self.name
def __getitem__(self, key: str) -> HashValue:
if self.variable_type is HashType:
if key in self.get_value():
return self.value[key]
else:
raise VariableError("value '{}' is not found in hash variable"
" '{}'".format(key, self.get_fullname()))
else:
raise VariableError("'{}' variable type is not subscriptable.".
format(self.variable_type.name))
def __repr__(self):
return '<Variable: {} with value: {}>'.format(self.get_fullname(),
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name='', parent=None):
self.name = name
self.variables = dict()
self.namespaces = dict()
self.parent = parent
def add_variable(self, variable: VariableNode) -> None:
'''Метод для добавления переменной в пространство имен.'''
self.variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace) -> None:
'''Метод для добавления пространства имен в пространство имен.'''
self.namespaces.update({namespace.name: namespace})
namespace.parent = self
def clear(self):
'''Метод для очистки пространства имен. Очищает и пространства имен
и переменные. Предназначен только для использования в calculate.ini.'''
for namespace_name in self.namespaces.keys():
self.namespaces[namespace_name].clear()
self.variables.clear()
self.namespaces.clear()
def get_fullname(self) -> str:
'''Метод для получения полного имени пространства имен.'''
if self.parent is not None:
return '{}.{}'.format(self.parent.get_fullname(), self.name)
else:
return self.name
def __getattr__(self, name: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
variable = self.variables[name]
if variable.variable_type is TableType:
return variable.get_value().get_table()
return variable.get_value()
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __getitem__(self, name: str) -> None:
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name]
else:
raise VariableError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __contains__(self, name):
return name in self.namespaces or name in self.variables
def __repr__(self):
return '<Namespace: {}>'.format(self.get_fullname())
class DependenceAPI:
'''Класс образующий интерфейс для создания зависимостей.'''
def __init__(self):
self.current_namespace = None
self.datavars_root = None
def __call__(self, *variables, depend=None):
subscriptions = list()
for variable in variables:
# Поиск переменных теперь происходит при обновлении значения
# переменной.
# if isinstance(variable, str):
# variable = self._find_variable(variable)
# if variable is None:
# raise DependenceError("variable '{}' not found".format(
# variable))
# elif not isinstance(variable, VariableNode):
# raise DependenceError("dependence variables must be 'str' or"
# " 'VariableNode' not '{}'".format(
# type(variable)))
if not (isinstance(variable, str) or
isinstance(variable, VariableNode)):
raise DependenceError("dependence variables must be 'str' or"
" 'VariableNode' not '{}'".format(
type(variable)))
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _find_variable(self, variable_name, current_namespace=None):
'''Метод для поиска переменной в пространствах имен.'''
if current_namespace is None:
current_namespace = self.current_namespace
name_parts = variable_name.split('.')
if not name_parts[0]:
namespace = current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace.parent
else:
name_parts = name_parts[index:]
break
else:
namespace = self.datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class VariableAPI:
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace = None
# TODO Продумать другой способ обработки ошибок.
self.errors = []
def __call__(self, name: str, source=None, type=VariableType,
readonly=False, fixed=False, force=False):
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace.variables:
print('CREATE VARIABLE: {}'.format('{}.{}'.format(
self.current_namespace.get_fullname(),
name)))
variable = VariableNode(name, self.current_namespace)
else:
print('MODIFY VARIABLE: {}'.format(name))
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error),
variable.get_fullname()))
else:
variable.source = source
if readonly:
variable.set_variable_type(type, readonly=True)
elif fixed:
variable.set_variable_type(type, fixed=True)
else:
variable.set_variable_type(type)
return variable
class NamespaceAPI:
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI(),
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.current_namespace = self._datavars
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self):
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_datavars(self, datavars):
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars
self._variables_fabric.current_namespace = self._datavars
def reset(self):
'''Метод для сброса корневого пространства имен.'''
if isinstance(self._datavars, NamespaceNode):
self._datavars = NamespaceNode('<root>')
else:
self._datavars.reset()
self.current_namespace = self._datavars
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
def set_current_namespace(self, namespace: NamespaceNode):
self.current_namespace = namespace
self._variables_fabric.current_namespace = namespace
self._dependence_fabric.current_namespace = namespace
@contextmanager
def __call__(self, namespace_name):
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.current_namespace.namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.current_namespace)
else:
namespace = self.current_namespace.namespaces[namespace_name]
self.current_namespace.add_namespace(namespace)
self.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = self.current_namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
try:
yield self
finally:
self.current_namespace = self.current_namespace.parent
self._variables_fabric.current_namespace = self.current_namespace
self._dependence_fabric.current_namespace = self.current_namespace
Dependence = DependenceAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)