You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

529 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import site
import os
import re
import sys
import types
import functools
from contextlib import contextmanager
# Identity stand-in for the message-translation function: every string
# wrapped in _() is returned unchanged.
# NOTE(review): presumably a placeholder for gettext - confirm.
_ = lambda x:x
class BaseClass:
    """
    Root of a class family identified by the ``BASE_CLASS`` attribute.

    A family root stores its own class name in ``BASE_CLASS``; subclasses
    that do not redefine it count as implementations of that root.
    """
    BASE_CLASS = "BaseClass"

    @classmethod
    def isImplementation(cls, check_class):
        """
        Check that ``check_class`` is a class derived from the base class
        and not the base class itself (used when autoloading variables
        from modules).

        Returns ``False`` for anything that is not a class, is not a
        subclass of ``cls``, or declares a different ``BASE_CLASS``.
        """
        if not isinstance(check_class, type):
            return False
        if not issubclass(check_class, cls):
            return False
        if check_class.BASE_CLASS != cls.BASE_CLASS:
            return False
        # The family root is the class whose name equals BASE_CLASS.
        return check_class.__name__ != cls.BASE_CLASS
class VariableError(Exception):
    """Base error raised when a variable value is rejected or unavailable."""
class VariableNotFoundError(VariableError):
    """Raised when a requested variable or namespace does not exist."""
class CyclicVariableError(VariableError):
    """
    Raised when a variable is requested while it is already being computed.

    ``queue`` is the tuple of variable names given to the constructor.
    """

    def __init__(self, *queue):
        self.queue = queue

    def __str__(self):
        # The final entry of the queue is left out of the message.
        template = _("Cyclic dependence in variables: %s")
        listed = self.queue[:-1]
        return template % ", ".join(listed)
class VariableProperty:
    """Base class for behaviour objects attached to a parent variable."""

    def __init__(self, parent):
        # Object this property belongs to; subclasses reach it as
        # ``self.parent``.
        self.parent = parent
class StringVariable(VariableProperty):
    """Property that adds no behaviour of its own to VariableProperty."""
class ListVariable(VariableProperty):
    """Property for variables whose value is a list."""

    def setValue(self, value, force=False):
        """
        Normalise a value that is being assigned.

        Lists and tuples are returned untouched and a string is split on
        commas. Any other type raises VariableError. ``force`` is accepted
        for interface compatibility and is not used.
        """
        if isinstance(value, str):
            return value.split(",")
        if isinstance(value, (list, tuple)):
            return value
        raise VariableError(
            _("The value of variable '{varname}' must be list").format(
                varname=self.parent.name))

    def post_get(self, value):
        """Return the stored value unchanged."""
        return value
class ReadonlyVariable(VariableProperty):
    """Property that rejects assignments unless they are forced."""

    def setValue(self, value, force=False):
        """
        Return ``value`` when ``force`` is true; otherwise raise
        VariableError naming the parent variable.
        """
        if force:
            return value
        raise VariableError(
            _("Attempting to rewrite readonly variable %s")%self.parent.name)
class IntegerVariable(VariableProperty):
    """Property for variables stored as text but read as integers."""

    # Optional minus sign followed by one or more digits.
    reMatch = re.compile(r"^-?\d+$")

    def check(self, value):
        """
        Raise VariableError when a non-empty ``value`` does not match
        ``reMatch``. Empty (falsy) values are accepted without matching.
        """
        if not value:
            return
        if self.reMatch.match(value) is None:
            raise VariableError(
                _("The value of variable '{varname}' must be integer").format(
                    varname=self.parent.name))

    def post_get(self, value):
        """Convert the stored value with ``int()``."""
        return int(value)
class BooleanVariable(VariableProperty):
    """Property for variables read as booleans."""

    def post_get(self, value):
        """Return the result of comparing the stored value to "true"."""
        is_true = value == "true"
        return is_true
class ChoiceVariable(VariableProperty):
    """Property restricting a variable to the values its parent offers."""

    def check(self, value):
        """
        Raise VariableError when a non-empty ``value`` is not among the
        values returned by the parent's ``choice()``.
        """
        choices = self.parent.choice()
        if not value or value in choices:
            return
        raise VariableError(
            _("The value for variable '{varname}' may be "
              "{vartype}").format(
                varname=self.parent.name,
                vartype=",".join(choices)))

    def choice(self):
        """
        Return the first element of every entry of the parent's
        ``choiceComment()``.

        Raises VariableError when the parent's class uses both
        ``Variable.choice`` and ``Variable.choiceComment`` unchanged: those
        defaults delegate back to this property, so there would be nothing
        to derive the list from.
        """
        parent_class = self.parent.__class__
        choice_is_default = parent_class.choice == Variable.choice
        if choice_is_default and \
                parent_class.choiceComment == Variable.choiceComment:
            raise VariableError(_("Wrong choice variable %s")
                                % self.parent.name)
        return [entry[0] for entry in self.parent.choiceComment()]

    def choiceComment(self):
        """
        Return ``(value, value)`` pairs built from the parent's
        ``choice()``; every value is used as its own comment.
        """
        return [(item, item) for item in self.parent.choice()]
class DefaultValue(VariableProperty):
    """
    Property that supplies a fixed value to a parent that has none cached.

    ``parent`` starts as ``None``; ``getValue`` requires it to have been
    assigned first.
    """

    def __init__(self, value):
        self.value = value
        self.parent = None

    def getValue(self, invalidate_subs=None):
        """
        Store the fixed value in the parent when its ``_value`` is ``None``.

        The parent's dependency subscriptions are dropped before the value
        is stored. ``invalidate_subs`` is accepted but not used.
        """
        if self.parent._value is not None:
            return
        self.parent._unsubscribe_depends()
        self.parent._value = self.value
class IniCreated(DefaultValue):
    """
    DefaultValue subclass that adds no behaviour of its own.

    NOTE(review): presumably marks values that came from an ini file -
    confirm against the callers.
    """
class Variable(BaseClass):
    """
    Named value that is computed on demand and cached.

    ``_value`` holds the cached value (``None`` means "not computed").
    ``invalidate_subs`` is the set of callbacks run when this variable is
    invalidated. ``depends`` is the set of variables this one has
    subscribed to. Extra behaviour is supplied by the property objects in
    ``_properties``.
    """
    BASE_CLASS = "Variable"

    On = "on"
    Off = "off"

    # Value returned by the default get() implementation.
    value = ""

    class Type:
        Bool = "bool"
        List = "list"
        String = "string"
        Integer = "int"
        Choice = "choice"

    # Property classes instantiated for every new variable.
    properties = (StringVariable,)

    def __init__(self, name):
        self.name = name
        self._value = None
        self.vars = None
        self.invalidate_subs = set()
        self.depends = set()
        self.calculating = False
        self._properties = [factory(self) for factory in self.properties]
        # Version of the variable's value: if the variable was updated
        # after the value had been obtained, its value must not be reset.
        # NOTE(review): no code follows this note in the original - confirm
        # whether a version counter was intended here.

    @property
    def fullname(self):
        """Dotted name: the owning namespace's full name plus ``name``."""
        namespace_name = self.vars.getFullname()
        return "%s.%s" % (namespace_name, self.name)

    def addProperty(self, prop):
        """Attach ``prop`` to this variable, ahead of existing properties."""
        prop.parent = self
        self._properties.insert(0, prop)

    def findProperty(self, propclass):
        """Return the first property that is a ``propclass``, or ``None``."""
        matching = (candidate for candidate in self._properties
                    if isinstance(candidate, propclass))
        return next(matching, None)

    def _emit_invalidate(self, slots):
        """Call every callback in ``slots``."""
        for callback in slots:
            callback()

    def _unsubscribe_depends(self):
        """Stop listening to every variable in ``depends`` and clear it."""
        for source in self.depends:
            source.update_unsubscribe(self.invalidate)
        self.depends = set()

    def update_subscribe(self, f_or_var):
        """
        Register a subscriber to be notified when this variable is
        invalidated.

        A Variable is subscribed through its ``invalidate`` method and
        records this variable in its ``depends``. Anything else is stored
        as the callback itself.
        """
        if isinstance(f_or_var, Variable):
            f_or_var.depends.add(self)
            callback = f_or_var.invalidate
        else:
            callback = f_or_var
        self.invalidate_subs.add(callback)

    def update_unsubscribe(self, f):
        """Remove callback ``f`` from the subscribers, if present."""
        self.invalidate_subs.discard(f)

    def invalidate(self):
        """
        Drop the cached value and notify the subscribers.

        The subscriber set is replaced with an empty one before the
        callbacks run, so each subscription fires once.
        """
        self._value = None
        pending = self.invalidate_subs
        if not pending:
            return
        self.invalidate_subs = set()
        self._emit_invalidate(pending)

    def setParent(self, namespace):
        """Record ``namespace`` as the owner of this variable."""
        self.vars = namespace

    @contextmanager
    def _start_calculate(self):
        """Keep ``calculating`` true for the duration of the ``with`` body."""
        try:
            self.calculating = True
            yield self
        finally:
            self.calculating = False

    def getValue(self, invalidate_sub=None):
        """
        Return the variable's value, computing and caching it if needed.

        The ``getValue`` hooks of the properties run first. When nothing is
        cached the value comes from ``get()``; a generator result is
        turned into a list. ``invalidate_sub``, when given, is subscribed
        to this variable. The result is passed through ``post_get``.

        Raises CyclicVariableError when the variable is already being
        computed. An error raised further down the chain is re-raised with
        this variable's name prepended to its queue.
        """
        for hook in self.callProperties("getValue"):
            hook(invalidate_sub)
        if self.calculating:
            raise CyclicVariableError(self.name)
        if self._value is None:
            with self._start_calculate():
                try:
                    self._unsubscribe_depends()
                    self._value = self.get()
                    if isinstance(self._value, types.GeneratorType):
                        self._value = list(self._value)
                except CyclicVariableError as cycle:
                    raise CyclicVariableError(self.name, *cycle.queue)
        if invalidate_sub is not None:
            self.update_subscribe(invalidate_sub)
        return self.post_get(self._value)

    def post_get(self, value):
        """
        Return the first non-``None`` result of the properties'
        ``post_get`` hooks, or ``value`` itself when there is none.
        """
        for hook in self.callProperties("post_get"):
            converted = hook(value)
            if converted is not None:
                return converted
        return value

    def callProperties(self, fname, *args):
        """
        Yield the truthy ``fname`` attribute of every property that has
        one, in property order. ``args`` is accepted but not used.
        """
        for prop in self._properties:
            handler = getattr(prop, fname, None)
            if handler:
                yield handler

    def setValue(self, value, force=False):
        """
        Assign a new value.

        The value is passed through the properties' ``setValue`` hooks and
        ``set()``, then validated with ``check()``. Subscribers are
        invalidated before the value is stored, and the variable's own
        dependency subscriptions are dropped afterwards.
        """
        for transform in self.callProperties("setValue"):
            value = transform(value, force)
        value = self.set(value)
        self.check(value)
        self.invalidate()
        self._value = value
        self._unsubscribe_depends()

    def check(self, value):
        """
        Validate a value that is being set by running every property's
        ``check`` hook.
        """
        for validator in self.callProperties("check"):
            validator(value)

    def get(self):
        """
        Compute the value of the variable; the default returns ``value``.
        """
        return self.value

    def getCommentValue(self, invalidate_sub=None):
        """
        Return ``getComment()`` and subscribe ``invalidate_sub`` if given.

        This method is called inside ``get`` methods.
        """
        comment = self.getComment()
        if invalidate_sub is not None:
            self.update_subscribe(invalidate_sub)
        return comment

    def getComment(self):
        """
        Return the comment for the value: the result of the first
        property ``getComment`` hook, or ``getValue()`` when there is none.
        """
        provider = next(self.callProperties("getComment"), None)
        if provider is None:
            return self.getValue()
        return provider()

    def set(self, value):
        """
        Modify a value that is being assigned; the default returns it
        unchanged.
        """
        return value

    def choice(self):
        """
        Return the list of values available for the variable: the result
        of the first property ``choice`` hook, or ``[]`` when there is none.
        """
        provider = next(self.callProperties("choice"), None)
        if provider is None:
            return []
        return provider()

    def choiceComment(self):
        """
        Return the result of the first property ``choiceComment`` hook, or
        ``[]`` when there is none.
        """
        provider = next(self.callProperties("choiceComment"), None)
        if provider is None:
            return []
        return provider()
class NamespaceError(Exception):
    """Error type for namespace-related failures."""
class Namespace(BaseClass):
    """
    Tree node that holds variables and child namespaces.

    Members are reachable as attributes (``ns.name``) and as items
    (``ns["name"]``). A child namespace takes precedence over a variable
    of the same name. A namespace created without a parent is its own
    ``parent``.
    """
    BASE_CLASS = "Namespace"

    def __init__(self, name="", parent=None):
        self._name = name
        self.variables = {}
        self.childs = {}
        self.parent = parent or self
        self.root = self
        # Counter used to generate names for anonymous child namespaces.
        self._nextns = 0

    def getFullname(self):
        """
        Return the dotted path of this namespace.

        The namespace that is its own parent contributes nothing to the
        path of its descendants; for that namespace itself and for its
        direct children the result is just their own name.
        """
        if self.parent is not self and self.parent.parent is not self.parent:
            return "%s.%s" % (self.parent.getFullname(), self._name)
        return self._name

    def __getattr__(self, name):
        """
        Resolve ``name`` as a child namespace or, failing that, a variable.

        Raises VariableNotFoundError when neither exists. AttributeError is
        raised instead when the containers themselves cannot be reached.
        """
        # __getattr__ only runs after normal lookup has failed. Reading a
        # missing container here would re-enter __getattr__ without end,
        # so those cases are refused up front.
        if name in ("childs", "variables"):
            raise AttributeError(name)
        try:
            variables = object.__getattribute__(self, "variables")
        except AttributeError:
            # __init__ has not populated the instance yet, e.g. the
            # __setstate__ probe copy/pickle make on a fresh object.
            raise AttributeError(name) from None
        childs = self.childs
        if name in childs:
            return childs[name]
        if name in variables:
            return variables[name]
        raise VariableNotFoundError(
            _("Variable or namespace {varname} not found").format(
                varname="%s.%s" % (self.getFullname(), name)))

    def clearChilds(self):
        """Recursively clear the child namespaces, then drop them."""
        for child in self.childs.values():
            child.clearChilds()
        self.childs = {}

    def __getitem__(self, name):
        """Item access; ``name`` is converted with ``str()`` first."""
        return getattr(self, str(name))

    def __setitem__(self, name, value):
        """Set the value of the member called ``str(name)``."""
        return getattr(self, str(name)).setValue(value)

    def __iter__(self):
        """
        Iterate over the child namespaces.

        Sorting: numeric keys first (in numeric order), then the others.
        """
        def sortkey(item):
            key = item[0]
            # isdecimal() matches exactly what int() can parse; isdigit()
            # also accepts characters such as "²" that int() rejects.
            if key.isdecimal():
                return (0, int(key), key)
            return (1, 0, key)

        for _key, child in sorted(self.childs.items(), key=sortkey):
            yield child

    def __contains__(self, name):
        """Tell whether ``name`` is a child namespace or a variable."""
        return name in self.childs or name in self.variables

    def addStringVariable(self, varname, value):
        """Create a plain Variable named ``varname`` that yields ``value``."""
        var = Variable(varname)
        var.value = value
        var.setParent(self)
        self.variables[varname] = var

    def addVariable(self, variable):
        """Register ``variable`` under its own name and become its owner."""
        self.variables[variable.name] = variable
        variable.setParent(self)

    def _getNextNamespaceName(self):
        """Return the first numeric name not used by a child namespace."""
        name = str(self._nextns)
        while name in self.childs:
            self._nextns += 1
            name = str(self._nextns)
        return name

    def addNamespace(self, namespace=None, name=None):
        """
        Attach a child namespace and return it.

        With no ``namespace`` a new one is created. With no ``name`` the
        namespace's own name is used, or a generated numeric name when no
        namespace was given either.
        """
        if name is None:
            if namespace is None:
                name = self._getNextNamespaceName()
            else:
                name = namespace._name
        if namespace is None:
            namespace = Namespace(name)
        self.childs[name] = namespace
        namespace.parent = self
        namespace.root = self.root
        return namespace
class HashVariable(Namespace):
    """
    A variable whose value is a dictionary.

    The dictionary is held by ``master_variable`` (a ``Data`` instance).
    Every key listed in ``hash_vars`` is exposed as a ``HashValue``
    variable that reads and writes through the master variable. Keys in
    ``readonly_vars`` can only be written with ``force``.
    """
    BASE_CLASS = "HashVariable"

    class HashValue(Variable):
        """View of a single key of the master variable's dictionary."""
        BASE_CLASS = "HashValue"

        def __init__(self, name, master_variable):
            super().__init__(name)
            self.parent = None
            self.master_variable = master_variable

        def getValue(self, invalidate_sub=None):
            return self.master_variable.getHashValue(self.name, invalidate_sub)

        def setValue(self, value, force=False):
            return self.master_variable.setHashValue(self.name, value, force)

        def invalidate(self):
            self.master_variable.invalidate()

    class Data(Variable):
        """Variable that stores the whole dictionary."""
        BASE_CLASS = "Data"

        def getHashValue(self, name, invalidate_sub=None):
            """Return the entry ``name`` of the dictionary value."""
            return self.getValue(invalidate_sub)[name]

        def setHashValue(self, name, value, force):
            """
            Replace the entry ``name``. A copy of the dictionary is
            modified and assigned as the new value.

            Raises VariableError for a read-only key unless ``force``.
            """
            if name in self.readonly_vars and not force:
                raise VariableError(
                    _("Attempting to rewrite readonly variable %s")%name)
            data = self.getValue().copy()
            data[name] = value
            self.setValue(data, force)

    # Names of the dictionary keys exposed as variables.
    hash_vars = []
    # Keys that may only be changed with force=True.
    readonly_vars = []

    def __init__(self, name, parent=None, hash_vars=None):
        """
        Build the master variable and one HashValue per key.

        Raises VariableError when no ``hash_vars`` are available, either
        passed in or defined on the class.
        """
        super().__init__(name)
        if hash_vars is not None:
            self.hash_vars = hash_vars
        # Without a parent, keep the self-parent set by Namespace.__init__:
        # overwriting it with None makes getFullname() fail.
        if parent is not None:
            self.parent = parent
        if not self.hash_vars:
            raise VariableError(
                _("Missed '{attrname}' attribute for hash variable {varname}"
                  ).format(attrname = "hash_vars",
                           varname=self.getFullname()))
        self.master_variable = self.Data(name)
        self.master_variable.setParent(parent)
        self.master_variable.readonly_vars = self.readonly_vars
        for varname in self.hash_vars:
            var = self.HashValue(varname, self.master_variable)
            self.addVariable(var)

    def invalidate(self):
        """Invalidate the master variable."""
        self.master_variable.invalidate()

    def getValue(self, invalidate_sub=None):
        """Return the value of the master variable."""
        return self.master_variable.getValue(invalidate_sub)
class TableVariable(Namespace):
    """
    A variable whose value is a list of dictionaries.

    The table is held by ``master_variable`` (a ``Data`` instance). Each
    row is exposed as a child namespace named after its index ("0", "1",
    ...). The children are built on first access to ``childs`` and dropped
    again whenever the master variable is invalidated.
    """
    BASE_CLASS = "TableVariable"

    class TableHashVariable(HashVariable):
        """Namespace for one table row; reads and writes go to the table."""
        BASE_CLASS = "TableHashVariable"

        def __init__(self, name, parent=None, hash_vars=None,
                     master_variable=None, index=None):
            # master_variable is accepted but not used: the row reaches the
            # table through its own Data variable's owner (``vars``).
            super().__init__(name, parent, hash_vars)
            self.master_variable._index = index

        class Data(HashVariable.Data):
            def getHashValue(self, name, invalidate_sub=None):
                return self.vars.master_variable.getTableValue(
                    name, self._index, invalidate_sub)

            def setHashValue(self, name, value, force):
                self.vars.master_variable.setTableValue(
                    name, self._index, value, force)

    class Data(Variable):
        """Variable that stores the whole table."""
        BASE_CLASS = "Data"

        def getTableValue(self, name, index, invalidate_sub=None):
            """Return the entry ``name`` of row ``index``."""
            data = self.getValue(invalidate_sub)
            return data[index][name]

        def setTableValue(self, name, index, value, force):
            """
            Replace the entry ``name`` of row ``index``. Copies of the rows
            are modified and assigned as the new value.

            Raises VariableError for a read-only column unless ``force``.
            """
            if name in self.readonly_vars and not force:
                raise VariableError(
                    _("Attempting to rewrite readonly variable %s")%name)
            data = [x.copy() for x in self.getValue()]
            rowdata = data[index]
            rowdata[name] = value
            self.setValue(data, force)

    @property
    def childs(self):
        """Row namespaces keyed by index string, built on first access."""
        if self._childs is None:
            value = self.master_variable.getValue()
            self._childs = {}
            for i, row in enumerate(value):
                hashvar = self.TableHashVariable(
                    self.master_variable.name, self, self.hash_vars,
                    self.master_variable, i)
                self._childs[str(i)] = hashvar
        return self._childs

    @childs.setter
    def childs(self, value):
        self._childs = value

    def getValue(self, invalidate_sub=None):
        """Return the value of the master variable."""
        return self.master_variable.getValue(invalidate_sub)

    def setValue(self, value, force=False):
        """Assign a new value to the master variable."""
        self.master_variable.setValue(value, force)

    def invalidate(self):
        """Invalidate the master variable."""
        self.master_variable.invalidate()

    # Names of the columns exposed as variables in every row.
    hash_vars = []
    # Columns that may only be changed with force=True.
    readonly_vars = []

    def _drop_childs(self):
        """Forget the row namespaces so they are rebuilt on next access."""
        self._childs = None
        # Variable.invalidate() empties the subscriber set before calling
        # the callbacks, so the subscription has to be renewed each time.
        self.master_variable.update_subscribe(self._drop_childs)

    def clearChilds(self):
        """Clear the row namespaces recursively, then forget them."""
        super().clearChilds()
        self._drop_childs()

    def __init__(self, name, parent=None):
        """
        Build the master variable and subscribe to its invalidation.

        Raises VariableError when the class defines no ``hash_vars``.
        """
        super().__init__(name)
        self._childs = None
        # Without a parent, keep the self-parent set by Namespace.__init__:
        # overwriting it with None makes getFullname() fail.
        if parent is not None:
            self.parent = parent
        if not self.hash_vars:
            raise VariableError(
                _("Missed '{attrname}' attribute for table variable {varname}"
                  ).format(attrname = "hash_vars",
                           varname=self.getFullname()))
        self.master_variable = self.Data(name)
        self.master_variable.setParent(parent)
        self.master_variable.readonly_vars = self.readonly_vars
        self.master_variable.update_subscribe(self._drop_childs)