Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

712 строки
30 KiB

# vim: fileencoding=utf-8
#
from ..variables.datavars import DependenceAPI, VariableWrapper, NamespaceNode
from ..variables.loader import Datavars
from collections import OrderedDict
from contextlib import contextmanager
from typing import Tuple, Union, Any
class ParameterError(Exception):
pass
class ValidationError(ParameterError):
pass
class CyclicValidationError(ValidationError):
def __init__(self, *queue):
self.queue = queue
def __str__(self):
return "Cyclic validation for parameters: {}".format(", ".join(
self.queue[:-1]))
class ParameterType:
'''Общий класс типов параметров.'''
descr = 'VALUE'
def process_value(self, value, parameter):
return value
class Integer(ParameterType):
'''Класс типа целочисленных параметров.'''
descr = 'INTEGER'
def process_value(self, value, parameter) -> int:
if isinstance(value, int):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the int type")
def __repr__(self):
return '<Integer Parameter Type>'
class String(ParameterType):
'''Класс типа строковых параметров.'''
descr = 'STRING'
def process_value(self, value, parameter) -> str:
if isinstance(value, str):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the string type")
def __repr__(self):
return '<String Parameter Type>'
class Bool(ParameterType):
'''Класс типа строковых параметров.'''
false_set = {'n', 'false', 'False', 'no'}
true_set = {'y', 'true', 'True', 'yes'}
descr = '[y/n]'
def process_value(self, value, parameter) -> bool:
if isinstance(value, bool):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the bool type")
def __repr__(self):
return '<Bool Parameter Type>'
class Float(ParameterType):
'''Класс типа вещественночисловых параметров.'''
descr = 'FLOAT'
def process_value(self, value, parameter) -> float:
if isinstance(value, float):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the float type")
def __repr__(self):
return '<Float Parameter Type>'
class Separator:
'''Класс объекта сепаратора, возможно, его не будет'''
def __repr__(self):
return "# # #"
class Choice(ParameterType):
'''Класс типа, представлющий собой любое значение из совокупности заданных.
'''
descr = 'CHOICES'
def __init__(self, choices=OrderedDict(), editable=False,
multichoice=False):
self.editable = editable
self.multichoice = multichoice
self.choices = choices
def add_choice(self, choice, comment):
self.choices.update({choice: comment})
def process_value(self, value, parameter):
if value in self.choices:
return value
else:
raise ValidationError(f"value '{value}' is not available in"
f" parameter '{parameter._name}' available"
f" values: '{', '.join(self.choices)}'")
def __repr__(self):
return (f'<Choice Parameter Type: choices={", ".join(self.choices)},'
f' editable={self.editable}, multichoice={self.multichoice}>')
class List(ParameterType):
descr = 'LIST'
def process_value(self, value, parameter):
if isinstance(value, list):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the list type")
def __repr__(self):
return ('<List Parameter Type>')
class TableValue:
'''Класс значения таблицы.'''
def __init__(self, key, fields, fill_function=lambda x: x):
self._fields = fields
self._values = OrderedDict()
# Родительский параметр.
self._parent = None
# Ключ и его индекс в массиве.
self.primary_key = key
# Устанавливаем комментарии стобцам таблицы, по умолчанию то же, что и
# название столбца.
self._fields_comments = OrderedDict()
for field in self._fields:
self._fields_comments.update({field: field})
# Функция для заполнения пустых полей таблицы.
self.fill = fill_function
# Функция для кастомизации уведомления об ошибке.
def set_error(value: str, field: str, available: list) -> None:
raise ParameterError(f"{field} '{value}' is not found.")
self.set_error = set_error
def set_comments(self, *comments):
'''Метод для установки комментариев к полям таблицы.'''
if len(comments) < len(self._fields):
raise ParameterError("Not enough values to comment table fields.")
elif len(comments) > len(self._fields):
raise ParameterError("Too much values to comment table fields.")
self._fields_comments = OrderedDict()
for field, comment in zip(self._fields, comments):
if comment is not None:
self._fields_comments[field] = comment
def change(self, *values):
'''Метод для добавления новых или изменения существующих строк таблицы.
'''
if len(values) < len(self._fields):
raise ParameterError("Not enough values to fill table row.")
elif len(values) > len(self._fields):
raise ParameterError("Too much values to fill table row.")
# Проверяем типы устанавливаемых значений.
validated_values = OrderedDict()
for value, (field, field_type) in zip(values,
self._fields.items()):
if value is None or self._parent is None:
validated_values.update({field: value})
else:
validated_values.update({field: field_type.process_value(
value,
self._parent)})
current_key = validated_values[self.primary_key]
if current_key in self._values:
self._values[current_key] = validated_values
elif self._parent is None or self._parent._parameter_type.expandable:
self._values.update({current_key: validated_values})
else:
self.set_error(current_key,
self._fields_comments[self.primary_key],
list(self._values.keys()))
self._values = self.fill(self._values)
def check_values_types(self):
'''Метод для проверки по типам всех значений таблицы.'''
for row in self._values.values():
for field, field_type in self._fields.items():
if row[field] is not None:
row[field] = field_type.process_value(row[field],
self._parent)
def get_for_var(self):
'''Метод для преобразования данного описания таблицы в ту, которая
приемлема для инициализации переменных.'''
output = []
for value in self._values.values():
output.append(value)
return output
class Table(ParameterType):
'''Метод реализующий тип таблиц.'''
descr = 'LIST'
def __init__(self, expandable=False):
self.expandable = expandable
def process_value(self, value: TableValue, parameter):
if isinstance(value, TableValue):
# Если родитель не установлен -- значит параметр был только что
# инициализирован. Устанавливаем его, и проводим полную проверку
# инициализированных значений таблицы.
if value._parent is None:
value._parent = parameter
value.check_values_types()
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the table type")
def __repr__(self):
return (f'<Table Parameter Type: expandable={self.expandable}>')
class File(ParameterType):
pass
class Password(ParameterType):
'''Класс типа строковых параметров.'''
descr = 'PASSWORD'
def __init__(self, param=False):
# TODO разобраться с параметрами, необходимыми для параметров типа
# Password.
self.param = param
def process_value(self, value, parameter) -> str:
if isinstance(value, str):
return value
raise ValidationError(f"Can not assign '{type(value)}' value to"
" parameter of the string type")
def __repr__(self):
return '<String Parameter Type>'
class Description:
'''Класс, содержащий описание параметра и способ его отображения.'''
# Словарь со способами отображения параметра в графическом интерфейсе.
representations = {}
def __init__(self, short='', full='', usage=''):
self.short = short
self.full = full
self.usage = usage
self._gui_repr = None
def initialize(self, parameter_name, parameter_type, shortname=None):
'''Метод для инициализации представления по указанной пользователем
информации, типу параметра и его имени.'''
if not self.short:
self.short = parameter_name
if not self.full:
self.full = self.short
if not self.usage:
if shortname:
self.usage = f"-{shortname} {parameter_type.descr}, "
self.usage = self.usage +\
f"--{parameter_name} {parameter_type.descr}"
self._gui_repr = self._get_gui_repr(parameter_type)
def _get_gui_repr(self, parameter_type):
'''Метод для получения по типу параметра способа его представления.'''
# TODO реализовать его, когда удастся выделить совокупность доступных
# способов представления.
return None
@property
def gui_repr(self):
return self._gui_repr
class BaseParameter:
'''Класс базовый класс всех параметров.'''
default_type = ParameterType
def __init__(self, name, group, desctiption: Description,
shortname=None, argv=None):
self._group = group
self._name = name
self._shortname = shortname
self._argv = argv
# Ссылка на экземпляр контейнера параметров.
self.container = None
self._value = None
# Допустимые значения параметра, если таковые имеются.
# self.choices = OrderedDict()
# Комментарии, если таковой необходим.
self.comment = None
# Флаг, указывающий на то, что значение параметра было изменено
# установлено пользователем с помощью метода set.
self._set_by_user = False
self._validated = False
if hasattr(self, 'type'):
self._parameter_type = self.type
else:
self._parameter_type = self.default_type()
# Добавляем описание и инициализируем его.
self._description = desctiption
self._description.initialize(self._name, self._parameter_type,
shortname=self._shortname)
# Комментарий, наличие которого указывает на неактивность параметра.
self.disactivity_comment = None
# Коллекции текущих подписок, аргументов, из которых эти подписки
# формируются и подписчиков.
self._subscriptions = {}
self._args = []
self._subscribers = []
# Флаги для проверки того необходимо ли проводить поиск подписок или
# подписчиков.
self._args_is_found = False
self._sets_is_found = False
# Метод для указания того, что в данный момент осуществляется расчет
# значения параметра.
self._calculating = False
self._validation = False
@property
def choices(self):
if isinstance(self._parameter_type, Choice):
return self._parameter_type.choices
@property
def disactivity(self):
return bool(self.disactivity_comment)
def validate(self, container, datavars, value) -> None:
'''Метод для проверки корректности параметра. Переопределяется при
создании нового типа параметра.'''
return None
def validate_value(self, value):
'''Метод для запуска валидации параметра с использованием
переопределенного метода валидации.'''
if self._validation:
raise CyclicValidationError(self._name)
with self._start_validation():
try:
self.validate(self.container, self.container.datavars, value)
except CyclicValidationError as error:
raise CyclicValidationError(self._name, *error.queue)
@contextmanager
def _start_validation(self):
'''Контекстный менеджер предназначеный для переключения параметра в
режим валидации.'''
try:
self._validation = True
yield self
finally:
self._validation = False
def set(self, value) -> None:
'''Метод для установки значения параметра.'''
if isinstance(self._parameter_type, Table):
print('VALUE:', self.value)
self._value.change(*value)
else:
self._value = value
self._set_by_user = True
# Назначаем новые значения подписанным переменным.
self._set_variables()
@property
def value(self):
'''Метод для получения значения параметра. Выдается или дефолтное или
установленное пользователем.'''
if not self._set_by_user:
if self._value is None and not self._calculating:
self._value = self.update_value()
return self._value
def check_value_type(self, value):
'''Метод для запуска проверки значения по типу параметра.'''
return self._parameter_type.process_value(value, self)
def bind(self, *variables) -> None:
'''Метод для настройки взаимосвязи параметра с переменными.'''
self._args = variables
return self
def bind_method(self, *variables):
'''Метод переопределяемый для создания нового типа параметра. Должен
возвращать вычисленное значение и активность параметра None -- активен,
comment -- дизактивирован.'''
if variables:
return variables[0].value, None
else:
return None, None
def set_to(self, *variables):
'''Метод для добавления переменных, которым будет установлено значение
параметра.'''
self._subscribers = variables
return self
def _set_variables(self):
'''Метод для установки значения параметра переменным-подписчикам.'''
# Только если значение установлено пользователем.
if self._set_by_user:
if not self._sets_is_found:
self._subscribers = self.find_variables(self._subscribers)
self._sets_is_found = True
for subscriber in self._subscribers:
if isinstance(self._parameter_type, Table):
subscriber.set(self.value.get_for_var())
else:
subscriber.set(self.value)
def _invalidate(self):
'''Метод, через который переменные сообщают о необходимости пересчитать
значение параметра с дефолтным значением.'''
value = self.update_value()
if not self._set_by_user:
self._value = value
self._validated = False
def update_value(self):
'''Метод для получения дефолтного значения.'''
if not self._args_is_found:
self._args = self.find_variables(self._args)
self._args_is_found = True
with self._start_calculate():
value, subscriptions = self._calculate_binding()
# Обновляем подписки. Сначала убираем лишние.
for subscription in self._subscriptions:
if subscription not in subscriptions:
subscription.subscribers.remove(self)
# Теперь добавляем новые.
for subscription in subscriptions:
subscription.subscribers.add(self)
self._subscriptions = subscriptions
return self._parameter_type.process_value(value, self)
def _calculate_binding(self):
'''Метод для расчета значения параметра по умолчанию, используя
заданные переменные.'''
subscriptions = set()
args = tuple(VariableWrapper(arg, subscriptions) for arg in self._args)
try:
value, self.disactivity_comment = self.bind_method(*args)
return value, subscriptions
except Exception as error:
raise ParameterError('can not calculate using dependencies: {}'
' reason: {}'.format(', '.join(
[subscription.get_fullname()
for subscription
in self._args]),
str(error)))
@contextmanager
def _start_calculate(self):
'''Менеджер контекста устанавливающий флаг, указывающий, что данная
переменная в состоянии расчета.'''
try:
self._calculating = True
yield self
finally:
self._calculating = False
def find_variables(self, variables):
'''Метод для поиска переменных, по их названию. Обходит список
переменных и заменяет все строки с запросами в списке на результат
поиска по запросу.'''
output = []
for variable in variables:
if isinstance(variable, str):
variable = DependenceAPI.find_variable(variable,
self.container.datavars)
output.append(variable)
return output
def __repr__(self):
description = self._description.short or self._description.full
return (f"<Parameter: '{self._name}' = "
f"{self._value if self._value else 'NULL'}: "
f"{description}>")
class GroupWrapper:
'''Класс обертки параметров группы, который возможно будет использоваться
для организации работы графического интерфейса.'''
def __init__(self, group, container):
self.values = OrderedDict()
self.container = container
def set_group(self):
'''Метод для установки значений параметров, осносящихся к группе.'''
self.container.set_parameters(self.values)
class Parameters:
'''Класс контейнера параметров.'''
def __init__(self, datavars: Datavars, check_order=[]):
self._datavars = datavars
self._parameters = OrderedDict()
self._validation_dict = OrderedDict()
self._order = check_order
# Флаг указывающий на то, что в данный момент идет валидация параметров
self._validation = False
# Занятые имена параметров.
self._names = set()
# Список позиционных аргументов.
self._argvs = []
# Словарь функций для взаимодействия графического клиента с группами
# параметров.
self._gui_helpers = {}
def add(self, *parameters: Tuple[BaseParameter]):
'''Метод для добавления некоторой совокупности параметров.'''
for parameter in parameters:
self.add_parameter(parameter)
def set_order(self, *parameters):
'''Метод для установки порядка проверки параметров.'''
self._order = []
for parameter in parameters:
if isinstance(parameter, str):
parameter = self[parameter]
self._order.append(parameter)
for parameter in self:
if parameter not in self._order:
self._order.append(parameter)
def add_parameter(self, parameter: BaseParameter) -> None:
'''Метод для добавления параметров в контейнер.'''
if parameter._name in self._names:
raise ParameterError(f"Can not add parameter '{parameter._name}'."
" Such name has already been added. ")
elif parameter._shortname and parameter._shortname in self._names:
raise ParameterError(f"Can not add parameter '{parameter._name}'."
" Such name has already been added. ")
elif (parameter._argv is not None and
len(self._argvs) > parameter._argv and
self._argvs[parameter._argv]):
raise ParameterError(f"Can not add positional parameter "
f"'{parameter._name}'. Position"
f"'{parameter._argv}' is already taken.")
parameter.container = self
if parameter._group in self._parameters:
self._parameters[parameter._group].append(parameter)
else:
self._parameters[parameter._group] = [parameter]
self._names.add(parameter._name)
if parameter._shortname:
self._names.add(parameter._shortname)
if parameter._argv is not None:
# Если нужно, расширяем список позиций.
if len(self._argvs) <= parameter._argv:
while(len(self._argvs) <= parameter._argv):
self._argvs.append(None)
self._argvs[parameter._argv] = parameter
self._order.append(parameter)
parameter.update_value()
def set_parameters(self, parameters: OrderedDict) -> None:
'''Метод для установки значений некоторого числа параметров их
проверки.'''
# Сначала проверяем все значения по типам и составляем словарь
# валидации.
for parameter_name, value in parameters.items():
parameter = self[parameter_name]
if not parameter.disactivity_comment:
self._validation_dict[parameter] =\
parameter._parameter_type.process_value(value,
parameter)
parameter._validated = False
# Теперь запускаем переопределенный метод для проверки параметров.
with self._run_validation():
self.validate_parameters(self._validation_dict)
def validate_parameters(self, parameters: OrderedDict):
'''Метод для запуска валидации параметров.'''
for parameter in self._order:
if parameter in self._validation_dict:
if (not parameter._validated
and not parameter.disactivity_comment):
parameter.validate_value(self._validation_dict[parameter])
parameter._validated = True
parameter.set(self._validation_dict[parameter])
def validate_all(self):
for parameter in self._order:
if (not parameter._validated and
not parameter.disactivity_comment):
parameter.validate_value(self._validation_dict[parameter])
parameter._validated = True
parameter.set(self._validation_dict[parameter])
@contextmanager
def _run_validation(self):
'''Контекстный менеджер предназначенный для перевода контейнера
параметров в режим валидации.'''
try:
self._validation = True
yield self
finally:
self._validation = False
self._validation_dict = OrderedDict()
def get_group_parameters(self, group_name: str):
'''Метод для получения списка параметров, относящихся к указанной
группе.'''
return self._parameters[group_name]
def get_descriptions(self) -> dict:
'''Метод для получения словаря с описанием параметров.'''
output = OrderedDict()
for group, parameters in self._parameters.items():
for parameter in parameters:
usage = ' ' + parameter._description.usage
full = parameter._description.full
if len(usage) > 23:
usage = usage + '\n'
full = ' ' * 24 + full
elif len(usage) == 23:
usage = usage + ' '
else:
usage = usage.ljust(23) + ' '
parameter_description = (f"{usage}{full}")
if group in output:
output[group].append(parameter_description)
else:
output[group] = [parameter_description]
return output
@property
def datavars(self) -> Union[Datavars, NamespaceNode]:
return self._datavars
def __getitem__(self, name: str) -> Any:
for group, parameters in self._parameters.items():
for parameter in parameters:
if parameter._name == name or parameter._shortname == name:
if self._validation and not parameter._validated:
# В режиме валидации параметров, при попытке получить
# параметр сначала проверяем необходимость его проверки
# И если нужно проверяем его.
parameter_value =\
self._validation_dict.get(parameter, None) or\
parameter._value
parameter.validate_value(parameter_value)
parameter._validated = True
return parameter
else:
return parameter
raise ParameterError(f"No such parameter '{name}'.")
def __iter__(self):
for parameters in self._parameters.values():
for parameter in parameters:
yield parameter