You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

948 lines
40 KiB

# vim: fileencoding=utf-8
#
import ast
import dis
from typing import List, Any
from contextlib import contextmanager
from inspect import signature, getsource
from types import FunctionType, LambdaType
from calculate.utils.tools import Singleton
class DependenceError(Exception):
pass
class VariableError(Exception):
pass
class VariableTypeError(VariableError):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic dependence in variables: {}".format(", ".join(
self.queue[:-1]))
class VariableType:
'''Базовый класс для типов.'''
name = 'undefined'
@classmethod
def process_value(cls, value, variable):
return value
@classmethod
def readonly(cls, variable_object) -> None:
variable_object.variable_type = cls
variable_object.readonly = True
class IniType(VariableType):
'''Класс, соответствующий типу переменных созданных в calculate.ini файлах.
'''
name = 'ini'
@classmethod
def process_value(cls, value, variable):
return value
class StringType(VariableType):
'''Класс, соответствующий типу переменных с строковым значением.'''
name = 'string'
@classmethod
def process_value(cls, value, variable) -> str:
if isinstance(value, str):
return value
else:
try:
return str(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" string variable: {reason}".format(
value=value,
reason=str(error)))
class IntegerType(VariableType):
'''Класс, соответствующий типу переменных с целочисленным значением.'''
name = 'integer'
@classmethod
def process_value(cls, value, variable) -> int:
if isinstance(value, int):
return value
else:
try:
return int(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" interger variable: {reason}".format(
value=value,
reason=str(error)))
class FloatType(VariableType):
'''Класс, соответствующий типу переменных с вещественным значением.'''
name = 'float'
@classmethod
def process_value(cls, value, variable) -> float:
if isinstance(value, float):
return value
else:
try:
return float(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" float variable: {reason}".format(
value=value,
reason=str(error)))
class BooleanType(VariableType):
'''Класс, соответствующий типу переменных с булевым значением.'''
name = 'bool'
true_values = {'True', 'true'}
false_values = {'False', 'false'}
@classmethod
def process_value(cls, value, variable) -> bool:
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value in cls.true_values:
return True
if value in cls.false_values:
return False
try:
return bool(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" bool variable: {reason}".format(
value=value,
reason=str(error)))
class ListType(VariableType):
name = 'list'
@classmethod
def process_value(cls, value, variable) -> list:
# TODO нормально все сделать.
if isinstance(value, list):
return value
elif isinstance(value, str):
output_list = list()
values = value.split(',')
for value in values:
output_list.append(value.strip())
return output_list
try:
return list(value)
except Exception as error:
raise VariableTypeError("can not set value '{value}' to"
" list variable: {reason}".format(
value=value,
reason=str(error)))
class HashValue:
'''Класс значения хэша, передающий некоторые характеристики переменной
хозяина, что позволяет инвалидировать подписки переменной хозяина при любом
изменении хэша или инвалидировать весь хэш при изменении одной из
зависимостей.'''
def __init__(self, key: str, value, master_variable, parent):
self.key = key
self.value = value
self.master_variable = master_variable
self.parent = parent
@property
def subscriptions(self):
return self.master_variable.subscriptions
@property
def subscribers(self):
return self.master_variable.subscribers
def get_value(self) -> str:
'''Метод для получения значения хэша. Перед возвращением значения
обновляет себя на наиболее актуальную версию значения хэша.'''
self = self.master_variable.get_value()[self.key]
return self.value
class Hash:
'''Класс реализующий контейнер для хранения хэша в переменной
соответствующего типа.'''
def __init__(self, values: dict, master_variable, parent=None):
self.fixed = master_variable.fixed
self._values = dict()
self._fields = set()
for key, value in values.items():
self._values.update({key: HashValue(key, value, master_variable,
self)})
if self.fixed:
self._fields.add(key)
self.master_variable = master_variable
self.row_index = None
def get_hash(self) -> dict:
'''Метод для получения словаря из значений хэша.'''
dict_value = {}
for key in self._values.keys():
dict_value.update({key: self._values[key].get_value()})
return dict_value
def update_hash(self, values: dict) -> None:
'''Метод для обновления значения хэша.'''
for key, value in values.items():
if key in self._values:
self._values[key].value = value
elif self.fixed:
raise VariableError("key '{}' is unavailable for fixed"
" hash, available keys: '{}'.".
format(key, ', '.join(self._fields)))
else:
self._values[key] = HashValue(key, value, self.master_variable,
self)
return self
def __getattr__(self, key: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if key in self._values:
return self._values[key].get_value()
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __getitem__(self, key: str) -> HashValue:
if key in self._values:
return self._values[key]
else:
raise VariableError(("'{key}' is not found in the hash"
" '{hash_name}'").format(
key=key,
hash_name=self.master_variable.get_fullname()))
def __iter__(self):
for key in self._values.keys():
yield key
def __contains__(self, key: str) -> bool:
return key in self._values
class HashType(VariableType):
'''Класс, соответствующий типу переменных хэшей.'''
name = 'hash'
@classmethod
def process_value(cls, values, variable) -> Hash:
if not isinstance(values, dict):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(values)))
if variable.value is not None:
updated_hash = variable.value.update_hash(values)
else:
updated_hash = Hash(values, variable)
return updated_hash
@classmethod
def fixed(cls, variable_object) -> None:
variable_object.variable_type = cls
variable_object.fixed = True
class Table:
'''Класс, соответствующий типу переменных таблиц.'''
def __init__(self, values: List[dict], master_variable, fields=None):
self._rows = list()
self.master_variable = master_variable
self.columns = set()
if fields is not None:
self.columns.update(self.fields)
else:
self.columns = set(values[0].keys())
for row in values:
if isinstance(row, dict):
self._check_columns(row)
self._rows.append(row)
else:
raise VariableTypeError("can not create table using value '{}'"
" with type '{}'".format(row,
type(row)))
def get_table(self) -> List[dict]:
return self._rows
def add_row(self, row: dict):
self._check_columns(row)
self._rows.append(row)
def change_row(self, row: dict, index: int) -> None:
self._check_columns(row)
self._rows[index] = row
def clear(self) -> None:
self._rows.clear()
def _check_columns(self, row: dict) -> None:
'''Метод для проверки наличия в хэше только тех полей, которые
соответствуют заданным для таблицы колонкам.'''
for column in row:
if column not in self.columns:
raise VariableError("unknown column value '{}'"
" available: '{}'".format(
column,
', '.join(self.columns)))
def __getitem__(self, index: int) -> Hash:
if isinstance(index, int):
if index < len(self._rows):
return self._rows[index]
else:
raise VariableError("'{index}' index value is out of range"
.format(index=index))
else:
raise VariableError("Table value is not subscriptable")
def __iter__(self):
for row in self._rows:
yield row
def __contains__(self, key: str) -> bool:
if isinstance(key, str):
return key in self._values
return False
def __len__(self):
return len(self._rows)
class TableType(VariableType):
name = 'table'
@classmethod
def process_value(self, value: List[dict], variable) -> Table:
if not isinstance(value, list) and not isinstance(value, Table):
raise VariableTypeError("can not set value with type '{_type}' to"
" hash variable: value must be 'dict' type"
.format(_type=type(value)))
else:
if isinstance(value, Table):
return value
return Table(value, variable)
class VariableWrapper:
'''Класс обертки для переменных, с помощью которого отслеживается
применение переменной в образовании значения переменной от нее зависящей.
'''
def __init__(self, variable, subscriptions):
self._variable = variable
self._subscriptions = subscriptions
@property
def value(self):
'''Метод возвращающий значение переменной и при этом добавляющий его в
подписки.'''
self._subscriptions.add(self._variable)
value = self._variable.get_value()
if isinstance(value, Hash):
value = value.get_hash()
elif isinstance(value, Table):
value = value.get_table()
return value
@property
def subscriptions(self):
return self._subscriptions
@subscriptions.setter
def subscriptions(self, subscriptions):
self._subscriptions = subscriptions
class DependenceSource:
'''Класс зависимости как источника значения переменной.'''
def __init__(self, variables: tuple, depend=None):
self.error = None
self._args = variables
self.depend_function = depend
self._subscriptions = set()
self._args_founded = False
def check(self) -> None:
'''Метод для запуска проверки корректности функции зависимости, а также
сопоставления ее числу заданных зависимостей.'''
if self.depend_function is None:
if len(self._args) > 1:
raise DependenceError('the depend function is needed if the'
' number of dependencies is more than'
' one')
elif len(self._args) == 0:
raise DependenceError('dependence is set without variables')
else:
self._check_function(self.depend_function)
def calculate_value(self) -> Any:
'''Метод для расчета значения переменной с использованием зависимостей
и заданной для них функции.'''
self._subscriptions = set()
args = tuple(VariableWrapper(arg, self._subscriptions)
for arg in self._args)
try:
if self.depend_function is None:
return args[-1].value
return self.depend_function(*args)
except CyclicVariableError:
raise
except Exception as error:
raise DependenceError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
def _get_args(self, namespace):
'''Метод для преобразования списка аргументов функции зависимости,
содержащего переменные и строки, в список аргументов состоящий только
из нод переменных и значений хэшей.'''
if not self._args_founded:
for index in range(0, len(self._args)):
if isinstance(self._args[index], str):
variable = Dependence._find_variable(
self._args[index],
current_namespace=namespace)
if variable is None:
raise DependenceError("variable '{}' not found".
format(variable))
self._args[index] = variable
self._args_founded = True
@property
def subscriptions(self) -> set:
return self._subscriptions
def _check_function(self, function_to_check: FunctionType) -> None:
'''Метод для проверки того, возращает ли функция какое-либо значение, а
также того, совпадает ли число подписок с числом аргументов этой
функции.'''
if not isinstance(function_to_check, LambdaType):
# Если функция не лямбда, проверяем есть ли у нее возвращаемое
# значение.
for node in ast.walk(ast.parse(getsource(function_to_check))):
if isinstance(node, ast.Return):
break
else:
raise VariableError("the depend function does not return"
" anything in variable")
# Проверяем совпадение количества аргументов функции и заданных для
# функции переменных.
function_signature = signature(function_to_check)
if not len(self._args) == len(function_signature.parameters):
raise VariableError("the depend function takes {} arguments,"
" while {} is given".format(
len(function_signature.parameters),
len(self._args)))
def __ne__(self, other) -> bool:
if not isinstance(other, DependenceSource):
return True
return not self == other
def __eq__(self, other) -> bool:
if not isinstance(other, DependenceSource):
return False
# Сначала сравниваем аргументы.
for l_var, r_var in zip(self._args, other._args):
if l_var != r_var:
return False
if not self._compare_depend_functions(self.depend_function,
other.depend_function):
return False
return True
def _compare_depend_functions(self, l_func: FunctionType,
r_func: FunctionType) -> bool:
'''Метод для сравнения функций путем сравнения инструкций, полученных
в результате их дизассемблирования.'''
l_instructions = list(dis.get_instructions(l_func))
r_instructions = list(dis.get_instructions(r_func))
for l_instr, r_instr in zip(l_instructions, r_instructions):
if l_instr.opname != r_instr.opname:
return False
if l_instr.arg != r_instr.arg:
return False
if ((l_instr.argval != l_instr.argrepr) and
(r_instr.argval != r_instr.argrepr)):
if r_instr.argval != l_instr.argval:
return False
return True
class VariableNode:
'''Класс ноды соответствующей переменной в дереве переменных.'''
def __init__(self, name, namespace, variable_type=VariableType,
source=None):
self.name = name
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableTypeError('variable_type must be VariableType'
', but not {}'.format(type(variable_type)))
self.calculating = False
self.namespace = namespace
self.namespace.add_variable(self)
self.subscribers = set()
# Список текущих подписок, для проверки их актуальности при
# динамическом связывании.
self._subscriptions = set()
self.value = None
self._invalidated = True
# Источник значения переменной, может быть значением, а может быть
# зависимостью.
self._source = source
if source is not None:
self.update_value()
# Флаг, указывающий, что значение было изменено в процессе работы
# утилит или с помощью тега set из шаблона.
self.set_by_user = False
self._readonly = False
# Флаг имеющий значение только для переменных типа HashType.
# Предназначен для включения проверки соответствия полей хэша при
# установке значения.
self._fixed = False
def update_value(self) -> None:
'''Метод для обновления значения переменной с помощью указанного
источника ее значения.'''
if self.calculating:
raise CyclicVariableError(self.name)
if self._source is None:
raise VariableError("No sources to update variable '{}'".
format(self.get_fullname()))
if isinstance(self._source, DependenceSource):
self._source._get_args(self.namespace)
with self._start_calculate():
try:
value = self._source.calculate_value()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in self._source.subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in self._source.subscriptions:
subscription.subscribers.add(self)
self._subscriptions = self._source.subscriptions
except CyclicVariableError as error:
raise CyclicVariableError(self.name, *error.queue)
except DependenceError as error:
raise VariableError('{}: {}'.format(self.get_fullname(),
str(error)))
else:
value = self._source
self.value = self.variable_type.process_value(value, self)
self._invalidated = False
def set_variable_type(self, variable_type, readonly=None, fixed=None):
'''Метод для установки типа переменной.'''
if readonly is not None and isinstance(readonly, bool):
self._readonly = readonly
elif fixed is not None and isinstance(fixed, bool):
self._fixed = fixed
if self.variable_type is VariableType:
if isinstance(variable_type, type):
if issubclass(variable_type, VariableType):
self.variable_type = variable_type
else:
raise VariableError('variable type object must be'
' VariableType or its class method,'
' not {}'.format(type(variable_type)))
elif callable(variable_type):
variable_type(self)
def set(self, value):
'''Метод для установки временного пользовательского значения
переменной.'''
# Сбрасываем флаги, чтобы провести инвалидацию.
self._invalidated = False
self.set_by_user = False
self._invalidate()
self.set_by_user = True
self.value = self.variable_type.process_value(value, self)
def reset(self):
'''Метод для сброса пользовательского значения.'''
if self.set_by_user:
self._invalidated = False
self.set_by_user = False
self._invalidate()
@property
def source(self):
return self._source
@source.setter
def source(self, source) -> None:
if self._readonly:
raise VariableError("can not change the variable '{}': read only".
format(self.get_fullname()))
# Если источники не совпадают или текущее значение переменной было
# установлено пользователем, то инвалидируем переменную и меняем
# источник.
if self._source != source or self.set_by_user:
self.set_by_user = False
self._invalidate()
self._source = source
@property
def readonly(self) -> bool:
return self._readonly
@readonly.setter
def readonly(self, value: bool) -> None:
# if self.value is None and self._source is not None:
# # TODO выводить предупреждение если переменная инвалидирована,
# # нет источника и при этом устанавливается флаг readonly.
# self.update_value()
self._readonly = value
@property
def fixed(self) -> bool:
return self._fixed
@fixed.setter
def fixed(self, value) -> bool:
self._fixed = value
def _invalidate(self) -> None:
'''Метод для инвалидации данной переменной и всех зависящих от нее
переменных.'''
# print('{} is invalidated'.format(self.get_fullname()))
if not self._invalidated and not self.set_by_user:
if self.variable_type is not HashType:
self.value = None
self._invalidated = True
for subscriber in self.subscribers:
subscriber._invalidate()
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self) -> Any:
'''Метод для получения значения переменной.'''
if self._invalidated and not self.set_by_user:
self.update_value()
return self.value
def get_fullname(self) -> str:
if self.namespace:
return "{}.{}".format(self.namespace.get_fullname(), self.name)
else:
return self.name
def __getitem__(self, key: str) -> HashValue:
if self.variable_type is HashType:
if key in self.get_value():
return self.value[key]
else:
raise VariableNotFoundError("value '{}' is not found in hash"
" variable '{}'".format(
key, self.get_fullname()))
else:
raise VariableError("'{}' variable type is not subscriptable.".
format(self.variable_type.name))
def __repr__(self):
return '<Variable: {} with value: {}>'.format(self.get_fullname(),
self.value or
'INVALIDATED')
class NamespaceNode:
'''Класс ноды соответствующей пространству имен в дереве переменных.'''
def __init__(self, name='', parent=None):
self.name = name
self.variables = dict()
self.namespaces = dict()
self.parent = parent
def add_variable(self, variable: VariableNode) -> None:
'''Метод для добавления переменной в пространство имен.'''
if variable.name in self.namespaces:
raise VariableError("namespace with the name '{}' is already in"
" the namespace '{}'".format(
variable.name,
self.get_fullname()))
self.variables.update({variable.name: variable})
variable.namespace = self
def add_namespace(self, namespace) -> None:
'''Метод для добавления пространства имен в пространство имен.'''
if namespace.name in self.variables:
raise VariableError("variable with the name '{}' is already in"
" the namespace '{}'".format(
namespace.name,
self.get_fullname()))
self.namespaces.update({namespace.name: namespace})
namespace.parent = self
def clear(self):
'''Метод для очистки пространства имен. Очищает и пространства имен
и переменные. Предназначен только для использования в calculate.ini.'''
for namespace_name in self.namespaces.keys():
self.namespaces[namespace_name].clear()
self.variables.clear()
self.namespaces.clear()
def get_fullname(self) -> str:
'''Метод для получения полного имени пространства имен.'''
if self.parent is not None and self.parent.name != '<root>':
return '{}.{}'.format(self.parent.get_fullname(), self.name)
else:
return self.name
def __getattr__(self, name: str):
'''Метод возвращает ноду пространства имен или значение переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
variable = self.variables[name]
if variable.variable_type is TableType:
return variable.get_value().get_table()
return variable.get_value()
else:
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __getitem__(self, name: str) -> None:
'''Метод возвращает ноду пространства имен или ноду переменной.'''
if name in self.namespaces:
return self.namespaces[name]
elif name in self.variables:
return self.variables[name]
else:
raise VariableNotFoundError("'{variable_name}' is not found in the"
" namespace '{namespace_name}'".format(
variable_name=name,
namespace_name=self.name))
def __contains__(self, name):
return name in self.namespaces or name in self.variables
def __repr__(self):
return '<Namespace: {}>'.format(self.get_fullname())
class DependenceAPI(metaclass=Singleton):
'''Класс образующий интерфейс для создания зависимостей.'''
def __init__(self):
self.current_namespace = None
self.datavars_root = None
def __call__(self, *variables, depend=None):
subscriptions = list()
for variable in variables:
# Поиск переменных теперь происходит при обновлении значения
# переменной.
# if isinstance(variable, str):
# variable = self._find_variable(variable)
# if variable is None:
# raise DependenceError("variable '{}' not found".format(
# variable))
# elif not isinstance(variable, VariableNode):
# raise DependenceError("dependence variables must be 'str' or"
# " 'VariableNode' not '{}'".format(
# type(variable)))
if not (isinstance(variable, str) or
isinstance(variable, VariableNode)):
raise DependenceError("dependence variables must be 'str' or"
" 'VariableNode' not '{}'".format(
type(variable)))
subscriptions.append(variable)
return DependenceSource(subscriptions, depend=depend)
def _find_variable(self, variable_name, current_namespace=None):
'''Метод для поиска переменной в пространствах имен.'''
if current_namespace is None:
current_namespace = self.current_namespace
name_parts = variable_name.split('.')
if not name_parts[0]:
namespace = current_namespace
for index in range(1, len(name_parts)):
if not name_parts[index]:
namespace = namespace.parent
else:
name_parts = name_parts[index:]
break
else:
namespace = self.datavars_root
search_result = namespace
for part in name_parts:
search_result = search_result[part]
return search_result
class VariableAPI(metaclass=Singleton):
'''Класс для создания переменных при задании их через
python-скрипты.'''
def __init__(self):
self.current_namespace = None
# TODO Продумать другой способ обработки ошибок.
self.errors = []
def __call__(self, name: str, source=None, type=VariableType,
readonly=False, fixed=False, force=False):
'''Метод для создания переменных внутри with Namespace('name').'''
if name not in self.current_namespace.variables:
variable = VariableNode(name, self.current_namespace)
else:
variable = self.current_namespace[name]
if isinstance(source, DependenceSource):
try:
source.check()
variable.source = source
except DependenceError as error:
raise VariableError('Dependence error: {} in variable: {}'.
format(str(error),
variable.get_fullname()))
else:
variable.source = source
if readonly:
variable.set_variable_type(type, readonly=True)
elif fixed:
variable.set_variable_type(type, fixed=True)
else:
variable.set_variable_type(type)
return variable
class NamespaceAPI(metaclass=Singleton):
'''Класс для создания пространств имен при задании переменных через
python-скрипты.'''
def __init__(self, var_fabric: VariableAPI,
dependence_fabric: DependenceAPI(),
datavars_root=NamespaceNode('<root>')):
self._datavars = datavars_root
self.current_namespace = self._datavars
# Привязываем фабрику переменных.
self._variables_fabric = var_fabric
self._variables_fabric.current_namespace = self._datavars
# Привязываем фабрику зависимостей.
self._dependence_fabric = dependence_fabric
self._dependence_fabric.current_namespace = self._datavars
self._dependence_fabric.datavars_root = self._datavars
@property
def datavars(self):
'''Метод для получения корневого пространства имен, через которое далее
можно получить доступ к переменным.'''
return self._datavars
def set_datavars(self, datavars):
'''Метод для установки корневого пространства имен, которое пока что
будет использоваться для предоставления доступа к переменным.'''
self._datavars = datavars
self._variables_fabric.current_namespace = self._datavars
def reset(self):
'''Метод для сброса корневого пространства имен.'''
if isinstance(self._datavars, NamespaceNode):
self._datavars = NamespaceNode('<root>')
else:
self._datavars.reset()
self.current_namespace = self._datavars
self._variables_fabric.current_namespace = self._datavars
self._dependence_fabric.current_namespace = self._datavars
def set_current_namespace(self, namespace: NamespaceNode):
'''Метод для установки текущего пространства имен, в которое будут
добавляться далее переменные и пространства имен.'''
self.current_namespace = namespace
self._variables_fabric.current_namespace = namespace
self._dependence_fabric.current_namespace = namespace
@contextmanager
def __call__(self, namespace_name):
'''Метод для создания пространств имен с помощью with.'''
if namespace_name not in self.current_namespace.namespaces:
namespace = NamespaceNode(namespace_name,
parent=self.current_namespace)
else:
namespace = self.current_namespace.namespaces[namespace_name]
self.current_namespace.add_namespace(namespace)
self.current_namespace = namespace
# Устанавливаем текущее пространство имен фабрике переменных.
self._variables_fabric.current_namespace = self.current_namespace
# Устанавливаем текущее пространство имен фабрике зависимостей.
self._dependence_fabric.current_namespace = namespace
self._dependence_fabric.datavars_root = self._datavars
try:
yield self
finally:
self.current_namespace = self.current_namespace.parent
self._variables_fabric.current_namespace = self.current_namespace
self._dependence_fabric.current_namespace = self.current_namespace
Dependence = DependenceAPI()
Variable = VariableAPI()
Namespace = NamespaceAPI(Variable, Dependence)