Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

712 righe
30 KiB

Questo file contiene caratteri Unicode ambigui!

Questo file contiene caratteri Unicode ambigui che possono essere confusi con altri nella tua localizzazione attuale. Se il tuo caso di utilizzo è intenzionale e legittimo, puoi tranquillamente ignorare questo avviso. Usa il pulsante Escape per evidenziare questi caratteri.

# vim: fileencoding=utf-8
#
from ..variables.datavars import DependenceAPI, VariableWrapper, NamespaceNode
from ..variables.loader import Datavars
from collections import OrderedDict
from contextlib import contextmanager
from typing import Tuple, Union, Any
class ParameterError(Exception):
pass
class ValidationError(ParameterError):
pass
class CyclicValidationError(ValidationError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic validation for parameters: {}".format(", ".join(
self.queue[:-1]))
class ParameterType:
'''Общий класс типов параметров.'''
descr = 'VALUE'
def process_value(self, value, parameter):
return value
class Integer(ParameterType):
'''Класс типа целочисленных параметров.'''
descr = 'INTEGER'
def process_value(self, value, parameter) -> int:
if isinstance(value, int):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the int type")
def __repr__(self):
return '<Integer Parameter Type>'
class String(ParameterType):
'''Класс типа строковых параметров.'''
descr = 'STRING'
def process_value(self, value, parameter) -> str:
if isinstance(value, str):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the string type")
def __repr__(self):
return '<String Parameter Type>'
class Bool(ParameterType):
'''Класс типа строковых параметров.'''
false_set = {'n', 'false', 'False', 'no'}
true_set = {'y', 'true', 'True', 'yes'}
descr = '[y/n]'
def process_value(self, value, parameter) -> bool:
if isinstance(value, bool):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the bool type")
def __repr__(self):
return '<Bool Parameter Type>'
class Float(ParameterType):
'''Класс типа вещественночисловых параметров.'''
descr = 'FLOAT'
def process_value(self, value, parameter) -> float:
if isinstance(value, float):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the float type")
def __repr__(self):
return '<Float Parameter Type>'
class Separator:
'''Класс объекта сепаратора, возможно, его не будет'''
def __repr__(self):
return "# # #"
class Choice(ParameterType):
'''Класс типа, представлющий собой любое значение из совокупности заданных.
'''
descr = 'CHOICES'
def __init__(self, choices=OrderedDict(), editable=False,
multichoice=False):
self.editable = editable
self.multichoice = multichoice
self.choices = choices
def add_choice(self, choice, comment):
self.choices.update({choice: comment})
def process_value(self, value, parameter):
if value in self.choices:
return value
else:
raise ValidationError(f"value '{value}' is not available in"
f" parameter '{parameter._name}' available"
f" values: '{', '.join(self.choices)}'")
def __repr__(self):
return (f'<Choice Parameter Type: choices={", ".join(self.choices)},'
f' editable={self.editable}, multichoice={self.multichoice}>')
class List(ParameterType):
descr = 'LIST'
def process_value(self, value, parameter):
if isinstance(value, list):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the list type")
def __repr__(self):
return ('<List Parameter Type>')
class TableValue:
'''Класс значения таблицы.'''
def __init__(self, key, fields, fill_function=lambda x: x):
self._fields = fields
self._values = OrderedDict()
# Родительский параметр.
self._parent = None
# Ключ и его индекс в массиве.
self.primary_key = key
# Устанавливаем комментарии стобцам таблицы, по умолчанию то же, что и
# название столбца.
self._fields_comments = OrderedDict()
for field in self._fields:
self._fields_comments.update({field: field})
# Функция для заполнения пустых полей таблицы.
self.fill = fill_function
# Функция для кастомизации уведомления об ошибке.
def set_error(value: str, field: str, available: list) -> None:
raise ParameterError(f"{field} '{value}' is not found.")
self.set_error = set_error
def set_comments(self, *comments):
'''Метод для установки комментариев к полям таблицы.'''
if len(comments) < len(self._fields):
raise ParameterError("Not enough values to comment table fields.")
elif len(comments) > len(self._fields):
raise ParameterError("Too much values to comment table fields.")
self._fields_comments = OrderedDict()
for field, comment in zip(self._fields, comments):
if comment is not None:
self._fields_comments[field] = comment
def change(self, *values):
'''Метод для добавления новых или изменения существующих строк таблицы.
'''
if len(values) < len(self._fields):
raise ParameterError("Not enough values to fill table row.")
elif len(values) > len(self._fields):
raise ParameterError("Too much values to fill table row.")
# Проверяем типы устанавливаемых значений.
validated_values = OrderedDict()
for value, (field, field_type) in zip(values,
self._fields.items()):
if value is None or self._parent is None:
validated_values.update({field: value})
else:
validated_values.update({field: field_type.process_value(
value,
self._parent)})
current_key = validated_values[self.primary_key]
if current_key in self._values:
self._values[current_key] = validated_values
elif self._parent is None or self._parent._parameter_type.expandable:
self._values.update({current_key: validated_values})
else:
self.set_error(current_key,
self._fields_comments[self.primary_key],
list(self._values.keys()))
self._values = self.fill(self._values)
def check_values_types(self):
'''Метод для проверки по типам всех значений таблицы.'''
for row in self._values.values():
for field, field_type in self._fields.items():
if row[field] is not None:
row[field] = field_type.process_value(row[field],
self._parent)
def get_for_var(self):
'''Метод для преобразования данного описания таблицы в ту, которая
приемлема для инициализации переменных.'''
output = []
for value in self._values.values():
output.append(value)
return output
class Table(ParameterType):
'''Метод реализующий тип таблиц.'''
descr = 'LIST'
def __init__(self, expandable=False):
self.expandable = expandable
def process_value(self, value: TableValue, parameter):
if isinstance(value, TableValue):
# Если родитель не установлен -- значит параметр был только что
# инициализирован. Устанавливаем его, и проводим полную проверку
# инициализированных значений таблицы.
if value._parent is None:
value._parent = parameter
value.check_values_types()
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the table type")
def __repr__(self):
return (f'<Table Parameter Type: expandable={self.expandable}>')
class File(ParameterType):
pass
class Password(ParameterType):
'''Класс типа строковых параметров.'''
descr = 'PASSWORD'
def __init__(self, param=False):
# TODO разобраться с параметрами, необходимыми для параметров типа
# Password.
self.param = param
def process_value(self, value, parameter) -> str:
if isinstance(value, str):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the string type")
def __repr__(self):
return '<String Parameter Type>'
class Description:
'''Класс, содержащий описание параметра и способ его отображения.'''
# Словарь со способами отображения параметра в графическом интерфейсе.
representations = {}
def __init__(self, short='', full='', usage=''):
self.short = short
self.full = full
self.usage = usage
self._gui_repr = None
def initialize(self, parameter_name, parameter_type, shortname=None):
'''Метод для инициализации представления по указанной пользователем
информации, типу параметра и его имени.'''
if not self.short:
self.short = parameter_name
if not self.full:
self.full = self.short
if not self.usage:
if shortname:
self.usage = f"-{shortname} {parameter_type.descr}, "
self.usage = self.usage +\
f"--{parameter_name} {parameter_type.descr}"
self._gui_repr = self._get_gui_repr(parameter_type)
def _get_gui_repr(self, parameter_type):
'''Метод для получения по типу параметра способа его представления.'''
# TODO реализовать его, когда удастся выделить совокупность доступных
# способов представления.
return None
@property
def gui_repr(self):
return self._gui_repr
class BaseParameter:
'''Класс базовый класс всех параметров.'''
default_type = ParameterType
def __init__(self, name, group, desctiption: Description,
shortname=None, argv=None):
self._group = group
self._name = name
self._shortname = shortname
self._argv = argv
# Ссылка на экземпляр контейнера параметров.
self.container = None
self._value = None
# Допустимые значения параметра, если таковые имеются.
# self.choices = OrderedDict()
# Комментарии, если таковой необходим.
self.comment = None
# Флаг, указывающий на то, что значение параметра было изменено
# установлено пользователем с помощью метода set.
self._set_by_user = False
self._validated = False
if hasattr(self, 'type'):
self._parameter_type = self.type
else:
self._parameter_type = self.default_type()
# Добавляем описание и инициализируем его.
self._description = desctiption
self._description.initialize(self._name, self._parameter_type,
shortname=self._shortname)
# Комментарий, наличие которого указывает на неактивность параметра.
self.disactivity_comment = None
# Коллекции текущих подписок, аргументов, из которых эти подписки
# формируются и подписчиков.
self._subscriptions = {}
self._args = []
self._subscribers = []
# Флаги для проверки того необходимо ли проводить поиск подписок или
# подписчиков.
self._args_is_found = False
self._sets_is_found = False
# Метод для указания того, что в данный момент осуществляется расчет
# значения параметра.
self._calculating = False
self._validation = False
@property
def choices(self):
if isinstance(self._parameter_type, Choice):
return self._parameter_type.choices
@property
def disactivity(self):
return bool(self.disactivity_comment)
def validate(self, container, datavars, value) -> None:
'''Метод для проверки корректности параметра. Переопределяется при
создании нового типа параметра.'''
return None
def validate_value(self, value):
'''Метод для запуска валидации параметра с использованием
переопределенного метода валидации.'''
if self._validation:
raise CyclicValidationError(self._name)
with self._start_validation():
try:
self.validate(self.container, self.container.datavars, value)
except CyclicValidationError as error:
raise CyclicValidationError(self._name, *error.queue)
@contextmanager
def _start_validation(self):
'''Контекстный менеджер предназначеный для переключения параметра в
режим валидации.'''
try:
self._validation = True
yield self
finally:
self._validation = False
def set(self, value) -> None:
'''Метод для установки значения параметра.'''
if isinstance(self._parameter_type, Table):
print('VALUE:', self.value)
self._value.change(*value)
else:
self._value = value
self._set_by_user = True
# Назначаем новые значения подписанным переменным.
self._set_variables()
@property
def value(self):
'''Метод для получения значения параметра. Выдается или дефолтное или
установленное пользователем.'''
if not self._set_by_user:
if self._value is None and not self._calculating:
self._value = self.update_value()
return self._value
def check_value_type(self, value):
'''Метод для запуска проверки значения по типу параметра.'''
return self._parameter_type.process_value(value, self)
def bind(self, *variables) -> None:
'''Метод для настройки взаимосвязи параметра с переменными.'''
self._args = variables
return self
def bind_method(self, *variables):
'''Метод переопределяемый для создания нового типа параметра. Должен
возвращать вычисленное значение и активность параметра None -- активен,
comment -- дизактивирован.'''
if variables:
return variables[0].value, None
else:
return None, None
def set_to(self, *variables):
'''Метод для добавления переменных, которым будет установлено значение
параметра.'''
self._subscribers = variables
return self
def _set_variables(self):
'''Метод для установки значения параметра переменным-подписчикам.'''
# Только если значение установлено пользователем.
if self._set_by_user:
if not self._sets_is_found:
self._subscribers = self.find_variables(self._subscribers)
self._sets_is_found = True
for subscriber in self._subscribers:
if isinstance(self._parameter_type, Table):
subscriber.set(self.value.get_for_var())
else:
subscriber.set(self.value)
def _invalidate(self):
'''Метод, через который переменные сообщают о необходимости пересчитать
значение параметра с дефолтным значением.'''
value = self.update_value()
if not self._set_by_user:
self._value = value
self._validated = False
def update_value(self):
'''Метод для получения дефолтного значения.'''
if not self._args_is_found:
self._args = self.find_variables(self._args)
self._args_is_found = True
with self._start_calculate():
value, subscriptions = self._calculate_binding()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in subscriptions:
subscription.subscribers.add(self)
self._subscriptions = subscriptions
return self._parameter_type.process_value(value, self)
def _calculate_binding(self):
'''Метод для расчета значения параметра по умолчанию, используя
заданные переменные.'''
subscriptions = set()
args = tuple(VariableWrapper(arg, subscriptions) for arg in self._args)
try:
value, self.disactivity_comment = self.bind_method(*args)
return value, subscriptions
except Exception as error:
raise ParameterError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self._calculating = True
yield self
finally:
self._calculating = False
def find_variables(self, variables):
'''Метод для поиска переменных, по их названию. Обходит список
переменных и заменяет все строки с запросами в списке на результат
поиска по запросу.'''
output = []
for variable in variables:
if isinstance(variable, str):
variable = DependenceAPI.find_variable(variable,
self.container.datavars)
output.append(variable)
return output
def __repr__(self):
description = self._description.short or self._description.full
return (f"<Parameter: '{self._name}' = "
f"{self._value if self._value else 'NULL'}: "
f"{description}>")
class GroupWrapper:
'''Класс обертки параметров группы, который возможно будет использоваться
для организации работы графического интерфейса.'''
def __init__(self, group, container):
self.values = OrderedDict()
self.container = container
def set_group(self):
'''Метод для установки значений параметров, осносящихся к группе.'''
self.container.set_parameters(self.values)
class Parameters:
'''Класс контейнера параметров.'''
def __init__(self, datavars: Datavars, check_order=[]):
self._datavars = datavars
self._parameters = OrderedDict()
self._validation_dict = OrderedDict()
self._order = check_order
# Флаг указывающий на то, что в данный момент идет валидация параметров
self._validation = False
# Занятые имена параметров.
self._names = set()
# Список позиционных аргументов.
self._argvs = []
# Словарь функций для взаимодействия графического клиента с группами
# параметров.
self._gui_helpers = {}
def add(self, *parameters: Tuple[BaseParameter]):
'''Метод для добавления некоторой совокупности параметров.'''
for parameter in parameters:
self.add_parameter(parameter)
def set_order(self, *parameters):
'''Метод для установки порядка проверки параметров.'''
self._order = []
for parameter in parameters:
if isinstance(parameter, str):
parameter = self[parameter]
self._order.append(parameter)
for parameter in self:
if parameter not in self._order:
self._order.append(parameter)
def add_parameter(self, parameter: BaseParameter) -> None:
'''Метод для добавления параметров в контейнер.'''
if parameter._name in self._names:
raise ParameterError(f"Can not add parameter '{parameter._name}'."
" Such name has already been added. ")
elif parameter._shortname and parameter._shortname in self._names:
raise ParameterError(f"Can not add parameter '{parameter._name}'."
" Such name has already been added. ")
elif (parameter._argv is not None and
len(self._argvs) > parameter._argv and
self._argvs[parameter._argv]):
raise ParameterError(f"Can not add positional parameter "
f"'{parameter._name}'. Position"
f"'{parameter._argv}' is already taken.")
parameter.container = self
if parameter._group in self._parameters:
self._parameters[parameter._group].append(parameter)
else:
self._parameters[parameter._group] = [parameter]
self._names.add(parameter._name)
if parameter._shortname:
self._names.add(parameter._shortname)
if parameter._argv is not None:
# Если нужно, расширяем список позиций.
if len(self._argvs) <= parameter._argv:
while(len(self._argvs) <= parameter._argv):
self._argvs.append(None)
self._argvs[parameter._argv] = parameter
self._order.append(parameter)
parameter.update_value()
def set_parameters(self, parameters: OrderedDict) -> None:
'''Метод для установки значений некоторого числа параметров их
проверки.'''
# Сначала проверяем все значения по типам и составляем словарь
# валидации.
for parameter_name, value in parameters.items():
parameter = self[parameter_name]
if not parameter.disactivity_comment:
self._validation_dict[parameter] =\
parameter._parameter_type.process_value(value,
parameter)
parameter._validated = False
# Теперь запускаем переопределенный метод для проверки параметров.
with self._run_validation():
self.validate_parameters(self._validation_dict)
def validate_parameters(self, parameters: OrderedDict):
'''Метод для запуска валидации параметров.'''
for parameter in self._order:
if parameter in self._validation_dict:
if (not parameter._validated
and not parameter.disactivity_comment):
parameter.validate_value(self._validation_dict[parameter])
parameter._validated = True
parameter.set(self._validation_dict[parameter])
def validate_all(self):
for parameter in self._order:
if (not parameter._validated and
not parameter.disactivity_comment):
parameter.validate_value(self._validation_dict[parameter])
parameter._validated = True
parameter.set(self._validation_dict[parameter])
@contextmanager
def _run_validation(self):
'''Контекстный менеджер предназначенный для перевода контейнера
параметров в режим валидации.'''
try:
self._validation = True
yield self
finally:
self._validation = False
self._validation_dict = OrderedDict()
def get_group_parameters(self, group_name: str):
'''Метод для получения списка параметров, относящихся к указанной
группе.'''
return self._parameters[group_name]
def get_descriptions(self) -> dict:
'''Метод для получения словаря с описанием параметров.'''
output = OrderedDict()
for group, parameters in self._parameters.items():
for parameter in parameters:
usage = ' ' + parameter._description.usage
full = parameter._description.full
if len(usage) > 23:
usage = usage + '\n'
full = ' ' * 24 + full
elif len(usage) == 23:
usage = usage + ' '
else:
usage = usage.ljust(23) + ' '
parameter_description = (f"{usage}{full}")
if group in output:
output[group].append(parameter_description)
else:
output[group] = [parameter_description]
return output
@property
def datavars(self) -> Union[Datavars, NamespaceNode]:
return self._datavars
def __getitem__(self, name: str) -> Any:
for group, parameters in self._parameters.items():
for parameter in parameters:
if parameter._name == name or parameter._shortname == name:
if self._validation and not parameter._validated:
# В режиме валидации параметров, при попытке получить
# параметр сначала проверяем необходимость его проверки
# И если нужно проверяем его.
parameter_value =\
self._validation_dict.get(parameter, None) or\
parameter._value
parameter.validate_value(parameter_value)
parameter._validated = True
return parameter
else:
return parameter
raise ParameterError(f"No such parameter '{name}'.")
def __iter__(self):
for parameters in self._parameters.values():
for parameter in parameters:
yield parameter