You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

553 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# import site
# import os
import re
# import sys
import types
# import functools
from contextlib import contextmanager
_ = lambda x: x
class BaseClass:
BASE_CLASS = "BaseClass"
@classmethod
def is_implementation(cls, check_class):
'''Метод для проверки того, что класс является производным базового
класса, а не самим базовым. Используется в автозагрузке переменных из
модулей.'''
if isinstance(check_class, type) and issubclass(check_class, cls):
if check_class.BASE_CLASS == cls.BASE_CLASS and \
check_class.__name__ != cls.BASE_CLASS:
return True
return False
class VariableError(Exception):
pass
class VariableNotFoundError(VariableError):
pass
class CyclicVariableError(VariableError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return _("Cyclic dependence in variables: {}").format(", ".join(
self.queue[:-1]))
class VariableProperty:
'''Базовый класс для объектов свойств.'''
def __init__(self, parent):
self.parent = parent
class StringVariable(VariableProperty):
'''Класс свойства, соответствующий переменным просто хранящим строки как
значения.'''
pass
class ListVariable(VariableProperty):
'''Класс свойства, соответствующий переменным хранящим списки значений.'''
def set_value(self, value, force=False):
if isinstance(value, (list, tuple)):
return value
elif isinstance(value, str):
return value.split(",")
else:
raise VariableError(
_("The value of variable '{varname}' must be list").format(
varname=self.parent.name))
def post_get(self, value):
return value
class ReadonlyVariable(VariableProperty):
def set_value(self, value, force=False):
if not force:
raise VariableError(
_("Attempting to rewrite readonly variable {}").format(
self.parent.name))
return value
class IntegerVariable(VariableProperty):
re_match = re.compile(r"^-?\d+$")
def check(self, value):
if value and not self.re_match.match(value):
raise VariableError(
_("The value of variable '{varname}' must be integer").format(
varname=self.parent.name))
def post_get(self, value):
return int(value)
class BooleanVariable(VariableProperty):
def post_get(self, value):
return value == "true"
class ChoiceVariable(VariableProperty):
def check(self, value):
choices = self.parent.choice()
if value and value not in choices:
raise VariableError(
_("The value for variable '{varname}' may be "
"{vartype}").format(varname=self.parent.name,
vartype=",".join(choices)))
def choice(self):
if self.parent.__class__.choice == Variable.choice and \
self.parent.__class__.choice_comment == Variable.choice_comment:
raise VariableError(_("Wrong choice variable {}").format(
self.parent.name))
return [x[0] for x in self.parent.choice_comment()]
def choice_comment(self):
return [(x, x) for x in self.parent.choice()]
class DefaultValue(VariableProperty):
def __init__(self, value):
self.value = value
self.parent = None
def get_value(self, invalidate_subs=None):
if self.parent._value is None:
self.parent._unsubscribe_depends()
self.parent._value = self.value
class IniCreated(DefaultValue):
pass
class Variable(BaseClass):
BASE_CLASS = "Variable"
On = "on"
Off = "off"
value = ""
class Type:
Bool = "bool"
List = "list"
String = "string"
Integer = "int"
Choice = "choice"
properties = (StringVariable,)
def __init__(self, name):
self.name = name
self._value = None
self.vars = None
self.invalidate_subs = set()
self.depends = set()
self.calculating = False
self._properties = [x(self) for x in self.properties]
# версия значения переменной, если переменная со времен получения
# значения успела обновиться, исключить сброс ее значения
@property
def fullname(self):
'''Метод для получения полного имени переменной.'''
return "{}.{}".format(self.vars.get_fullname(), self.name)
def addProperty(self, prop):
prop.parent = self
self._properties.insert(0, prop)
def findProperty(self, propclass):
for prop in self._properties:
if isinstance(prop, propclass):
return prop
else:
return None
def _emit_invalidate(self, slots):
for f in slots:
f()
def _unsubscribe_depends(self):
for dependvar in self.depends:
dependvar.update_unsubscribe(self.invalidate)
self.depends = set()
def update_subscribe(self, f_or_var):
if isinstance(f_or_var, Variable):
f_or_var.depends.add(self)
self.invalidate_subs.add(f_or_var.invalidate)
else:
self.invalidate_subs.add(f_or_var)
def update_unsubscribe(self, f):
if f in self.invalidate_subs:
self.invalidate_subs.remove(f)
def invalidate(self):
self._value = None
if self.invalidate_subs:
slots = self.invalidate_subs
self.invalidate_subs = set()
self._emit_invalidate(slots)
def set_parent(self, namespace):
self.vars = namespace
@contextmanager
def _start_calculate(self):
try:
self.calculating = True
yield self
finally:
self.calculating = False
def get_value(self, invalidate_sub=None):
for f in self.call_properties("get_value"):
f(invalidate_sub)
if self.calculating:
raise CyclicVariableError(self.name)
if self._value is None:
with self._start_calculate():
try:
self._unsubscribe_depends()
self._value = self.get()
if isinstance(self._value, types.GeneratorType):
self._value = list(self._value)
except CyclicVariableError as e:
raise CyclicVariableError(self.name, *e.queue)
if invalidate_sub is not None:
self.update_subscribe(invalidate_sub)
return self.post_get(self._value)
def post_get(self, value):
for f in self.call_properties("post_get"):
ret = f(value)
if ret is not None:
return ret
return value
def call_properties(self, method_name, *args):
'''Метод для вызова указанного метода у всех объектов, которыми владеет
переменная.'''
for _property in self._properties:
method = getattr(_property, method_name, None)
if method:
yield method
def set_value(self, value, force=False):
'''Метод для установки некоторого заданного значения всем объектам,
принадлежащим переменной.'''
for setter in self.call_properties("set_value"):
value = setter(value, force)
value = self.set(value)
self.check(value)
self.invalidate()
self._value = value
self._unsubscribe_depends()
def check(self, value):
'''Метод для проверки корректности устанавливаемого значения путем
вызова проверочных методов всех объектов, которыми владеет переменная.
'''
for checker in self.call_properties("check"):
checker(value)
def get(self):
'''Метод для заполнения переменной.'''
return self.value
def get_comment_value(self, invalidate_sub=None):
'''Этот метод вызывается внутри методов get.'''
val = self.get_comment()
if invalidate_sub is not None:
self.update_subscribe(invalidate_sub)
return val
def get_comment(self):
'''Метод для установки .'''
for f in self.call_properties("get_comment"):
return f()
return self.get_value()
def set(self, value):
'''Метод для модификации переменной.'''
return value
def choice(self):
'''Метод возвращет список доступных значений для переменной.'''
for f in self.call_properties("choice"):
return f()
return []
def choice_comment(self):
for f in self.call_properties("choice_comment"):
return f()
return []
class NamespaceError(Exception):
pass
class Namespace(BaseClass):
'''Класс пространства имен.'''
BASE_CLASS = "Namespace"
def __init__(self, name="", parent=None):
self._name = name
self.variables = {}
self.childs = {}
self.parent = parent or self
self.root = self
self._next_namespace = 0
def get_fullname(self):
'''Метод для получения полного имени пространства имен, включающего в
себя имена всех родительских пространств имен.'''
if self.parent is not self and self.parent.parent is not self.parent:
return "{}.{}".format(self.parent.get_fullname(), self._name)
else:
return self._name
def __getattr__(self, name):
if name in self.childs:
return self.childs[name]
elif name in self.variables:
return self.variables[name]
else:
raise VariableNotFoundError(
_("Variable or namespace {varname} not found").format(
varname="{}.{}".format(self.get_fullname(),
name)))
def clear_childs(self):
'''Метод для глубокой очистки пространства имен от всех дочерних
пространств имен.'''
for child in self.childs.values():
child.clear_childs()
self.childs = {}
def __getitem__(self, name):
return getattr(self, str(name))
def __setitem__(self, name, value):
return getattr(self, str(name)).set_value(value)
def __iter__(self):
# Сортировка: вначале числовые ключи потом прочие.
def sortkey(x):
k, v = x
if k.isdigit():
return (0, int(k), k)
else:
return (1, 0, k)
for k, v in sorted(self.childs.items(), key=sortkey):
yield v
def __contains__(self, name):
return name in self.childs or name in self.variables
def add_string_variable(self, varname: str, value: str):
'''Метод для добавления переменной с помощью строк.'''
var = Variable(varname)
var.value = value
var.set_parent(self)
self.variables[varname] = var
def add_variable(self, variable: Variable):
'''Метод для добавления переменной.'''
self.variables[variable.name] = variable
variable.set_parent(self)
def _get_next_namespace_name(self):
'''Метод для получения имени следующего по счету пространства имен.'''
name = str(self._next_namespace)
while name in self.childs:
self._next_namespace += 1
name = str(self._next_namespace)
return name
def add_namespace(self, namespace=None, name=None):
'''Метод для добавления пространств имен.'''
if name is None:
if namespace is None:
name = self._get_next_namespace_name()
else:
name = namespace._name
if namespace is None:
namespace = Namespace(name)
self.childs[name] = namespace
namespace.parent = self
namespace.root = self.root
return namespace
class HashVariable(Namespace):
'''Класс переменных, представляющих собой словарь.'''
BASE_CLASS = "HashVariable"
class HashValue(Variable):
BASE_CLASS = "HashValue"
def __init__(self, name, master_variable):
super().__init__(name)
self.parent = None
self.master_variable = master_variable
def get_value(self, invalidate_sub=None):
return self.master_variable.getHashValue(self.name, invalidate_sub)
def set_value(self, value, force=False):
return self.master_variable.setHashValue(self.name, value, force)
def invalidate(self):
self.master_variable.invalidate()
class Data(Variable):
BASE_CLASS = "Data"
def getHashValue(self, name, invalidate_sub=None):
return self.get_value(invalidate_sub)[name]
def setHashValue(self, name, value, force):
if name in self.readonly_vars and not force:
raise VariableError(
_("Attempting to rewrite readonly variable {}").
format(name))
data = self.get_value().copy()
data[name] = value
self.set_value(data, force)
def get_value(self, invalidate_sub=None):
return self.master_variable.get_value(invalidate_sub)
hash_vars = []
readonly_vars = []
def __init__(self, name, parent=None, hash_vars=None):
super().__init__(name)
if hash_vars is not None:
self.hash_vars = hash_vars
self.parent = parent
if not self.hash_vars:
raise VariableError(
_("Missed '{attrname}' attribute for hash variable {varname}").
format(attrname="hash_vars", varname=self.get_fullname()))
self.master_variable = self.Data(name)
self.master_variable.set_parent(parent)
self.master_variable.readonly_vars = self.readonly_vars
for varname in self.hash_vars:
var = self.HashValue(varname, self.master_variable)
self.add_variable(var)
def invalidate(self):
self.master_variable.invalidate()
class TableVariable(Namespace):
"""
Переменная представляет собой список словарей
"""
BASE_CLASS = "TableVariable"
class TableHashVariable(HashVariable):
BASE_CLASS = "TableHashVariable"
def __init__(self, name, parent=None, hash_vars=None,
master_variable=None, index=None):
super().__init__(name, parent, hash_vars)
self.master_variable._index = index
class Data(HashVariable.Data):
def getHashValue(self, name, invalidate_sub=None):
return self.vars.master_variable.getTableValue(name,
self._index,
invalidate_sub)
def setHashValue(self, name, value, force):
self.vars.master_variable.setTableValue(name, self._index,
value, force)
class Data(Variable):
BASE_CLASS = "Data"
def getTableValue(self, name, index, invalidate_sub=None):
data = self.get_value(invalidate_sub)
return data[index][name]
def setTableValue(self, name, index, value, force):
if name in self.readonly_vars and not force:
raise VariableError(
_("Attempting to rewrite readonly variable {}").format(
name))
data = [x.copy() for x in self.get_value()]
rowdata = data[index]
rowdata[name] = value
self.set_value(data, force)
@property
def childs(self):
if self._childs is None:
value = self.master_variable.get_value()
self._childs = {}
for i, row in enumerate(value):
hashvar = self.TableHashVariable(self.master_variable.name,
self, self.hash_vars,
self.master_variable, i)
self._childs[str(i)] = hashvar
return self._childs
@childs.setter
def childs(self, value):
self._childs = value
def get_value(self, invalidate_sub=None):
return self.master_variable.get_value(invalidate_sub)
def set_value(self, value, force=False):
self.master_variable.set_value(value, force)
def invalidate(self):
self.master_variable.invalidate()
hash_vars = []
readonly_vars = []
def _drop_childs(self):
self._childs = None
self.master_variable.update_subscribe(self._drop_childs)
def clear_childs(self):
super().clear_childs()
self._drop_childs()
def __init__(self, name, parent=None):
super().__init__(name)
self._childs = None
self.parent = parent
if not self.hash_vars:
raise VariableError(
_("Missed '{attrname}' attribute for table variable {varname}").
format(attrname="hash_vars",
varname=self.get_fullname()))
self.master_variable = self.Data(name)
self.master_variable.set_parent(parent)
self.master_variable.readonly_vars = self.readonly_vars
self.master_variable.update_subscribe(self._drop_childs)