# vim: fileencoding=utf-8 # from ..variables.datavars import DependenceAPI, VariableWrapper, NamespaceNode from ..variables.loader import Datavars from collections import OrderedDict from contextlib import contextmanager from typing import Tuple, Union, Any class ParameterError(Exception): pass class ValidationError(ParameterError): pass class CyclicValidationError(ValidationError): def __init__(self, *queue): self.queue = queue def __str__(self): return "Cyclic validation for parameters: {}".format(", ".join( self.queue[:-1])) class ParameterType: '''Общий класс типов параметров.''' descr = 'VALUE' def process_value(self, value, parameter): return value class Integer(ParameterType): '''Класс типа целочисленных параметров.''' descr = 'INTEGER' def process_value(self, value, parameter) -> int: if isinstance(value, int): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the int type") def __repr__(self): return '' class String(ParameterType): '''Класс типа строковых параметров.''' descr = 'STRING' def process_value(self, value, parameter) -> str: if isinstance(value, str): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the string type") def __repr__(self): return '' class Bool(ParameterType): '''Класс типа строковых параметров.''' false_set = {'n', 'false', 'False', 'no'} true_set = {'y', 'true', 'True', 'yes'} descr = '[y/n]' def process_value(self, value, parameter) -> bool: if isinstance(value, bool): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the bool type") def __repr__(self): return '' class Float(ParameterType): '''Класс типа вещественночисловых параметров.''' descr = 'FLOAT' def process_value(self, value, parameter) -> float: if isinstance(value, float): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the float type") def __repr__(self): return '' class Separator: '''Класс объекта сепаратора, возможно, его не будет''' def __repr__(self): return "# # #" class Choice(ParameterType): '''Класс типа, представлющий собой любое значение из совокупности заданных. ''' descr = 'CHOICES' def __init__(self, choices=OrderedDict(), editable=False, multichoice=False): self.editable = editable self.multichoice = multichoice self.choices = choices def add_choice(self, choice, comment): self.choices.update({choice: comment}) def process_value(self, value, parameter): if value in self.choices: return value else: raise ValidationError(f"value '{value}' is not available in" f" parameter '{parameter._name}' available" f" values: '{', '.join(self.choices)}'") def __repr__(self): return (f'') class List(ParameterType): descr = 'LIST' def process_value(self, value, parameter): if isinstance(value, list): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the list type") def __repr__(self): return ('') class TableValue: '''Класс значения таблицы.''' def __init__(self, key, fields, fill_function=lambda x: x): self._fields = fields self._values = OrderedDict() # Родительский параметр. self._parent = None # Ключ и его индекс в массиве. self.primary_key = key # Устанавливаем комментарии стобцам таблицы, по умолчанию то же, что и # название столбца. self._fields_comments = OrderedDict() for field in self._fields: self._fields_comments.update({field: field}) # Функция для заполнения пустых полей таблицы. self.fill = fill_function # Функция для кастомизации уведомления об ошибке. def set_error(value: str, field: str, available: list) -> None: raise ParameterError(f"{field} '{value}' is not found.") self.set_error = set_error def set_comments(self, *comments): '''Метод для установки комментариев к полям таблицы.''' if len(comments) < len(self._fields): raise ParameterError("Not enough values to comment table fields.") elif len(comments) > len(self._fields): raise ParameterError("Too much values to comment table fields.") self._fields_comments = OrderedDict() for field, comment in zip(self._fields, comments): if comment is not None: self._fields_comments[field] = comment def change(self, *values): '''Метод для добавления новых или изменения существующих строк таблицы. ''' if len(values) < len(self._fields): raise ParameterError("Not enough values to fill table row.") elif len(values) > len(self._fields): raise ParameterError("Too much values to fill table row.") # Проверяем типы устанавливаемых значений. validated_values = OrderedDict() for value, (field, field_type) in zip(values, self._fields.items()): if value is None or self._parent is None: validated_values.update({field: value}) else: validated_values.update({field: field_type.process_value( value, self._parent)}) current_key = validated_values[self.primary_key] if current_key in self._values: self._values[current_key] = validated_values elif self._parent is None or self._parent._parameter_type.expandable: self._values.update({current_key: validated_values}) else: self.set_error(current_key, self._fields_comments[self.primary_key], list(self._values.keys())) self._values = self.fill(self._values) def check_values_types(self): '''Метод для проверки по типам всех значений таблицы.''' for row in self._values.values(): for field, field_type in self._fields.items(): if row[field] is not None: row[field] = field_type.process_value(row[field], self._parent) def get_for_var(self): '''Метод для преобразования данного описания таблицы в ту, которая приемлема для инициализации переменных.''' output = [] for value in self._values.values(): output.append(value) return output class Table(ParameterType): '''Метод реализующий тип таблиц.''' descr = 'LIST' def __init__(self, expandable=False): self.expandable = expandable def process_value(self, value: TableValue, parameter): if isinstance(value, TableValue): # Если родитель не установлен -- значит параметр был только что # инициализирован. Устанавливаем его, и проводим полную проверку # инициализированных значений таблицы. if value._parent is None: value._parent = parameter value.check_values_types() return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the table type") def __repr__(self): return (f'') class File(ParameterType): pass class Password(ParameterType): '''Класс типа строковых параметров.''' descr = 'PASSWORD' def __init__(self, param=False): # TODO разобраться с параметрами, необходимыми для параметров типа # Password. self.param = param def process_value(self, value, parameter) -> str: if isinstance(value, str): return value raise ValidationError(f"Can not assign '{type(value)}' value to" " parameter of the string type") def __repr__(self): return '' class Description: '''Класс, содержащий описание параметра и способ его отображения.''' # Словарь со способами отображения параметра в графическом интерфейсе. representations = {} def __init__(self, short='', full='', usage=''): self.short = short self.full = full self.usage = usage self._gui_repr = None def initialize(self, parameter_name, parameter_type, shortname=None): '''Метод для инициализации представления по указанной пользователем информации, типу параметра и его имени.''' if not self.short: self.short = parameter_name if not self.full: self.full = self.short if not self.usage: if shortname: self.usage = f"-{shortname} {parameter_type.descr}, " self.usage = self.usage +\ f"--{parameter_name} {parameter_type.descr}" self._gui_repr = self._get_gui_repr(parameter_type) def _get_gui_repr(self, parameter_type): '''Метод для получения по типу параметра способа его представления.''' # TODO реализовать его, когда удастся выделить совокупность доступных # способов представления. return None @property def gui_repr(self): return self._gui_repr class BaseParameter: '''Класс базовый класс всех параметров.''' default_type = ParameterType def __init__(self, name, group, desctiption: Description, shortname=None, argv=None): self._group = group self._name = name self._shortname = shortname self._argv = argv # Ссылка на экземпляр контейнера параметров. self.container = None self._value = None # Допустимые значения параметра, если таковые имеются. # self.choices = OrderedDict() # Комментарии, если таковой необходим. self.comment = None # Флаг, указывающий на то, что значение параметра было изменено # установлено пользователем с помощью метода set. self._set_by_user = False self._validated = False if hasattr(self, 'type'): self._parameter_type = self.type else: self._parameter_type = self.default_type() # Добавляем описание и инициализируем его. self._description = desctiption self._description.initialize(self._name, self._parameter_type, shortname=self._shortname) # Комментарий, наличие которого указывает на неактивность параметра. self.disactivity_comment = None # Коллекции текущих подписок, аргументов, из которых эти подписки # формируются и подписчиков. self._subscriptions = {} self._args = [] self._subscribers = [] # Флаги для проверки того необходимо ли проводить поиск подписок или # подписчиков. self._args_is_found = False self._sets_is_found = False # Метод для указания того, что в данный момент осуществляется расчет # значения параметра. self._calculating = False self._validation = False @property def choices(self): if isinstance(self._parameter_type, Choice): return self._parameter_type.choices @property def disactivity(self): return bool(self.disactivity_comment) def validate(self, container, datavars, value) -> None: '''Метод для проверки корректности параметра. Переопределяется при создании нового типа параметра.''' return None def validate_value(self, value): '''Метод для запуска валидации параметра с использованием переопределенного метода валидации.''' if self._validation: raise CyclicValidationError(self._name) with self._start_validation(): try: self.validate(self.container, self.container.datavars, value) except CyclicValidationError as error: raise CyclicValidationError(self._name, *error.queue) @contextmanager def _start_validation(self): '''Контекстный менеджер предназначеный для переключения параметра в режим валидации.''' try: self._validation = True yield self finally: self._validation = False def set(self, value) -> None: '''Метод для установки значения параметра.''' if isinstance(self._parameter_type, Table): print('VALUE:', self.value) self._value.change(*value) else: self._value = value self._set_by_user = True # Назначаем новые значения подписанным переменным. self._set_variables() @property def value(self): '''Метод для получения значения параметра. Выдается или дефолтное или установленное пользователем.''' if not self._set_by_user: if self._value is None and not self._calculating: self._value = self.update_value() return self._value def check_value_type(self, value): '''Метод для запуска проверки значения по типу параметра.''' return self._parameter_type.process_value(value, self) def bind(self, *variables) -> None: '''Метод для настройки взаимосвязи параметра с переменными.''' self._args = variables return self def bind_method(self, *variables): '''Метод переопределяемый для создания нового типа параметра. Должен возвращать вычисленное значение и активность параметра None -- активен, comment -- дизактивирован.''' if variables: return variables[0].value, None else: return None, None def set_to(self, *variables): '''Метод для добавления переменных, которым будет установлено значение параметра.''' self._subscribers = variables return self def _set_variables(self): '''Метод для установки значения параметра переменным-подписчикам.''' # Только если значение установлено пользователем. if self._set_by_user: if not self._sets_is_found: self._subscribers = self.find_variables(self._subscribers) self._sets_is_found = True for subscriber in self._subscribers: if isinstance(self._parameter_type, Table): subscriber.set(self.value.get_for_var()) else: subscriber.set(self.value) def _invalidate(self): '''Метод, через который переменные сообщают о необходимости пересчитать значение параметра с дефолтным значением.''' value = self.update_value() if not self._set_by_user: self._value = value self._validated = False def update_value(self): '''Метод для получения дефолтного значения.''' if not self._args_is_found: self._args = self.find_variables(self._args) self._args_is_found = True with self._start_calculate(): value, subscriptions = self._calculate_binding() # Обновляем подписки. Сначала убираем лишние. for subscription in self._subscriptions: if subscription not in subscriptions: subscription.subscribers.remove(self) # Теперь добавляем новые. for subscription in subscriptions: subscription.subscribers.add(self) self._subscriptions = subscriptions return self._parameter_type.process_value(value, self) def _calculate_binding(self): '''Метод для расчета значения параметра по умолчанию, используя заданные переменные.''' subscriptions = set() args = tuple(VariableWrapper(arg, subscriptions) for arg in self._args) try: value, self.disactivity_comment = self.bind_method(*args) return value, subscriptions except Exception as error: raise ParameterError('can not calculate using dependencies: {}' ' reason: {}'.format(', '.join( [subscription.get_fullname() for subscription in self._args]), str(error))) @contextmanager def _start_calculate(self): '''Менеджер контекста устанавливающий флаг, указывающий, что данная переменная в состоянии расчета.''' try: self._calculating = True yield self finally: self._calculating = False def find_variables(self, variables): '''Метод для поиска переменных, по их названию. Обходит список переменных и заменяет все строки с запросами в списке на результат поиска по запросу.''' output = [] for variable in variables: if isinstance(variable, str): variable = DependenceAPI.find_variable(variable, self.container.datavars) output.append(variable) return output def __repr__(self): description = self._description.short or self._description.full return (f"") class GroupWrapper: '''Класс обертки параметров группы, который возможно будет использоваться для организации работы графического интерфейса.''' def __init__(self, group, container): self.values = OrderedDict() self.container = container def set_group(self): '''Метод для установки значений параметров, осносящихся к группе.''' self.container.set_parameters(self.values) class Parameters: '''Класс контейнера параметров.''' def __init__(self, datavars: Datavars, check_order=[]): self._datavars = datavars self._parameters = OrderedDict() self._validation_dict = OrderedDict() self._order = check_order # Флаг указывающий на то, что в данный момент идет валидация параметров self._validation = False # Занятые имена параметров. self._names = set() # Список позиционных аргументов. self._argvs = [] # Словарь функций для взаимодействия графического клиента с группами # параметров. self._gui_helpers = {} def add(self, *parameters: Tuple[BaseParameter]): '''Метод для добавления некоторой совокупности параметров.''' for parameter in parameters: self.add_parameter(parameter) def set_order(self, *parameters): '''Метод для установки порядка проверки параметров.''' self._order = [] for parameter in parameters: if isinstance(parameter, str): parameter = self[parameter] self._order.append(parameter) for parameter in self: if parameter not in self._order: self._order.append(parameter) def add_parameter(self, parameter: BaseParameter) -> None: '''Метод для добавления параметров в контейнер.''' if parameter._name in self._names: raise ParameterError(f"Can not add parameter '{parameter._name}'." " Such name has already been added. ") elif parameter._shortname and parameter._shortname in self._names: raise ParameterError(f"Can not add parameter '{parameter._name}'." " Such name has already been added. ") elif (parameter._argv is not None and len(self._argvs) > parameter._argv and self._argvs[parameter._argv]): raise ParameterError(f"Can not add positional parameter " f"'{parameter._name}'. Position" f"'{parameter._argv}' is already taken.") parameter.container = self if parameter._group in self._parameters: self._parameters[parameter._group].append(parameter) else: self._parameters[parameter._group] = [parameter] self._names.add(parameter._name) if parameter._shortname: self._names.add(parameter._shortname) if parameter._argv is not None: # Если нужно, расширяем список позиций. if len(self._argvs) <= parameter._argv: while(len(self._argvs) <= parameter._argv): self._argvs.append(None) self._argvs[parameter._argv] = parameter self._order.append(parameter) parameter.update_value() def set_parameters(self, parameters: OrderedDict) -> None: '''Метод для установки значений некоторого числа параметров их проверки.''' # Сначала проверяем все значения по типам и составляем словарь # валидации. for parameter_name, value in parameters.items(): parameter = self[parameter_name] if not parameter.disactivity_comment: self._validation_dict[parameter] =\ parameter._parameter_type.process_value(value, parameter) parameter._validated = False # Теперь запускаем переопределенный метод для проверки параметров. with self._run_validation(): self.validate_parameters(self._validation_dict) def validate_parameters(self, parameters: OrderedDict): '''Метод для запуска валидации параметров.''' for parameter in self._order: if parameter in self._validation_dict: if (not parameter._validated and not parameter.disactivity_comment): parameter.validate_value(self._validation_dict[parameter]) parameter._validated = True parameter.set(self._validation_dict[parameter]) def validate_all(self): for parameter in self._order: if (not parameter._validated and not parameter.disactivity_comment): parameter.validate_value(self._validation_dict[parameter]) parameter._validated = True parameter.set(self._validation_dict[parameter]) @contextmanager def _run_validation(self): '''Контекстный менеджер предназначенный для перевода контейнера параметров в режим валидации.''' try: self._validation = True yield self finally: self._validation = False self._validation_dict = OrderedDict() def get_group_parameters(self, group_name: str): '''Метод для получения списка параметров, относящихся к указанной группе.''' return self._parameters[group_name] def get_descriptions(self) -> dict: '''Метод для получения словаря с описанием параметров.''' output = OrderedDict() for group, parameters in self._parameters.items(): for parameter in parameters: usage = ' ' + parameter._description.usage full = parameter._description.full if len(usage) > 23: usage = usage + '\n' full = ' ' * 24 + full elif len(usage) == 23: usage = usage + ' ' else: usage = usage.ljust(23) + ' ' parameter_description = (f"{usage}{full}") if group in output: output[group].append(parameter_description) else: output[group] = [parameter_description] return output @property def datavars(self) -> Union[Datavars, NamespaceNode]: return self._datavars def __getitem__(self, name: str) -> Any: for group, parameters in self._parameters.items(): for parameter in parameters: if parameter._name == name or parameter._shortname == name: if self._validation and not parameter._validated: # В режиме валидации параметров, при попытке получить # параметр сначала проверяем необходимость его проверки # И если нужно проверяем его. parameter_value =\ self._validation_dict.get(parameter, None) or\ parameter._value parameter.validate_value(parameter_value) parameter._validated = True return parameter else: return parameter raise ParameterError(f"No such parameter '{name}'.") def __iter__(self): for parameters in self._parameters.values(): for parameter in parameters: yield parameter