You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/datavars.py

2041 lines
71 KiB

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
import importlib
from .utils.tools import drop_slash
from .utils.text import convertStrListDict, _u, _u8, formatListOr
from .utils.files import pathJoin
from .utils.portage import searchProfile, RepositorySubstituting, RepositoryPath
from os import path
import os
from collections import OrderedDict
from collections.abc import Iterable
from threading import RLock
from itertools import *
import operator
import traceback
from string import Formatter
from contextlib import contextmanager
# from types import StringTypes, GeneratorType
from types import GeneratorType
from .configparser import ConfigParser, ParsingError
from .cl_lang import setLocalTranslate, getLazyLocalTranslate
# Identity placeholder for the translation function. setLocalTranslate()
# receives this module object — presumably it installs the real translator
# for the 'cl_lib3' domain as `_`; confirm in cl_lang.
_ = lambda x: x
setLocalTranslate('cl_lib3', sys.modules[__name__])
# Lazy translation helper built from `_` (semantics defined in cl_lang).
__ = getLazyLocalTranslate(_)
def addStdConfig(fileLst, prefix='/'):
    """
    Return `fileLst` with the standard calculate.env path in front.

    The standard path is `prefix` joined with the Calculate repository path
    and "calculate.env". `fileLst` is returned unchanged when the absolute
    form of one of its entries already equals that path.
    """
    stdPath = path.join(prefix, drop_slash(RepositoryPath.Calculate),
                        "calculate.env")
    known_paths = map(os.path.abspath, fileLst)
    if stdPath in known_paths:
        return fileLst
    return [stdPath] + fileLst
def itemgetter(*args, **kwargs):
    """
    operator.itemgetter with an extra `alwaysTuple` keyword.

    With alwaysTuple=True and exactly one index the returned getter yields
    a 1-tuple instead of a bare element; in every other case the result is
    a plain operator.itemgetter.
    """
    force_tuple = kwargs.get('alwaysTuple', False)
    if force_tuple and len(args) == 1:
        return lambda x: (x[args[0]],)
    return operator.itemgetter(*args)
class CriticalError(Exception):
    """Raised for critical (unrecoverable) errors."""
    pass
class DataVarsError(Exception):
    """
    Error raised while obtaining variable values.

    When constructed with a list, the list is kept in `exception_list`
    and the exception message is empty.
    """
    exception_list = None

    def __init__(self, expression):
        is_list = isinstance(expression, list)
        super().__init__("" if is_list else expression)
        if is_list:
            self.exception_list = expression
class ImportModuleDataVarsError(DataVarsError):
    """Module import error."""
    pass
class VariableError(Exception):
    """
    Error raised by a Variable.

    When constructed with a list, the list is kept in `exception_list`
    and the exception message is empty.
    """
    exception_list = None

    def __init__(self, expression):
        is_list = isinstance(expression, list)
        super().__init__("" if is_list else expression)
        if is_list:
            self.exception_list = expression
class PasswordError(VariableError):
    """Raised when changing a password fails."""
    pass
class CommonVariableError(VariableError):
    """Variable error intended for the command line."""
    pass
class BuildAlreadyInstalledError(VariableError):
    """Raised by `install -U` when it is used after a successful install."""
    pass
def makePath(dirs, mode=0o755):
    """
    Create directory `dirs` (with parents) unless the path already exists.

    dirs -- path of the directory to create
    mode -- permission bits passed to os.makedirs. The default is the octal
            0o755 (rwxr-xr-x); the previous decimal literal 755 equals
            0o1363 and produced unintended permissions.

    Nothing is done when `dirs` already exists, whatever its type.
    """
    if not path.exists(dirs):
        # exist_ok covers another process creating the directory between
        # the existence check and the call
        os.makedirs(dirs, mode, exist_ok=True)
# Values of the `mode` attribute of a variable: read-only vs. writable.
READONLY = "r"
WRITEABLE = "w"
class HumanReadable():
    """Named values for the `humanreadable` argument (True/False/None)."""
    Yes, No, Auto = True, False, None
class ValueFormatter(Formatter):
    """
    Formatter whose fields are resolved through a getter callable.

    The getter is the first positional argument of format(). A field such
    as "name.method()" fetches `name`, converts it to str and calls the
    given str method on it; any other field is passed to the getter as is.
    """

    def get_field(self, field_name, args, kwargs):
        getter = args[0]
        if not field_name.endswith("()"):
            return getter(field_name), None
        var_name, _sep, call = field_name.rpartition(".")
        text = str(getter(var_name))
        # strip the trailing "()" to obtain the method name
        return getattr(text, call[:-2])(), None
class VariableInterface():
    """
    Method set shared by variable classes.

    Every method here is a stub that returns None; concrete classes
    supply the behaviour.
    """
    _value_formatter = ValueFormatter()
    type = "string"

    def Get(self, varname=None, humanreadable=False):
        """Stub: get a variable value."""

    def Choice(self, varname):
        """Stub: get the choice of another variable."""

    def Select(self, selField, **kwargs):
        """Stub: select from table variables."""

    def select(self, *fields, **kw):
        """Stub: select from table variables (keyword filters)."""

    def ZipVars(self, *argvVarNames, **kw):
        """Stub: zip the values of several variables."""

    def uncompatible(self):
        """Stub: incompatibility check."""

    def set(self, value):
        """Stub: convert a value before it is stored."""

    def get(self):
        """Stub: compute the value."""

    def choice(self):
        """Stub: list the allowed values."""

    def GetBool(self, varname):
        """Stub: get a variable as a boolean."""

    def GetInteger(self, varname):
        """Stub: get a variable as an integer."""

    def invalidate(self, force=False):
        """Stub: drop the filled value."""

    def is_console_set(self, varname):
        """Stub: report whether the variable was set from the console."""
class Variable(VariableInterface):
    """
    Base class of a calculate utilities variable.

    A variable is described by class attributes; its value is produced by
    the overridable get() method and validated by check().

    Instance attributes:
        wasSet : boolean
            variable value was set by _Set method
        reqVars : List
            list of variable which use this variable in fill method
        invalid : boolean
            invalid value
        section : string
            section of variable (main,install,builder and etc)
    """

    class Types():
        # string constants for the `type` attribute
        String = "string"
        List = "list"
        Choice = "choice"
        Table = "table"
        Boolean = "bool"
        Object = "object"
        Tristate = "bool3"

    # label of variable
    label = None
    # opts for cmdline
    opt = None
    # metavalues
    metavalue = None
    # help information
    help = None
    # writeable variable
    mode = WRITEABLE
    # hidden variable
    hide = True
    # default value
    value = None
    # variable section
    section = "main"
    # variable name
    name = ""
    # syntax for opts
    syntax = ""
    # variable type: string, multichoice, list, number, choiceedit, table
    # flag, yesno, password
    type = "string"
    # source column with data
    source = None
    # gui element type
    element = None
    # force check after variables in list
    # NOTE(review): mutable class attribute, shared by all subclasses that
    # do not override it
    check_after = []
    # fill value is true
    untrusted = False
    # values of modeGet: which dependency list Get() records into
    NONE, CHECK, UNCOMPAT, HUMAN = 0, 1, 2, 3
    # setting the value of this variable also sets the value of the
    # variable named here
    alias = ""
    # the value is filled from other variables (format template)
    value_format = ""
    # special type for gui
    guitype = ""
    On = "on"
    Off = "off"
    Auto = "auto"
    EmptyTable = [[]]
    Readonly = READONLY
    Writable = WRITEABLE
def __init__(self, parent=None, **kwargs):
    """
    Set up per-instance state, call init(), then apply keyword overrides.

    parent -- owning datavars object, stored as self.parent
    kwargs -- attribute name/value pairs assigned after init() has run
    """
    self.modeGet = Variable.NONE
    # True once the value came from the user or from an ini file
    self.wasSet = False
    # variables that used this value for fill
    self.reqVars = []
    # variables that used this value for check
    self.reqCheck = []
    # variables read by this one during the uncompatible check
    self.reqUncompat = []
    # NOTE(review): `type` resolves to the builtin here, not to self.type
    self.variableType = type
    self.parent = parent
    # the value has to be computed on first access
    self.invalid = True
    self.name = self.getVariableName()
    # guards against recursive invalidation
    self.processInvalidate = False
    # an ini read is in progress
    self.bIniRead = False
    self.init()
    for attr in kwargs:
        setattr(self, attr, kwargs[attr])
def init(self):
    """
    Overridable hook called from __init__; used to place translatable
    strings. Does nothing by default.
    """
    return None
@classmethod
def getVariableName(cls):
return re.sub("(.)([A-Z])", "\\1_\\2", cls.__name__[8:]).lower()
def getCharType(self):
    """
    Return the one-letter symbol of the variable type.

    The first of table, list, choice, number, bool found in self.type
    decides the letter; "s" is returned when none of them matches.
    """
    type_symbols = (("table", "t"), ("list", "l"), ("choice", "c"),
                    ("number", "i"), ("bool", "b"))
    return next((symbol for key, symbol in type_symbols
                 if key in self.type), "s")
@property
def empty_value(self):
if "list" in self.type:
return []
elif "table" in self.type:
return [[]]
return ""
def is_console_set(self, varname):
    """Delegate to the parent's is_console_set()."""
    owner = self.parent
    return owner.is_console_set(varname)
def GetBool(self, varname):
    """Return the value of `varname` converted by isTrue()."""
    raw = self.Get(varname)
    return self.isTrue(raw)
def GetInteger(self, varname):
    """Return the value of `varname` converted with int()."""
    raw = self.Get(varname)
    return int(raw)
def Get(self, varname=None, humanreadable=False):
    """
    Get the value of this variable or of another one.

    Without `varname` (or with this variable's own name) the own value is
    returned. Otherwise the value is fetched through the parent, with the
    own section prepended to names that contain no dot, and this variable
    is recorded in the dependency list selected by self.modeGet.
    """
    if not varname or varname == self.name:
        return self._get()
    varObj = self.parent.getInfo(varname)
    if "." not in varname:
        varname = f"{self.section}.{varname}"
    res = self.parent.Get(varname, humanreadable)
    if not varObj:
        return res
    # build dependence for fill, check and uncompatible
    mode = self.modeGet
    if mode == Variable.NONE:
        if self not in varObj.reqVars:
            varObj.reqVars.append(self)
    if mode == Variable.CHECK:
        if self not in varObj.reqCheck:
            varObj.reqCheck.append(self)
    elif mode == Variable.UNCOMPAT:
        if varObj not in self.reqUncompat:
            self.reqUncompat.append(varObj)
    return res
def Choice(self, varname):
    """Return the parent's Choice() for `varname`."""
    owner = self.parent
    return owner.Choice(varname)
def Select(self, selField, **kwargs):
    """Run the parent's Select() using this variable's ZipVars."""
    owner = self.parent
    return owner.Select(selField, zipVars=self.ZipVars, **kwargs)
def select(self, *fields, **kw):
    """Run the parent's select() using this variable's ZipVars."""
    owner = self.parent
    return owner.select(*fields, zip_vars=self.ZipVars, **kw)
def getRemoteInfo(self, envFile):
    """Return the parent's getRemoteInfo() for `envFile`."""
    owner = self.parent
    return owner.getRemoteInfo(envFile)
def ZipVars(self, *argvVarNames, **kw):
    """
    Return the values of the named variables zipped into a list of rows.

    The optional `humanreadable` keyword is forwarded to Get().
    """
    if "humanreadable" in kw:
        hr = kw['humanreadable']
        columns = [self.Get(name, humanreadable=hr)
                   for name in argvVarNames]
    else:
        columns = [self.Get(name) for name in argvVarNames]
    return list(zip(*columns))
def _get_format_value(self):
return self._value_formatter.format(
self.__class__.value_format, self.Get)
def get(self):
    """
    Overridable method that computes the value.

    By default: the rendered `value_format` if the class defines one, else
    the class-level `value` if it is truthy, else the empty value ([] for
    list types, "" otherwise).
    """
    cls = self.__class__
    if cls.value_format:
        return self._get_format_value()
    if cls.value:
        return cls.value
    return [] if "list" in self.type else ""
def humanReadable(self):
    """
    Return the value in human readable form.

    For choice types whose choice() supplies comments, each value is
    replaced by its comment; values without a comment and nested
    lists/tuples are kept as they are.
    """
    val = self.Get()
    if "choice" not in self.type:
        return val
    choiceVal, commentVal = self._choice()
    if not commentVal:
        return val
    comments = dict(zip(choiceVal, commentVal))
    if "list" not in self.type:
        return comments.get(val, val)
    return [item if isinstance(item, (list, tuple))
            else comments.get(item, item) for item in val]
def _human(self):
    """Call humanReadable() with modeGet temporarily set to HUMAN."""
    saved_mode = self.modeGet
    try:
        self.modeGet = Variable.HUMAN
        result = self.humanReadable()
    finally:
        self.modeGet = saved_mode
    return result
def getHumanReadableAuto(self):
    """
    Return the human readable value for readonly variables and the plain
    value for all others.
    """
    return self._human() if self.mode == READONLY else self._get()
def iniread(self):
    """
    Load the value from the parent's ini cache.

    Returns True when the cached value was set, False when a read is
    already in progress or when setting the value failed.
    Raises VariableError for a readonly variable.
    """
    if self.bIniRead:
        # re-entrant call while a read is running
        return False
    self.bIniRead = True
    try:
        raw = self.parent.iniCache[self.fullname]
        value = self.parent.unserialize(self.type, raw)
        if self.mode == READONLY:
            raise VariableError(
                _("Attempting to rewrite readonly variable %s "
                  "via calculate.env") %
                self.name)
        try:
            self._set(value)
        except Exception:
            # a value rejected by _set is treated as "not read"
            return False
        return True
    finally:
        self.bIniRead = False
@property
def fullname(self):
return '%s.%s' % (self.section, self.name)
def _get(self):
    """
    Standard inner method returning the value, refilling it when invalid.

    An invalid value is taken from the ini cache when present there,
    otherwise computed with get(). A bytes value is decoded as UTF-8
    before it is returned, except on the ini path, which returns
    self.value directly.
    """
    if self.invalid:
        if self.fullname in self.parent.iniCache and self.iniread():
            return self.value
        saved_mode = self.modeGet
        try:
            # record fill dependencies while computing
            self.modeGet = Variable.NONE
            fresh = self.get()
            self.untrusted = self.__class__.untrusted
        finally:
            self.modeGet = saved_mode
        if fresh != self.value:
            # the value changed: invalidate dependent variables
            self.invalidate()
        # mark as filled
        self.invalid = False
        if hasattr(self.value, "close"):
            self.value.close()
        self.value = fresh
    result = self.value
    if isinstance(result, bytes):
        result = str(result, encoding="UTF-8")
    return result
def choice(self):
    """
    Overridable method producing the list of allowed values.
    The default is an empty list.
    """
    allowed = []
    return allowed
def _choice(self):
"""
Convert choice created by variable choice to two tuple (values,comments)
"""
res = self.choice()
if isinstance(res, GeneratorType):
res = tuple(res)
if res and type(res[0]) in (tuple, list):
return list(zip(*res))
else:
return res, None
def check_on(self):
    """Hook called by check() when a boolean variable gets a true value."""
    return None
def check_off(self):
    """Hook called by check() when a boolean variable gets a non-true value."""
    return None
def check(self, value):
    """
    Overridable check method; raise VariableError for a wrong value.

    By default only boolean variables are handled: check_on() runs when
    isTrue(value) is true, check_off() otherwise.
    """
    if self.type != self.Types.Boolean:
        return
    if self.isTrue(value):
        self.check_on()
        return
    self.check_off()
def uncompatible(self):
    """
    Overridable method checking whether this variable is incompatible
    with the values of other variables. The default returns "".
    """
    reason = ""
    return reason
def _uncompatible(self):
    """
    Run uncompatible() with modeGet set to UNCOMPAT.

    self.reqUncompat is emptied first; modeGet is reset to NONE afterwards.
    """
    try:
        self.modeGet = Variable.UNCOMPAT
        self.reqUncompat = []
        result = self.uncompatible()
    finally:
        self.modeGet = Variable.NONE
    return result
def set(self, value):
    """
    Overridable method converting a value before it is stored.

    "bool" values become "on"/"off"; in bool lists each item becomes
    "on"/"off", or "" for empty and "auto" items; "int" values are
    converted with str(). Other values are returned unchanged.
    """
    def on_off(flag):
        return "on" if self.isTrue(flag) else "off"

    def convert(item):
        if not item or item.lower() == "auto":
            return ""
        return on_off(item)

    kind = self.type
    if kind == "bool":
        value = on_off(value)
    if "bool" in kind and "list" in kind:
        return [convert(item) for item in value]
    if "int" in kind:
        return str(value)
    return value
def raiseNothingValue(self):
    """
    Hook for overriding the error raised when a choice variable has no
    allowed values. Does nothing by default.
    """
    return None
def checkType(self, value):
    """
    Check that `value` matches the declared variable type.

    - "int": a non-empty value must be an optional leading "-" followed
      by digits only.
    - "list": the value must be a list or tuple.
    - "string": every item must be a str (checking stops here).
    - "choice" (but not "choiceedit"): every item must be one of the
      values returned by choice(); the special item "list" always raises
      so that the allowed values are shown.

    Raises:
        VariableError: when the value does not match the type.
    """
    name = self.label or self.name
    if "int" in self.type and value:
        # The previous test accepted values such as "-abc", "a1" or "-".
        digits = value[1:] if value.startswith("-") else value
        if not digits.isdigit():
            raise VariableError(
                _("The value of variable '{varname}' must be integer"
                  ).format(varname=name))
    if "list" in self.type:
        if not type(value) in (list, tuple):
            raise VariableError(
                _("The value for variable '{varname}' "
                  "may be {vartype} only").format(
                    varname=name,
                    vartype="list"))
        error = _("Values for variable '{varname}' may be {vartype} only")
    else:
        # wrap a scalar so that it can be iterated like a list
        value = repeat(value, 1)
        error = _(
            "The value for variable '{varname}' may be {vartype} only")
    if "string" in self.type:
        value, valuecopy = tee(value, 2)
        # NOTE(review): falsy non-str items (None, 0) pass this test
        # because the generator yields the items themselves.
        if any(x for x in valuecopy if not isinstance(x, str)):
            raise VariableError(error.format(
                varname=name,
                vartype="string"))
        return
    if "choice" in self.type:
        choiceVal = self.choice() or []
        if isinstance(choiceVal, GeneratorType):
            choiceVal = tuple(choiceVal)
        # human readable hints: '"comment" [value]' or the plain value
        tipChoice = ['"%s" [%s]' % (x[1], x[0])
                     if type(x) in (list, tuple) else str(x)
                     for x in choiceVal]
        if choiceVal and type(choiceVal[0]) in (tuple, list):
            choiceVal = [x[0] for x in choiceVal]
    if "list-list" not in self.type:
        # add one nesting level so the double loop below fits all types
        value = repeat(value, 1)
    for recval in value:
        for val in recval:
            # NOTE(review): nesting of the raise below was reconstructed
            # from a source without indentation.
            if val == "list" and "choice" in self.type:
                if "edit" in self.type:
                    if "list" in self.type:
                        error = _("Values for variable '{varname}' may "
                                  "be {vartype}")
                    else:
                        error = _(
                            "The value for variable '{varname}' may be "
                            "{vartype}")
                self.raiseWrongChoice(name, tipChoice, val, error)
            if "choice" in self.type and not "choiceedit" in self.type:
                if ((choiceVal and (val or val == "") and
                     val not in choiceVal or
                     val is None) or
                        not choiceVal):
                    if not choiceVal:
                        self.raiseNothingValue()
                    self.raiseWrongChoice(name, tipChoice, val, error)
def raiseWrongChoice(self, name, choiceVal, value, error):
    """
    Raise VariableError built from the `error` template, filled with the
    variable name and the allowed values formatted by formatListOr().
    """
    allowed = formatListOr(choiceVal)
    message = error.format(varname=name, vartype=allowed)
    raise VariableError(message)
def setValue(self, value, force=False):
    """
    Standard action for storing a value.

    The value converted by set() is stored and the variable is marked as
    set and valid; unless `force` is given, _check() runs afterwards.
    """
    self.value = self.set(value)
    self.wasSet = True
    self.invalid = False
    if force:
        return
    self._check()
def _set(self, value, force=False):
    """
    Standard inner method for setting the value of the variable.

    An int value is converted to str. Unless `force` is given the type is
    checked with modeGet set to CHECK. A changed value (lists and tuples
    always count as changed) invalidates the variable, closes the old
    value when it has close(), and is stored with setValue().
    """
    if type(value) == int:
        value = str(value)
    if not force:
        try:
            self.modeGet = Variable.CHECK
            self.checkType(value)
        finally:
            self.modeGet = Variable.NONE
    changed = value != self.value or type(value) in (list, tuple)
    if changed:
        self.invalidate(True)
        if hasattr(self.value, "close"):
            self.value.close()
        # NOTE(review): nesting reconstructed from a source without
        # indentation; confirm setValue() belongs inside this branch.
        self.setValue(value, force)
def _check(self, value=None):
    """
    Run the full check of a value (the own value by default).

    Nothing is checked for the variable "cl_help_set" itself or while
    cl_help_set is true. The variable is marked untrusted during the
    check and trusted again only when check() did not raise.
    """
    if self.name == "cl_help_set" or self.GetBool('cl_help_set'):
        return
    if value is None:
        value = self.Get(self.name)
    try:
        self.modeGet = Variable.CHECK
        self.untrusted = True
        self.check(value)
        self.untrusted = False
    finally:
        self.modeGet = Variable.NONE
def invalidate(self, force=False):
    """
    Invalidate the value of the variable (drop the filled value).

    force : boolean=False
        Drop the value even if it was set manually

    Variables recorded in reqVars are invalidated as well and variables in
    reqCheck are marked untrusted; both lists are then emptied.
    """
    if self.processInvalidate:
        # already invalidating: stops infinite recursion
        return
    if self.wasSet and not force:
        return
    try:
        self.processInvalidate = True
        self.wasSet = False
        self.invalid = True
        for dependent in self.reqVars:
            dependent.invalidate()
            dependent.untrusted = dependent.__class__.untrusted
        for checker in self.reqCheck:
            checker.untrusted = True
        self.reqVars = []
        self.reqCheck = []
    finally:
        self.processInvalidate = False
@classmethod
def isTrue(cls, value):
if type(value) == bool:
return value
value = value.lower()
if value in ('yes', 'on', 'true'):
return True
if value == 'auto':
return None
return False
def has_base_method(self, methodname):
    """
    Check whether the method is still the one defined on Variable,
    i.e. it was not changed by inheritance.
    """
    own_method = getattr(self, methodname).__func__
    return getattr(Variable, methodname) == own_method
class TableVariable(Variable):
    """
    Table type variable.

    The table rows are assembled from the list variables named in
    `source`, one variable per column.
    """
    type = "table"
def set(self, value):
    """Table values are stored without conversion."""
    unchanged = value
    return unchanged
def get(self, hr=HumanReadable.No):
    """
    Build the table rows from the source variables.

    Raises VariableError when a source variable does not hold a list.
    Shorter columns are padded with ''; an empty table is [[]].
    """
    for varname in self.source:
        if not isinstance(self.Get(varname), list):
            raise VariableError(
                _("Source variable %s does not contain a list") % varname)
    columns = [self.Get(name, humanreadable=hr) for name in self.source]
    rows = [list(row) for row in zip_longest(*columns, fillvalue='')]
    return rows or [[]]
def getHumanReadableAuto(self):
    """Return the table built with humanreadable=HumanReadable.Auto."""
    mode = HumanReadable.Auto
    return self.get(hr=mode)
def humanReadable(self):
    """Return the table built with humanreadable=HumanReadable.Yes."""
    mode = HumanReadable.Yes
    return self.get(hr=mode)
def __getWritableColumns(self, includeFirst=False):
    """
    Return (position, variable name) pairs of the writable columns.

    The index column (first source) is skipped unless includeFirst is
    set; positions count from the first column that is considered.
    (Example: (1,'os_disk_mount'))
    """
    offset = 0 if includeFirst else 1
    writable = []
    for column in enumerate(self.source[offset:]):
        if self.parent.getInfo(column[1]).mode == WRITEABLE:
            writable.append(column)
    return writable
def check(self, value):
    """
    Check a table value: every writable column (index included) is
    checked through the parent's Check().

    Raises VariableError carrying the list of all column errors.
    """
    writeCols = list(self.__getWritableColumns(includeFirst=True))
    if any(value):
        # cut the writable positions out of each row, then transpose
        pick = itemgetter(*map(itemgetter(0), writeCols),
                          alwaysTuple=True)
        columns = list(zip(*map(pick, value)))
    else:
        columns = [[]] * len(writeCols)
    error = []
    for colInfo, values in zip(writeCols, columns):
        try:
            self.parent.Check(colInfo[1], values)
        except VariableError as exc:
            error.append(exc)
    if error:
        raise VariableError(error)
def __isIndexWritable(self):
    """Tell whether the index column (first source variable) is writable."""
    index_var = self.parent.getInfo(self.source[0])
    return index_var.mode == WRITEABLE
def checkType(self, value):
    """
    Check the type of a table value.

    The value must be a list/tuple of lists/tuples; each non-empty row
    must have either one field per source or one field per writable
    column plus the index; with a readonly index every row must refer to
    an existing index value.
    """
    sequence = (list, tuple)
    is_table = type(value) in sequence and \
        not any(i for i in value if not type(i) in sequence)
    if not is_table:
        raise VariableError(
            _("The value for {varname} may be '{vartype}' only").format(
                varname=self.label or self.name,
                vartype="table"))
    # index column plus the writable columns
    writeLen = len(list(self.__getWritableColumns())) + 1
    allowed_lengths = (writeLen, len(self.source))
    for row in value:
        if row and not len(row) in allowed_lengths:
            raise VariableError(
                _("Wrong entry '{entry}' for table variable '{varname}'").
                format(entry=str(row),
                       varname=(self.label or self.name).lower()) + "\n" +
                _("Each entry must contains %d field(s)") % writeLen)
    if self.__isIndexWritable():
        return
    # a readonly index must not receive new values
    indexValues = self.Get(self.source[0])
    for row in value:
        if row and not row[0] in indexValues:
            self.raiseReadonlyIndexError(self.source[0],
                                         self.name,
                                         row[0])
def raiseReadonlyIndexError(self, fieldname="", variablename="",
                            value=""):
    """
    Overridable reaction to an attempt to rewrite a readonly index.
    The default raises DataVarsError; `value` is not used in the message.
    """
    template = _("Attempting to rewrite a readonly index field "
                 "{fieldname} in variable {variablename}")
    raise DataVarsError(
        template.format(fieldname=fieldname, variablename=variablename))
def setValue(self, value, force=False):
    """
    Store a table value by setting the source (column) variables.

    With a writable index the whole table is replaced; otherwise rows are
    merged into the current table by index value. Column errors are
    collected and raised together as one VariableError. The variable is
    untrusted while the columns are being set.
    """
    value = self.set(value)
    self.untrusted = True
    current = self.Get(self.name)
    # writable columns, index column excluded
    writeCols = self.__getWritableColumns()
    # cuts the writable fields out of a row without its index field
    pick = itemgetter(*map(itemgetter(0), writeCols),
                      alwaysTuple=True)
    # index value -> writable fields of the current table
    if any(current):
        oldvalue = OrderedDict((row[0], pick(row[1:])) for row in current)
    else:
        oldvalue = OrderedDict()
    # index value -> writable fields of the new table
    if not any(value):
        newval = OrderedDict()
    elif len(value[0]) == len(self.source):
        # rows carry every column: keep the writable ones only
        newval = OrderedDict((row[0], pick(row[1:])) for row in value)
    else:
        newval = OrderedDict((row[0], row[1:]) for row in value)
    error = []
    if self.__isIndexWritable():
        # writable index: the new table replaces the old one
        if not force and any(value):
            self.parent.Check(self.source[0], list(zip(*value))[0])
        oldvalue = newval
        try:
            self.parent.Set(self.source[0], list(oldvalue.keys()))
        except VariableError as exc:
            error.append(exc)
    else:
        # readonly index: update entries by index field
        oldvalue.update(newval)
    oldvalValues = list(zip(*oldvalue.values()))
    column_names = (col[1] for col in writeCols)
    for col, vals in zip(column_names, oldvalValues):
        try:
            self.parent.Set(col, list(vals))
        except VariableError as exc:
            error.append(exc)
    # columns that received no data are invalidated
    for _num, varname in writeCols[len(oldvalValues):]:
        self.parent.Invalidate(varname, True)
    if error:
        raise VariableError(error)
    self.untrusted = False
class ReadonlyVariable(Variable):
    """Variable whose mode is READONLY."""
    mode = READONLY
class ActionVariable(ReadonlyVariable):
    """
    Readonly variable whose value is decided by action(cl_action).

    nonchroot -- when set, the value is "off" while cl_chroot_status is "on"
    postinst  -- when set, the value is "off" unless cl_ebuild_phase is
                 '', 'postinst' or 'prerm'
    """
    nonchroot = False
    postinst = False

    def get(self):
        in_chroot = self.nonchroot and \
            self.Get('cl_chroot_status') == "on"
        if in_chroot:
            return "off"
        allowed_phases = ('', 'postinst', 'prerm')
        if self.postinst and \
                self.Get('cl_ebuild_phase') not in allowed_phases:
            return "off"
        return self.action(self.Get('cl_action'))

    def action(self, cl_action):
        """Overridable: return the value for the given cl_action."""
        return "off"
class FieldValue(VariableInterface):
    """
    Table column variable: one column of the table `source_variable`.
    """
    type = "list"
    source_variable = ""
    column = 0

    def get(self):
        """
        Return column number `column` of the source table, or [] when the
        table has no non-empty rows. Raises VariableError when the source
        value is not iterable.
        """
        sourceVar = self.Get(self.source_variable)
        if not isinstance(sourceVar, Iterable):
            raise VariableError(_("Field variable got not iterable value"))
        if not any(sourceVar):
            return []
        columns = list(zip(*sourceVar))
        return columns[self.column]
class SourceReadonlyVariable(ReadonlyVariable):
    """
    Readonly source variable (used for tables). Its values are looked up
    in a map by the values of the variable named in `indexField`.
    Example:
    ['os_location_source',...,'os_location_size'(this)]
    """
    indexField = ""

    def get(self):
        lookup = self.getMap().get
        return [lookup(key) or "" for key in self.Get(self.indexField)]

    def humanReadable(self):
        lookup = self.getMapHumanReadable().get
        return [lookup(key) or "" for key in self.Get(self.indexField)]

    def getMap(self):
        """Overridable: mapping from index value to column value."""
        return {}

    def getMapHumanReadable(self):
        """Overridable: mapping used by humanReadable(); getMap() by default."""
        return self.getMap()
class ReadonlyTableVariable(TableVariable):
    """Table variable whose mode is READONLY."""
    mode = READONLY
class SimpleDataVars():
    """
    Simple datavars for temporary link var objs.

    Values are kept in a plain cache dictionary keyed by the variable
    name without its section prefix.
    """
    # prefix passed to addStdConfig() by flIniFileFrom()
    systemRoot = "/"
def __init__(self, *objs):
self.allVars = {}
self.cache = {}
self.changed = set()
for obj in objs:
obj.parent = self
self.allVars[obj.getVariableName()] = obj
def Get(self, varname, humanreadable=HumanReadable.No):
    """
    Return the cached value of a variable, computing it with the
    variable's get() on first access. A section prefix in `varname` is
    ignored, and so is `humanreadable`.

    Raises DataVarsError when the variable is not registered here.
    """
    short_name = varname.rpartition(".")[2]
    if short_name in self.cache:
        return self.cache[short_name]
    var_obj = self.allVars.get(short_name)
    if var_obj is None:
        # happens when this variable space has no such variable,
        # for example a dependency is missing
        raise DataVarsError(_("Failed to get value %s") % short_name)
    self.cache[short_name] = var_obj.get()
    return self.cache[short_name]
def Set(self, varname, value):
    """Store `value` in the cache and mark the variable as changed."""
    short_name = varname.rpartition(".")[2]
    self.changed.add(short_name)
    self.cache[short_name] = value
def Invalidate(self, varname, force=False):
    """
    Drop the cached value of a variable and its "changed" mark.

    varname -- variable name; a section prefix is ignored
    force   -- accepted for compatibility with callers that pass a second
               argument (TableVariable.setValue calls
               parent.Invalidate(name, True)); the cache entry is always
               dropped here, so the flag has no effect.
    """
    short_name = varname.rpartition(".")[2]
    self.cache.pop(short_name, None)
    self.changed.discard(short_name)
def GetBool(self, varname):
    """Return the value of `varname` converted by Variable.isTrue()."""
    raw = self.Get(varname)
    return Variable.isTrue(raw)
def GetInteger(self, varname):
    """
    Get an integer value; 0 is returned when int() raises ValueError.
    """
    try:
        number = int(self.Get(varname))
    except ValueError:
        number = 0
    return number
def getInfo(self, varname):
    """Return the registered variable object, or None when unknown."""
    registry = self.allVars
    return registry.get(varname)
def __getitem__(self, item):
return self.Get(item)
def __setitem__(self, key, value):
self.Set(key, value)
def prepare_all(self):
    """Call Get() for every registered variable, filling the cache."""
    for name in self.allVars:
        self.Get(name)
def flIniFileFrom(self, iniFile, system_root=''):
    """
    Read variable values from ini files into the cache.

    iniFile -- an ini file, or a profile directory whose calculate.env
               files (plus the standard one) are read
    Variables already marked as changed keep their value.
    Raises DataVarsError when an ini file cannot be parsed.
    """
    config = ConfigParser(strict=False)
    if path.isdir(iniFile):
        rs = RepositorySubstituting(self, system_root=system_root)
        try:
            profile_files = searchProfile(iniFile, 'calculate.env',
                                          repository_sub=rs)
            config.read(addStdConfig(profile_files,
                                     prefix=self.systemRoot),
                        encoding="utf-8")
        except ParsingError:
            raise DataVarsError(_("Error in calculate.env in profile"))
    else:
        try:
            config.read(iniFile, encoding="utf-8")
        except ParsingError:
            raise DataVarsError(_("Error in %s") % iniFile)
    # section name -> options of that section, loaded on first use
    importVars = {}
    for varname, varobj in self.allVars.items():
        section = varobj.section
        if section not in importVars:
            if config.has_section(section):
                importVars[section] = dict(config.items(section, raw=True))
            else:
                importVars[section] = {}
        if varname in self.changed:
            continue
        options = importVars[section]
        val = self.unserialize(varobj.type or "string",
                               options.get(varname, ''))
        # keep non-empty values and values explicitly present in the file
        if val and val != [[]] or varname in options:
            self.cache[varname] = val
@staticmethod
def serialize(varType, value):
"""
Serialize to string for ini file
"""
fixEmpty = lambda x: str(x) or "''"
# fixSpace = lambda x: "'%s'" % str(x) if " " in str(x) else str(x)
isListOrTuple = lambda x: type(x) in (list, tuple)
if isListOrTuple(value):
if "list" in varType:
return ",".join(map(fixEmpty, value))
elif "table" in varType:
return ",".join((":".join(map(fixEmpty, x)) for x in value))
return fixEmpty(value)
@staticmethod
def unserialize(varType, value):
"""
Unserialize form string for varname
"""
fixEmpty = lambda x: "" if x == "''" or x == '""' else x.strip()
def getList(delimeter=','):
def wrapper(val):
val = str(val)
if val == "":
return []
elif val.startswith('\n') or val.startswith(' \n'):
return [{x.split(':')[0]: list(map(str.strip, x.split(':', maxsplit=1)[1].split(','))) for x in val.split("\n") if len(x)>2}]
return [fixEmpty(x) for x in val.split(delimeter)]
return wrapper
if "list" in varType:
return getList()(value)
if "table" in varType:
return [getList(':')(x) for x in value.split(',')]
# if (isinstance(value, bytes)):
# value = str(value, "UTF-8")
return fixEmpty(value).strip("'").strip('"')
def ZipVars(self, *argvVarNames, **kw):
    """
    Return the given columns zipped into a list of rows.

    A str argument is a variable name resolved with Get() (forwarding the
    optional `humanreadable` keyword); any other argument is used as a
    column as it is.
    """
    if "humanreadable" in kw:
        hr = kw['humanreadable']
        columns = [self.Get(x, humanreadable=hr) if isinstance(x, str)
                   else x for x in argvVarNames]
    else:
        columns = [self.Get(x) if isinstance(x, str) else x
                   for x in argvVarNames]
    return list(zip(*columns))
def select(self, *fields, **kw):
"""
Новая функция выборки из табличных переменных
:param fields: необходимые поля
:param kw: аргумены выборки
:param limit: количество возвращаемых элементов
:param zip_vars: функция объединения переменных
:return:
"""
keywords = ["limit", "zip_vars"]
zip_vars = kw.get("zip_vars", self.ZipVars)
filter_chain = []
variables = list(fields)
for k in (x for x in kw if x not in keywords):
varname, op, funcname = k.partition("__")
if varname[2] != "_":
varname = varname.replace("_", ".", 1)
if not op:
funcname = "eq"
variables.append(varname)
func_maps = {
'eq': lambda x, y: lambda z: z[x] == y,
'ne': lambda x, y: lambda z: z[x] != y,
'in': lambda x, y: lambda z: z[x] in y,
'not_in': lambda x, y: lambda z: z[x] not in y,
'startswith': lambda x, y: lambda z: z[x].startswith(y)
}
filter_chain.append(func_maps[funcname](len(variables) - 1, kw[k]))
l = len(fields)
if l == 1:
ret_func = lambda x: x[0]
else:
ret_func = lambda x: x[:l]
def generator(limit):
c = 0
for x in zip_vars(*variables):
for f in filter_chain:
if not f(x):
break
else:
yield ret_func(x)
if limit and c >= limit:
break
else:
c += 1
limit = kw.get("limit", None)
if limit == 1:
for x in generator(limit):
return x
else:
if l == 1:
return ""
else:
return [""] * l
else:
return list(generator(limit))
def Select(self, selField, where="os_disk_dev", eq=None, ne=None,
           _in=None, _notin=None, like=None, notlike=None, func=None,
           _if=None,
           sort=None, sortkey=None, limit=None, zipVars=None):
    """
    Select values from table variables.

    selField -- variable name, or list/tuple of names, to return
    where    -- variable name(s) the filter is applied to; they form the
                first columns of each row
    eq, ne, _in, _notin, like, notlike -- filter on the first `where`
                column (the first one given is used; like/notlike are
                regular expressions)
    func/_if -- custom predicate; with more than one positional parameter
                it receives that many leading columns, otherwise the row
    sort     -- "ASC"/"DESC", optionally followed by "/" and the column
                numbers of the sort key separated by commas
    sortkey  -- key function for sorting
    limit    -- maximum number of rows; with limit=1 a single value is
                returned ("" placeholders when nothing matched)
    zipVars  -- function combining variables into rows (self.ZipVars by
                default)

    `where` and `selField` may freely mix lists and tuples; previously
    such a mix raised TypeError on concatenation.
    """
    if zipVars is None:
        zipVars = self.ZipVars
    if func or _if:
        if _if:
            func = _if
        argcount = func.__code__.co_argcount
        if argcount > 1:
            filterFunc = lambda x: func(*x[:argcount])
        else:
            filterFunc = func
    elif eq is not None:
        filterFunc = lambda x: x[0] == eq
    elif ne is not None:
        filterFunc = lambda x: x[0] != ne
    elif _in is not None:
        filterFunc = lambda x: x[0] in _in
    elif _notin is not None:
        filterFunc = lambda x: not x[0] in _notin
    elif like is not None:
        # compiled once instead of once per row
        like_re = re.compile(like)
        filterFunc = lambda x: like_re.search(x[0])
    elif notlike is not None:
        notlike_re = re.compile(notlike)
        filterFunc = lambda x: not notlike_re.search(x[0])
    else:
        filterFunc = lambda x: x
    if type(where) in (tuple, list):
        where = list(where)
    else:
        where = [where]
    woffset = len(where)
    if isinstance(selField, (tuple, list)):
        count = len(selField) + woffset
        mapFunc = lambda x: x[woffset:]
        fields = where + list(selField)
    else:
        count = 1 + woffset
        mapFunc = lambda x: x[woffset]
        fields = where + [selField]
    res = [x for x in zipVars(*fields) if filterFunc(x)]
    if sort:
        if "/" in sort:
            sort, sortkey = sort.split('/')
            sortkey = itemgetter(*map(int, sortkey.split(',')))
        res.sort(key=sortkey, reverse=True if sort == "DESC" else False)
    if limit == 1:
        if not any(res):
            # nothing matched: answer with empty placeholders
            res = [[""] * count]
        return mapFunc(res[0])
    return [mapFunc(x) for x in res[:limit]]
class DataVars(SimpleDataVars):
    """
    Class contains variables, perform creating variable objects by
    class variable describer and variable filler.
    Instance attributes:
        moduleList : Dict
            dictionary has section name with modules which describe variable
            for this section
        loadVariables : Dict
            contains all initialized variables load from varObj and fillObj
        requestVariables : List
            contains var object which already run Get method. Use for detect
            dependence of variables.
    """
    # module imported by importVariables() when no module name is given
    defaultData = 'calculate.lib.variables'
    # matches "[section.]name": optional lowercase section, then the name
    reRightVar = re.compile(r'^(?:([a-z_]+)\.)?([a-z_0-9]+)$')
    # class-level reentrant lock; its users are outside this excerpt
    l = RLock()
def __init__(self):
    """Create empty registries and caches; the default module is "main"."""
    super().__init__()
    # variable name -> (section, variable class), filled by importVariables
    self.allVars = {}
    # variable name -> variable instance, filled by loadVariable
    self.loadVariables = {}
    # sections whose variable modules were imported
    self.importedModules = []
    self.defaultModule = "main"
    # variables that define a `refresh` attribute
    self.refresh = []
    self.requestVariables = []
    self.untrustedVariables = set()
    self.groups = []
    self.mapVarGroup = OrderedDict()
    self.briefData = {}
    self.filledVars = {}
    self.iniCache = {}
    self.map_opt_vars = {}
@contextmanager
def useDefaultModule(self, modulename):
    """
    Context manager that switches defaultModule to `modulename` for the
    duration of the block and restores the previous value afterwards.
    """
    previous = self.defaultModule
    try:
        self.defaultModule = modulename
        yield self
    finally:
        self.defaultModule = previous
def isModuleInstalled(self, module):
    """Tell whether calculate.<module>.datavars can be imported."""
    target = "calculate.%s.datavars" % module
    try:
        importlib.import_module(target)
    except ImportError:
        return False
    return True
def importData(self, data=None):
    """
    Deprecated function kept for 3.0 utilities: import the variables and,
    when `data` was given, make the imported section the default module.
    """
    section = self.importVariables(data)
    if not data:
        return
    self.defaultModule = section
def importVariables(self, data=None):
    """
    Import a module describing variables and register its classes.

    data -- module name; self.defaultData when empty

    Classes whose name starts with "Variable" are collected from the
    module itself and from its public attributes whose name does not
    contain "Variable". Already registered variable names are not
    replaced. Returns the section name (the module's `section`
    attribute, "main" by default), or False when the import failed and
    raiseModuleError() did not raise.
    """
    if not data:
        data = self.defaultData
    try:
        varModule = importlib.import_module(data)
    except (ImportError, AttributeError) as e:
        self.raiseModuleError(data, e)
        return False
    section = getattr(varModule, "section", "main")
    # a variable of class VariableClAction will be named cl_action
    containers = [getattr(varModule, attr) for attr in dir(varModule)
                  if not attr.startswith("_") and "Variable" not in attr]
    containers.append(varModule)
    base_names = ("ReadonlyVariable", "VariableInterface",
                  "Variable", "VariableError")
    for varMod in containers:
        candidates = (attr for attr in dir(varMod)
                      if attr.startswith("Variable"))
        for classname in filterfalse(base_names.__contains__, candidates):
            varObj = getattr(varMod, classname)
            varName = varObj.getVariableName()
            if varName not in self.allVars:
                self.allVars[varName] = (section, varObj)
    self.importedModules.append(section)
    return section
def raiseVariableNotFound(self, varname, *args, **kwargs):
    """
    Raise DataVarsError for an unknown variable; extra arguments are
    ignored.
    """
    message = _("Variable %s not found") % varname
    raise DataVarsError(message)
def raiseModuleError(self, module, error, *args, **kwargs):
    """
    Raise ImportModuleDataVarsError naming the module and the original
    error; extra arguments are ignored.
    """
    lines = [_("Failed to import module %s") % module,
             _("error") + ": " + str(error)]
    raise ImportModuleDataVarsError("\n".join(lines))
def loadVariable(self, varname, section=None):
    """
    Create the variable object for `varname` unless it is loaded already.

    The default variables are imported when nothing is registered yet;
    for a `section` not imported so far its variables module is imported
    and flIniFile() is run for that section.
    """
    if not self.allVars:
        self.importVariables()
    if section and section not in self.importedModules:
        self.importVariables("calculate.%s.variables" % section)
        self.flIniFile(onlySection=section)
    if varname in self.loadVariables:
        return
    if varname in self.allVars:
        section, varObj = self.allVars[varname]
        newVar = varObj(parent=self, section=section)
        if hasattr(newVar, "refresh"):
            self.refresh.append(newVar)
        self.loadVariables[varname] = newVar
        return
    # raiseVariableNotFound either raises or, when overridden,
    # returns the default value to store
    self.loadVariables[varname] = self.raiseVariableNotFound(
        varname, parent=self, section="main")
def exists(self, varName):
    """
    Return True if Get(varName) succeeds, False if it raises Exception
    """
    try:
        self.Get(varName)
    except Exception:
        return False
    return True
def defined(self, varName):
    """
    Return True if 'varName' is among the names registered in self.allVars
    """
    known_names = self.allVars.keys()
    return varName in known_names
def getIniVar(self, fullVarName):
    """
    Read a variable value directly from the ini files listed in
    'main.cl_env_path'.

    'fullVarName' has the format: section 'dot' variable. Without
    a section the 'main' section is used. Files are read in order, so
    the value from the last file which defines the variable is returned;
    an empty string is returned if no file defines it.

    Raises DataVarsError if a file cannot be parsed.

    Example:
    self.getIniVar('install.os_install_net_settings')
    self.getIniVar('cl_action')
    """
    env_files = self.Get('main.cl_env_path')
    section, sep, varname = fullVarName.rpartition('.')
    section = section or "main"
    found = ""
    for env_file in env_files:
        if not path.exists(env_file):
            continue
        parser = ConfigParser(strict=False)
        try:
            parser.read(env_file, encoding="utf-8")
            found = parser[section][varname]
        except ParsingError:
            raise DataVarsError(_("Error in %s") % env_file)
        except KeyError:
            # section or option is absent in this file
            pass
    return found
def getRemoteInfo(self, envFile):
    """
    Collect variables stored in 'envFile'.

    Return dict (sections without options are omitted, every raw value
    is passed through convertStrListDict):
    section : { variable1 : value,
                variable2 : value }
    section2 : { variable : value }
    """
    parser = ConfigParser(strict=False)
    parser.read(envFile, encoding="utf-8")
    info = {}
    for section in parser.sections():
        values = {}
        for key, raw in parser.items(section, raw=True):
            values[key] = convertStrListDict(raw)
        if values:
            info[section] = values
    return info
def flIniFile(self, raiseOnError=False, onlySection=None):
    """
    Read variable values from ini files into self.iniCache.

    Nothing is read when self.iniCache is already filled. Otherwise the
    profile files (found by searchProfile for 'calculate.env' and passed
    through addStdConfig) are read first, when the path from
    'main.cl_make_profile' exists, followed by the (name, path) pairs of
    'main.cl_env_data'. Each option is stored in self.iniCache as
    "section.key". For non-profile files the name of the ini file is
    also stored in self.filledVars under the same key.

    raiseOnError - not used by this method
    onlySection - if set, only this section of every file is cached

    Always returns True. Raises DataVarsError if a file cannot be parsed
    and CriticalError if 'main.cl_make_profile' cannot be obtained while
    'cl_action' is set to something other than 'update_profile'.
    """
    if not self.iniCache:
        # get initialized section names
        try:
            make_profile = self.Get('main.cl_make_profile')
        except VariableError as e:
            # a missing profile is tolerated only without an action or
            # while the profile itself is being updated
            action = self.Get('cl_action')
            if not action or action in ['update_profile']:
                return True
            raise CriticalError(str(e))
        if os.path.exists(make_profile):
            # NOTE(review): os.readlink assumes make_profile is a
            # symlink - confirm
            profiledir = path.dirname(make_profile)
            profiles = [('profile',
                         path.join(profiledir, os.readlink(make_profile)))]
        else:
            profiles = []
        for iniName, iniFile in profiles + list(self.Get('main.cl_env_data')):
            # the profile entry is processed even if its path is absent
            if path.exists(iniFile) or iniName == 'profile':
                config = ConfigParser(strict=False)
                if iniName == 'profile':
                    repos = RepositorySubstituting(self)
                    try:
                        config.read(addStdConfig(
                            searchProfile(iniFile, 'calculate.env',
                                          repository_sub=repos)),
                            encoding="utf-8")
                    except ParsingError:
                        raise DataVarsError(
                            _("Error in profile calculate.env"))
                else:
                    try:
                        config.read(iniFile, encoding="utf-8")
                    except ParsingError:
                        raise DataVarsError(
                            _("Error in %s") % iniFile)
                iniSections = config.sections()
                if not iniSections:
                    continue
                if onlySection:
                    # keep the requested section only (if it is present)
                    iniSections = tuple(set(iniSections) &
                                        {str(onlySection)})
                for section in iniSections:
                    for key, value in config.items(section, raw=True):
                        key = _u8(key)
                        fullkey = "%s.%s" % (section, key)
                        self.iniCache[fullkey] = value
                        # remember which ini file supplied the value
                        if iniName != "profile":
                            self.filledVars[fullkey] = iniName
    return True
def Get(self, varname, humanreadable=HumanReadable.No):
    """
    Threading safety Get: the value is resolved while the class-level
    lock DataVars.l is held
    """
    with DataVars.l:
        return self.__Get(varname, humanreadable)
def isFromIni(self, varname):
    """
    Return the entry stored in self.filledVars for the variable
    (the name of the ini file which supplied its value) or an empty
    string if there is none
    """
    section, name = self.splitVarname(varname)
    full_name = "%s.%s" % (section, name)
    if full_name in self.filledVars:
        return self.filledVars[full_name]
    return ""
def splitVarname(self, varname):
    """
    Split 'varname' into (section, name) using self.reRightVar.

    If the first group of the match is empty, self.defaultModule is
    used as the section. Raises DataVarsError if the name does not match.
    """
    matched = self.reRightVar.search(varname)
    if not matched:
        raise DataVarsError(_("Wrong variable name %s") % varname)
    if matched.group(1):
        return matched.groups()
    return self.defaultModule, matched.group(2)
def __Get(self, varname, humanreadable=HumanReadable.No):
    """
    Get value of variable 'varname'

    varname - name in the form accepted by splitVarname
    humanreadable - HumanReadable.Auto returns
        varObj.getHumanReadableAuto(), HumanReadable.Yes returns
        varObj._human(), any other value returns varObj._get()

    Returns an empty string for variables whose name starts with "ac_"
    when the module of their section cannot be imported.
    Raises DataVarsError if an invalid variable is requested again while
    it is already in self.requestVariables (dependence loop).
    On any exception self.requestVariables is emptied before the
    exception is re-raised.
    """
    try:
        section, varname = self.splitVarname(varname)
        if (varname not in self.loadVariables or
                section not in self.importedModules):
            try:
                self.loadVariable(varname, section=section)
            except ImportModuleDataVarsError:
                # action variables of modules which cannot be imported
                # are treated as empty
                if varname.startswith("ac_"):
                    return ""
                raise
        varObj = self.loadVariables[varname]
        # check section
        if varObj.section not in (section, "main"):
            self.raiseVariableNotFound(varname, parent=self,
                                       section=section)
        # if for this variable already use Get method
        if varObj.invalid and varObj in self.requestVariables:
            varnames = "-".join((x.name for x in self.requestVariables + [varObj]))
            raise DataVarsError(
                _("Loop dependence of variables '%s'") % varnames)
        # add this variable for list of requested variables
        if humanreadable in (HumanReadable.No, HumanReadable.Auto):
            self.requestVariables.append(varObj)
        if humanreadable is HumanReadable.Auto:
            res = varObj.getHumanReadableAuto()
        elif humanreadable is HumanReadable.Yes:
            res = varObj._human()
        else:
            res = varObj._get()
        # NOTE(review): the pop happens only for a falsy 'humanreadable';
        # whether this covers HumanReadable.Auto depends on its value,
        # which is defined outside this method - confirm
        if not humanreadable:
            if self.requestVariables:
                self.requestVariables.pop()
        return res
    except BaseException:
        # drop the whole request chain so that the next Get starts clean
        while len(self.requestVariables):
            self.requestVariables.pop()
        raise
def Check(self, varname, value):
    """
    Check variable value: run checkType(value) and then _check(value)
    of the variable object
    """
    # the variable is loaded on first use
    if varname not in self.loadVariables:
        self.loadVariable(varname)
    variable = self.loadVariables[varname]
    variable.checkType(value)
    variable._check(value)
def Choice(self, varname):
    """
    Return the first element of the _choice() result of variable
    'varname'
    """
    # the variable is loaded on first use
    if varname not in self.loadVariables:
        self.loadVariable(varname)
    choice_data = self.loadVariables[varname]._choice()
    return choice_data[0]
def Uncompatible(self, varname):
    """
    Check whether the value of this variable may be changed.
    Returns the result of the variable's _uncompatible(): empty string
    if the variable is changeable or a string with the error
    """
    # the variable is loaded on first use
    if varname not in self.loadVariables:
        self.loadVariable(varname)
    variable = self.loadVariables[varname]
    return variable._uncompatible()
def getRequired(self, varname):
    """
    Build the list of variables used in the uncompatible check.

    Starting from 'varname' the list holds the variable itself followed,
    depth first, by the variables of its reqVars and then by the
    variables named in its 'source' (if any). Each variable appears once.
    """
    collected = []

    def walk(variable):
        # a variable which is already collected is not expanded again
        if variable in collected:
            return
        yield variable
        for required in variable.reqVars:
            yield from walk(required)
        if hasattr(variable, "source") and variable.source:
            for origin in map(self.getInfo, variable.source):
                yield from walk(origin)

    # items are appended one by one so that walk() sees them immediately
    for item in walk(self.getInfo(varname)):
        collected.append(item)
    return collected
def ChoiceAndComments(self, varname):
    """
    Return the whole result of _choice() of variable 'varname'
    """
    # the variable is loaded on first use
    if varname not in self.loadVariables:
        self.loadVariable(varname)
    variable = self.loadVariables[varname]
    return variable._choice()
def getInfo(self, varname):
    """
    Return the variable object for 'varname' (holder of label and other
    info), loading it first if needed. 'varname' may include a section.
    """
    section, name = self.splitVarname(varname)
    if name not in self.loadVariables:
        # an empty section is passed to loadVariable as None
        self.loadVariable(name, section=section or None)
    return self.loadVariables[name]
def Set(self, varname, varvalue, force=False):
    """
    Set value 'varvalue' for variable 'varname'.

    A variable whose mode is READONLY is changed only with 'force',
    otherwise DataVarsError is raised. If the variable has an alias,
    the resulting value is set for the alias variable too.
    """
    target = self.getInfo(varname)
    writable = force or target.mode != READONLY
    if not writable:
        raise DataVarsError(
            _("Attempting to rewrite readonly variable %s") %
            varname)
    target._set(varvalue, force)
    if target.alias:
        self.Set(target.alias, target.value, force)
def Invalidate(self, varname, force=False, onlySet=False):
    """
    Invalidate value of variable.

    force - passed to the variable's invalidate(), invalidate also for
        set values
    onlySet - invalidate only if the variable's wasSet is true; in this
        case invalidate() is always called with a true argument
    """
    # the variable is loaded on first use
    if varname not in self.loadVariables:
        self.loadVariable(varname)
    variable = self.loadVariables[varname]
    if onlySet and not variable.wasSet:
        return
    variable.invalidate(force or onlySet)
def getEnvFileByLocation(self, location):
    """
    Get env file path by location alias.

    The path is selected from 'cl_env_path' where 'cl_env_location'
    equals 'location' and is joined to the value of variable
    'main.cl_chroot_path'.
    Return False if alias not found.
    """
    env_path = self.Select('cl_env_path',
                           where='cl_env_location',
                           eq=location, limit=1)
    if not env_path:
        return False
    return pathJoin(self.Get('main.cl_chroot_path'), env_path)
def Write(self, varname, val=None, force=False, location='system',
          header=False):
    """
    Write variable 'varname' to ini file.

    val - if set, the variable is changed to this value before writing;
        otherwise the current value of the variable is written
    force - use for change value of read-only variables
    location - alias of the target ini file ('system', 'local', 'remote')
    header - section in ini file. If header not specified then the
        section of the variable is used.

    Returns True. Raises DataVarsError if the location alias is unknown
    or the value cannot be written.
    """
    # set value to variable if specified value
    if val is not None:
        self.Set(varname, val, force=force)
    else:
        # read the current value; the name is qualified by the section
        # only when a header is given
        if header:
            vname = "%s.%s" % (header, varname)
        else:
            # was the literal "varname", which requested a variable
            # named "varname" instead of the one being written
            vname = varname
        val = self.Get(vname)
    # get path to target ini file
    iniFile = self.getEnvFileByLocation(location)
    if iniFile is False:
        raise DataVarsError(_("Unable to find alias '%s' for the "
                              "template variables path") % location)
    # make parent directories
    makePath(path.dirname(iniFile))
    # get variable section if not specified header
    if not header:
        header = self.loadVariables[varname].section
    # write value to ini file
    from . import cl_ini_parser
    config = cl_ini_parser.iniParser(iniFile)
    val = self.serialize(self.getInfo(varname).type,
                         val)
    writeVal = convertStrListDict(val)
    # the file is touched only when the stored value differs
    if writeVal != config.getVar(header, varname):
        if not config.setVar(header, {varname: writeVal}):
            raise DataVarsError(
                _("Unable to write variable {varname} into "
                  "'{location}'").format(varname=varname,
                                         location=location))
    return True
def Delete(self, varname, location='system', header=False):
    """
    Delete variable 'varname' from ini file.

    location - alias of the target ini file ('system', 'local', 'remote')
    header - section in ini file. If header not specified then the
        section of the variable is used.

    Returns False if the section variables cannot be read, True if the
    variable is not stored in the section, otherwise the result of the
    removal (the section itself is removed when it becomes empty).
    Raises DataVarsError if the location alias is unknown.
    """
    # get path to target ini file
    iniFile = self.getEnvFileByLocation(location)
    if iniFile is False:
        raise DataVarsError(_("Unable to find alias '%s' for the "
                              "template variables path") % location)
    # make parent directories
    makePath(path.dirname(iniFile))
    # get variable section if not specified header
    if not header:
        if varname not in self.loadVariables:
            self.loadVariable(varname)
        header = self.loadVariables[varname].section
    from . import cl_ini_parser
    parser = cl_ini_parser.iniParser(iniFile)
    stored = parser.getAreaVars(header)
    if stored is False:
        return False
    if varname not in stored.keys():
        return True
    removed = parser.delVar(header, varname)
    # if the value was deleted and the section became empty then the
    # section is removed from the ini file
    return removed and (parser.getAreaVars(header) or
                        parser.delArea(header))
def processRefresh(self):
    """
    Call refresh() for every variable in self.refresh which is not
    invalid
    """
    for variable in self.refresh:
        if variable.invalid:
            continue
        variable.refresh()
def close(self):
    """
    Close loaded variables: call close() of the variable object or,
    if it has none, close() of its value when the value provides it
    """
    for variable in self.loadVariables.values():
        if hasattr(variable, "close"):
            variable.close()
            continue
        if hasattr(variable, "value") and hasattr(variable.value, "close"):
            variable.value.close()
def addGroup(self, groupName, normal=(), expert=(), brief=(),
             next_label=None,
             brief_force=(), hide=(), expert_label=None, image='',
             custom_buttons=()):
    """
    Add a group of variables for a method

    groupName - name of the group
    normal - regular settings
    expert - advanced settings
    hide - tuple of variable names which must be hidden in brief
    image - image
    brief - tuple of variable names whose values must be shown
        in the confirmation output
    brief_force - tuple of variable names whose values must be shown
        in the confirmation output even if the parameter is uncompatible
    expert_label - text used instead of the standard label for showing
        the advanced parameters
    next_label - name of the button which starts the action or goes
        to the next group
    custom_buttons - list of tuples (button name, button text,
        value (method), element ("button"))

    Returns self.
    """
    fallback_next = _("Next")
    fallback_expert = _("Click for advanced settings")
    group = {'name': groupName,
             'normal': normal,
             'expert': expert,
             'hide': hide,
             'image': image,
             'brief': brief,
             'custom_buttons': custom_buttons,
             'brief_force': brief_force,
             'expert_label': expert_label or fallback_expert,
             'next_label': next_label or fallback_next}
    self.groups.append(group)
    group_index = len(self.groups) - 1
    for full_name in chain(normal, expert):
        # variables are mapped by their name without the section
        short_name = full_name.rpartition('.')[2]
        self.mapVarGroup[short_name] = group_index
        variable = self.getInfo(full_name)
        if variable.opt is None:
            continue
        # remember which variable every console option belongs to
        for option in variable.opt:
            self.map_opt_vars[option] = short_name
    return self
def is_console_set(self, varname):
    """
    Return True if any option from 'main.cl_console_args' is mapped
    to 'varname' in self.map_opt_vars
    """
    console_args = self.Get('main.cl_console_args')
    return any(self.map_opt_vars.get(option, '') == varname
               for option in console_args)
def getGroups(self):
    """
    Return the list of variable groups (self.groups)
    """
    return self.groups
def clearGroups(self):
    """
    Drop all groups and the variable-to-group map
    (using for drop translate)
    """
    self.groups, self.mapVarGroup = [], OrderedDict()
def addBrief(self, next_label=None, text=None, image=""):
    """
    Store the brief settings in self.briefData under the keys
    "next", "help" and "image"
    """
    for key, value in (("next", next_label),
                       ("help", text),
                       ("image", image)):
        self.briefData[key] = value
def getBrief(self):
    """
    Return the brief settings (self.briefData)
    """
    return self.briefData
def _dependSort(self, inlist):
    """
    Yield variables ordered by their "check_after" attribute.

    A variable is yielded only when none of the remaining variables is
    named in its check_after; otherwise the first such variable is moved
    to the front. Raises VariableError if the same variable is moved to
    the front twice before anything is yielded (dependence loop).
    """
    pending = list(inlist)
    promoted = []
    while pending:
        head = pending[0]
        # position of the first remaining variable the head must follow
        blocker = next(
            (pos for pos in range(1, len(pending))
             if pending[pos].name in head.check_after),
            None)
        if blocker is None:
            yield pending.pop(0)
            if pending:
                promoted = [pending[0].name]
            continue
        pending.insert(0, pending.pop(blocker))
        if pending[0].name in promoted:
            raise VariableError("Loop depends for %s on check" %
                                (",".join(promoted)))
        promoted.append(pending[0].name)
def _nocheckSort(self, inlist):
    """
    Yield first the variables without own check: no source, no
    check_after and base implementations of "check" and "uncompatible".
    All other variables follow in their original order.
    """
    deferred = []
    for var in inlist:
        uses_base_check = (not var.source and not var.check_after and
                           var.has_base_method("check") and
                           var.has_base_method("uncompatible"))
        if uses_base_check:
            yield var
        else:
            deferred.append(var)
    yield from deferred
def fixWsdlTableValue(self, varObj, val):
    """
    Fix data of (list-list) type received by wsdl.

    For a variable of type "table" the rows in 'val' are transposed to
    columns, string cells of every column whose source variable type
    contains "list-list" are split by comma, and the columns are
    transposed back to rows (short rows are padded with ""). Any other
    value is returned unchanged.
    """
    if varObj.type != "table":
        return val
    source_types = [self.getInfo(name).type for name in varObj.source]
    nested_columns = [pos for pos, coltype in enumerate(source_types)
                      if "list-list" in coltype]
    if not nested_columns:
        return val
    columns = list(zip_longest(*val, fillvalue=""))
    for pos in nested_columns:
        if pos >= len(columns):
            continue
        columns[pos] = [cell.split(',') if isinstance(cell, str) else cell
                        for cell in columns[pos]]
    return list(zip_longest(*columns, fillvalue=""))
def plainList(self, *elements):
    """
    Flatten nested elements: an element whose 'message' is a list or
    a tuple is replaced by the flattened elements of that message
    """
    for element in elements:
        if type(element.message) not in (list, tuple):
            yield element
            continue
        yield from self.plainList(*element.message)
def printGroup(self, info):
    """
    Print a group of variables: the lower-case attributes listed in
    info._type_info and the values of the variables in self.mapVarGroup
    """
    print("=DEBUG.SET=")
    lowercase_names = sorted(name for name in info._type_info.keys()
                             if name.lower() == name)
    for name in lowercase_names:
        print("|", name, "=", getattr(info, name))
    print("=DEBUG.DATAVARS=")
    for name in self.mapVarGroup.keys():
        print("|", name, "=", self.Get(name))
    print("=DEBUG.END=")
def checkGroups(self, info, allvars=False, invalidators=None):
    """
    Check variables in group or all in info

    info - object whose attributes hold the values sent by the client
        (None values mean "not sent"); may itself be None
    allvars - if true, every untrusted variable is checked
    invalidators - optional dict: variable name -> names of variables
        which are force-invalidated when the value in 'info' differs
        from the current value of that variable

    Returns a list of error dicts with the keys 'type', 'field' and
    'message' (and 'field_obj' for variable errors); the list is empty
    when no groups are defined.
    """
    def compare_choice(part_s, *options):
        # case-insensitive comparison of part_s with every option
        part_s = part_s.lower()
        options = [x.lower() for x in options]
        return any(x == part_s for x in options)
    def get_val_from_choice(choicedata, val):
        """
        Get a similar value: if 'val' is not one of the choice values,
        return the choice whose value or comment equals it ignoring
        case. Raises VariableError if several choices match.
        """
        if val not in [x[0] for x in choicedata]:
            result = [x for x in choicedata if compare_choice(val, x[0], x[1])]
            if len(result) > 1:
                raise VariableError(
                    _("Ambiguous choice:%s") % formatListOr((('"%s"(%s)' % (x[1], x[0])
                        if x[0] != x[1] else x[0]) for x in result)))
            elif result:
                return result[0][0]
        return val
    if not self.groups:
        return []
    errors = []
    # NOTE(review): the else branch below cannot be reached because of
    # the early return above when self.groups is empty
    if self.groups:
        keys = list(self.mapVarGroup.keys())
    else:
        keys = sorted((x for x in info._type_info.keys() if x.lower() == x))
    varsByKeys = [self.getInfo(x) for x in keys]
    groupVars = []
    # invalidate default variables
    if hasattr(info, "Default") and info.Default:
        default = info.Default
        for varname in default:
            self.Invalidate(varname, onlySet=True)
    else:
        default = []
    # check the variables whose value change must reset the listed
    # dependent variables even if the user has already changed their
    # values; this can be used for groupmod
    if invalidators:
        for k, vars in invalidators.items():
            if hasattr(info, k):
                val = getattr(info,k)
                if val is not None and val != self.Get(k):
                    for var in vars:
                        self.Invalidate(var, force=True)
    # look over all vars
    for var, varObj in map(lambda x: (x.name, x),
                           self._nocheckSort(self._dependSort(varsByKeys))):
        # if info skipped and send None
        if info is None:
            val = None
        else:
            # if variable not exists in info data - skip it
            if not hasattr(info, var):
                continue
            # get value of variable from info
            val = getattr(info, var)
        varSendFromClient = val is not None and var not in default
        varGroupUntrusted = varObj.untrusted and var in groupVars
        varAllUntrusted = varObj.untrusted and allvars
        # if group variable is not defined and
        # variable send from client
        # datavars has groups of variables
        if not groupVars and varSendFromClient and self.groups:
            # get normal and expert variables from group
            groupIndex = self.mapVarGroup[var]
            groupVars = list(self.groups[groupIndex]["normal"]) + \
                        list(self.groups[groupIndex]["expert"])
        if varSendFromClient or varGroupUntrusted or varAllUntrusted \
                or not info:
            try:
                uncomperr = self.Uncompatible(var)
                # get value for final or group check
                if val is None:
                    # nothing was sent: check the current value unless
                    # the variable is uncompatible
                    if not uncomperr:
                        self.Check(var, self.Get(var))
                else:
                    val = self.fixWsdlTableValue(varObj, val)
                    if varObj.type in ("choice", "choice-list"):
                        choicedata = self.ChoiceAndComments(var)
                        if any(choicedata):
                            # without comments the values are used as
                            # their own comments
                            if choicedata[1] is None:
                                choicedata = [choicedata[0],
                                              choicedata[0]]
                            # pairs of (value, comment)
                            choicedata = list(zip(*choicedata))
                            if varObj.type == "choice":
                                vals = [val]
                            else:
                                vals = val
                            res = []
                            for val in vals:
                                val = get_val_from_choice(choicedata, val)
                                res.append(val)
                            if varObj.type == "choice":
                                val = res[0]
                            else:
                                val = res
                    self.Set(var, val)
                    # raise error for attempt to set uncompatible variable
                    if uncomperr:
                        raise VariableError(uncomperr)
            except (VariableError, DataVarsError) as e:
                # assemble all variable errors
                # NOTE(review): assumes both exception types provide
                # the exception_list attribute - confirm
                messages = e.exception_list if e.exception_list else [e]
                mess = "\n".join((str(x) for x in messages))
                mapError = {PasswordError: 'pwderror',
                            CommonVariableError: 'commonerror',
                            BuildAlreadyInstalledError: 'sysinstalled'}
                # the error type is taken from the first mapped class
                # matching the exception or all exceptions of its list
                for k, v in mapError.items():
                    if (isinstance(e, k) or e.exception_list and
                        all(isinstance(x, k) for x in
                            e.exception_list)):
                        typeError = v
                        break
                else:
                    typeError = 'error'
                errors.append({'type': typeError, 'field': var,
                               'field_obj': self.getInfo(
                                   var) if var else "",
                               'message': mess})
            except BaseException as e:
                # any other failure is printed with its traceback and
                # reported as a plain error for this field
                for i in traceback.format_exception(*sys.exc_info()):
                    print(i, end=' ')
                errors.append({'type': 'error', 'field': var,
                               'message': str(e)})
    return errors
def printDependence(self):
    """
    Print, for every loaded variable with non-empty reqVars, its name
    and the names of the variables in reqVars
    """
    for name, variable in self.loadVariables.items():
        if not variable.reqVars:
            continue
        print(name, "->", [dep.name for dep in variable.reqVars])
def printVars(self, filt=lambda x: x):
    """
    Print the names accepted by 'filt', sorted, together with the value
    returned by Get(name, True)
    """
    names = sorted(filter(filt, self.allVars.keys()))
    for name in names:
        value = self.Get(name, True)
        print("{0:<40} {1}".format(name, value))
def reinit(self):
    """
    Call init() of every loaded variable.
    Using for update variable translatable elements (example: label)
    """
    for variable in self.loadVariables.values():
        variable.init()
def printWrong(self, filt=lambda x: x):
    """
    Print the untrusted variables among the names accepted by 'filt',
    sorted, together with the value returned by Get(name, True)
    """
    print("!!!WRONG VARS!!!")
    candidates = sorted(name for name in self.allVars.keys()
                        if filt(name))
    for name in candidates:
        if not self.getInfo(name).untrusted:
            continue
        print("{0:<40} {1}".format(name, self.Get(name, True)))