# vim: fileencoding=utf-8 # import os import logging import importlib import importlib.util from jinja2 import Environment, FileSystemLoader from calculate.variables.datavars import ( NamespaceNode, VariableNode, ListType, IntegerType, FloatType, IniType, TableType, Namespace, HashType, VariableNotFoundError, VariableError, ) from calculate.utils.gentoo import ProfileWalker from calculate.utils.files import read_file, FilesError from calculate.utils.tools import Singleton from pyparsing import ( Literal, Word, ZeroOrMore, Group, Optional, restOfLine, empty, printables, OneOrMore, lineno, line, SkipTo, LineEnd, Combine, nums, ) from enum import Enum from contextlib import contextmanager class LoaderError(Exception): '''Исключение выбрасываемое загрузчиком, если тот в принципе не может загрузить переменные.''' pass class Define(Enum): assign = 0 append = 1 remove = 2 class CalculateIniParser(metaclass=Singleton): '''Класс парсера calculate.ini файлов.''' def __init__(self): self.operations = {"=": Define.assign, "+=": Define.append, "-=": Define.remove} lbrack = Literal("[") rbrack = Literal("]") # comma = Literal(",").suppress() comment_symbol = Literal(';') | Literal('#') # Define = self.Define value_operation = (Literal("=") | Combine(Literal("+") + Literal("=")) | Combine(Literal("-") + Literal("="))) comment = comment_symbol + Optional(restOfLine) section_name = (lbrack.suppress() + (~Word(nums) + Word(printables+'\t', excludeChars='[]')) + rbrack.suppress()) value_name = Word(printables+'\t', excludeChars='=-+') # non_comma = Word(printables+'\t', excludeChars=',') clear_section = lbrack.suppress() + Group(empty) + rbrack.suppress() row_index = lbrack.suppress() + Word(nums) + rbrack.suppress() namespace_start = Group(OneOrMore(section_name) + (clear_section | ~lbrack) + LineEnd().suppress()) table_start = Group(OneOrMore(section_name) + (row_index | clear_section | ~lbrack) + LineEnd().suppress()) def add_lineno(string, location, tokens): tokens.append(lineno(location, string)) section_start = (namespace_start('namespace') | table_start('table')) section_start.setParseAction(add_lineno) # Если содержимое ini-файла не предваряется заголовком секции, # значит эта строка ошибочна. unexpected = Group(~section_start + SkipTo(LineEnd(), include=True))("error") unexpected.setParseAction(self._unexpected_token) key_value = (~lbrack + value_name + value_operation + empty + restOfLine + LineEnd().suppress()) def process_key_value(string, location, tokens): tokens[0] = tokens[0].strip() tokens[1] = tokens[1].strip() tokens.append(lineno(location, string)) key_value.setParseAction(process_key_value) self.ini_section_parser = (section_start + Group(ZeroOrMore( Group(key_value | unexpected))) | unexpected) self.ini_section_parser.ignore(comment) def parse(self, data: str): for tokens, start, end in self.ini_section_parser.scanString(data): if tokens.getName() == "error": if tokens[1].strip(): yield({'error': (tokens[0], tokens[1])}) continue section, section_lineno, defkeys = tokens if section.getName() == 'namespace': section_list = section.asList() if section_list[-1] == []: yield {'clear_section': (section_list[:-1], section_lineno)} else: yield {'start_section': (section_list, section_lineno)} for defkey in defkeys: if defkey.getName() == "error": if defkey[1].strip(): yield({'error': (defkey[0], defkey[1])}) continue yield {'define_key': (defkey[0], defkey[2], self.operations[defkey[1]], defkey[3])} else: table_list = section.asList() table_values = {} for defkey in defkeys: if defkey.getName() == "error": if defkey[1].strip(): yield({'error': (defkey[0], defkey[1])}) continue table_values.update({defkey[0]: defkey[2]}) yield {'start_table': (table_list, table_values, section_lineno)} def _unexpected_token(self, string, location, tokens): '''Метод вызываемый парсером, если обнаружена некорректная строка, предназначен для получения некорректной строки и ее дальнейшего разбора.''' return [lineno(location, string), line(location, string)] class NamespaceIniFiller: '''Класс, предназначенный для наполнения Namespace объекта переменными из calculate.ini файла.''' available_sections = {'custom'} def __init__(self, restrict_creation=True): self.ini_parser = CalculateIniParser() self._errors = [] # Флаги, определяющие возможность создания новых переменных и новых # пространств имен в данном пространстве имен. self.restricted = restrict_creation self.modify_only = False def fill(self, namespace: NamespaceNode, ini_file_text: str) -> None: '''Метод для разбора calculate.ini файла и добавления всех его переменных в указанное пространство имен.''' self.namespace = namespace self.current_namespace = self.namespace self._errors = [] for parsed_line in self.ini_parser.parse(ini_file_text): self._line_processor(**parsed_line) def _line_processor(self, start_section=None, clear_section=None, start_table=None, define_key=None, error=None, **kwargs): '''Метод вызывающий обработку токенов, выдаваемых парсером в зависимости от их типа.''' if start_section is not None: self.start_section(*start_section) elif clear_section is not None: self.clear_section(*clear_section) elif start_table is not None: self.start_table(*start_table) elif define_key is not None: self.define_key(*define_key) elif error is not None: self._set_error(error[0], 'SyntaxError', error[1]) def start_section(self, sections: str, lineno) -> None: '''Метод для получения доступа и создания пространств имен.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections self.current_namespace = self.namespace for section in sections: if isinstance(self.current_namespace, Datavars): if section not in self.current_namespace: self._set_error(lineno, 'VariableError', "variables package '{}' is not found.". format(section)) self.current_namespace = None return elif isinstance(self.current_namespace, NamespaceNode): if section not in self.current_namespace.namespaces: if (section in self.current_namespace.variables and self.current_namespace[section].variable_type is HashType): # Если секция является хэшем, используем ее. self.current_namespace = self.current_namespace.\ variables[section] return elif not self.modify_only: self.current_namespace.add_namespace( NamespaceNode(section)) else: self._set_error(lineno, 'VariableError', "can not create namespace '{}.{}' in" " not 'custom' namespace.".format( self.current_namespace.get_fullname(), section)) self.current_namespace = None return self.current_namespace = self.current_namespace.namespaces[section] def clear_section(self, sections: list, lineno) -> None: '''Метод для очистки пространства имен.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections current_namespace = self.namespace for section in sections: if isinstance(current_namespace, Datavars): if section in current_namespace: current_namespace = current_namespace[section] else: self._set_error(lineno, 'VariableError', "can not clear"" namespace '{}'. Variables" " package '{}' is not found.".format( ".".join(sections), section)) return elif isinstance(current_namespace, NamespaceNode): if section in current_namespace.namespaces: current_namespace = current_namespace[section] elif (section in current_namespace.variables and current_namespace.variables[section].variable_type is TableType): table_variable = current_namespace.variables[section] table_to_clear = table_variable.get_value() table_to_clear.clear() table_variable.source = table_to_clear return else: self._set_error(lineno, 'VariableError', "can not clear namespace '{}'. Namespace" " is not found.".format( ".".join(sections))) return if not self.modify_only: current_namespace.clear() else: self._set_error(lineno, 'VariableError', "can not clear namespace '{}' from not 'custom'" " namespace.".format(current_namespace. get_fullname())) def start_table(self, sections: str, row, lineno) -> None: '''Метод для создания и модификации таблиц.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections self.current_namespace = self.namespace row_index = int(sections.pop()) table_name = sections.pop() for section in sections: if section not in self.current_namespace.namespaces: if not self.modify_only: self.current_namespace.add_namespace( NamespaceNode(section)) else: self._set_error(lineno, 'VariableError', "can not create table '{}.{}', namespace" " '{}' is not found.".format( ".".join(sections), table_name, section)) self.current_namespace = None return self.current_namespace = self.current_namespace.namespaces[section] if table_name not in self.current_namespace.variables: if not self.modify_only: table_variable = VariableNode(table_name, self.current_namespace, variable_type=TableType, source=[row]) else: self._set_error(lineno, 'VariableError', "can not create table '{}.{}' in not 'custom'" " namespace.".format(self.current_namespace. get_fullname(), table_name)) else: table_variable = self.current_namespace.variables[table_name] table = table_variable.get_value() if row_index < len(table): table.change_row(row, row_index) else: table.add_row(row) table_variable.source = table def define_key(self, key: str, value: str, optype, lineno) -> None: '''Метод для создания и модификации переменных.''' if self.current_namespace is None: return if (isinstance(self.current_namespace, VariableNode) and self.current_namespace.variable_type is HashType): self.update_hash(key, value, optype, lineno) else: if optype == Define.assign: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.change_value(key, value, lineno) elif optype == Define.append: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.append_value(key, value, lineno) elif optype == Define.remove: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.remove_value(key, value, lineno) def change_value(self, key: str, value: str, lineno) -> None: '''Метод для изменения значения переменной.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable.source = value def define_variable(self, key: str, value: str, lineno) -> None: '''Метод для создания переменных в calculate.ini файле.''' if not self.modify_only: VariableNode(key, self.current_namespace, variable_type=IniType, source=value) else: self._set_error(lineno, 'VariableError', "can not create variable '{}.{}' in not 'custom'" " namespace.".format( self.current_namespace.get_fullname(), key)) def append_value(self, key: str, value: str, lineno) -> None: '''Метод выполняющий действия возложенные на оператор +=.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable_value = variable.get_value() if variable.variable_type is IniType: value_list = value.split(',') variable_list = variable_value.split(',') for item in value_list: if item not in variable_list: variable_list.append(item.strip()) variable_value = ','.join(variable_list) elif variable.variable_type is ListType: value_list = value.split(',') for item in value_list: if item not in variable_value: variable_value.append(item.strip()) elif variable.variable_type is IntegerType: variable_value += int(value) elif variable.variable_type is FloatType: variable_value += float(value) variable.source = variable_value def remove_value(self, key: str, value: str, lineno) -> None: '''Метод выполняющий действия возложенные на оператор -=.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable_value = variable.get_value() if variable.variable_type is IniType: value_list = value.split(',') variable_list = [item.strip() for item in variable_value.split(',')] for item in value_list: if item in variable_list: variable_list.remove(item.strip()) variable_value = ','.join(variable_list) elif variable.variable_type is ListType: value_list = value.split(',') for item in value_list: if item in variable_value: variable_value.remove(item.strip()) elif variable.variable_type is IntegerType: variable_value -= int(value) elif variable.variable_type is FloatType: variable_value -= float(value) variable.source = variable_value def update_hash(self, key: str, value: str, optype, lineno): '''Метод для изменения переменных хэшей через calculate.ini.''' if self.current_namespace.readonly: self._set_error(lineno, 'VariableError', "can not change readonly hash variable " f"'{self.current_namespace.get_fullname()}'") return hash_to_update = self.current_namespace.get_value().get_hash() if key not in hash_to_update: # Если ключ отсутствует в хэше, то проверяем, является ли он # фиксированным. if self.current_namespace.fixed: self._set_error(lineno, 'VariableError', "key '{}' is unavailable for fixed hash '{}'," " available keys: '{}'.".format( key, self.current_namespace.get_fullname(), ', '.join(self.current_namespace. get_value()._fields))) return else: hash_to_update.update({key: value}) elif optype == Define.assign: hash_to_update.update({key: value}) elif optype == Define.append: current_value = hash_to_update[key] hash_to_update[key] = current_value + value elif optype == Define.remove: current_value = hash_to_update[key] if (isinstance(current_value, int) or isinstance(current_value, float)): hash_to_update[key] = current_value - value elif isinstance(current_value, str): value_list = [item.strip() for item in value.split(',')] current_value = [item.strip() for item in current_value.split(',')] for value_to_remove in value_list: if value_to_remove in current_value: current_value.remove(value_to_remove) hash_to_update[key] = ','.join(current_value) self.current_namespace.source = hash_to_update def _set_error(self, lineno, error_type, line): '''Метод для добавления ошибки в лог.''' self._errors.append("{}:{}: {}".format(error_type, lineno, line)) @property def errors(self): errors = self._errors self._errors = [] return errors class VariableLoader: '''Класс загрузчика переменных из python-файлов и из ini-файлов.''' ini_basename = "calculate.ini" def __init__(self, datavars, variables_path, repository_map=None): self.datavars = datavars self.logger = datavars.logger self.ini_filler = NamespaceIniFiller() self.variables_path = variables_path self.variables_package = '.'.join(variables_path.split('/')) self.repository_map = repository_map def load_variables_package(self, package_name: str) -> None: '''Метод для загрузки пакетов с переменными.''' directory_path = os.path.join(self.variables_path, package_name) package = '{}.{}'.format(self.variables_package, package_name) package_namespace = NamespaceNode(package_name) self.datavars.root.add_namespace(package_namespace) self._fill_from_package(package_namespace, directory_path, package) def load_from_profile(self): '''Метод для загрузки переменных из calculate.ini профиля.''' # Проверяем наличие таблицы репозиториев в переменных. if self.repository_map == {}: return if self.repository_map is None: repositories_variable_path = ['os', 'gentoo', 'repositories'] current_namespace = self.datavars for section in repositories_variable_path: if section in current_namespace: current_namespace = current_namespace[section] else: self.logger.error("Variable 'os.gentoo.repositories'" " is not found. Can not load profile" " variables.") return self.repository_map = self._get_repository_map(self.datavars) # Проверяем наличие пути к профилю в переменных. if ('profile' in self.datavars.os.gentoo and 'path' in self.datavars.os.gentoo.profile): profile_path = self.datavars.os.gentoo.profile.path else: self.logger.error("Variable 'os.gentoo.profile.path'" " is not found. Can not load profile" " variables.") return self.logger.info("Load variables from profile: '{}'.".format( profile_path)) self._fill_from_profile_ini(profile_path) def load_user_variables(self): '''Метод для загрузки переменных из calculate.ini указанных в переменных env_order и env_path.''' try: env_order = self.datavars.system.env_order env_path = self.datavars.system.env_path except VariableNotFoundError as error: self.logger.warning("Can not load additional variables: {}". format(str(error))) return for ini_file in env_order: self.logger.info("Loading variables from file: '{}'".format( ini_file)) if ini_file in env_path: self.fill_from_custom_ini(env_path[ini_file].value) self.logger.info("Variables from '{}' are loaded".format( ini_file)) else: self.logger.warning("File '{}' is not found. Variables are" " not loaded".format(ini_file)) def _fill_from_package(self, current_namespace: NamespaceNode, directory_path: str, package: str) -> None: '''Метод для зaполнения переменных из python-файла.''' file_nodes = [] directory_nodes = [] # Просматриваем директорию for node in os.scandir(directory_path): if node.is_dir(): directory_nodes.append(node) elif node.is_file() and node.name.endswith('.py'): file_nodes.append(node) # Сначала загружаем переменные из файлов. for file_node in file_nodes: if not file_node.name.endswith('.py'): continue file_name = file_node.name[:-3] Namespace.set_current_namespace(current_namespace) # with self.test(file_name, current_namespace): # importlib.import_module('{}.{}'.format(package, file_name)) spec = importlib.util.spec_from_file_location( '{}.{}'.format(package, file_name), file_node.path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Обходим остальные директории. for directory_node in directory_nodes: namespace = NamespaceNode(directory_node.name) current_namespace.add_namespace(namespace) self._fill_from_package(namespace, directory_node.path, '{}.{}'.format(package, directory_node.name)) def _fill_from_profile_ini(self, profile_path): '''Метод для зaполнения переменных из ini-файла.''' profile_walker = ProfileWalker(self.ini_basename, self.repository_map) for file_path in profile_walker.find(profile_path): try: ini_file_text = read_file(file_path) self.ini_filler.fill(self.datavars, ini_file_text) except FilesError: self.logger.error("Can not load profile variables from" " unexisting file: {}".format(file_path)) def _get_repository_map(self, datavars): '''Метод для получения из переменной словаря с репозиториями и путями к ним.''' return {repo['name']: repo['path'] for repo in datavars.os.gentoo.repositories} def fill_from_custom_ini(self, file_path: str): '''Метод для заполнения переменных из конкретного указанного файла.''' if os.path.exists(file_path): ini_file_text = read_file(file_path) self.ini_filler.fill(self.datavars, ini_file_text) parsing_errors = self.ini_filler.errors if parsing_errors: for error in parsing_errors: self.logger.error(error) self.logger.warning('Some variables was not loaded.') else: self.logger.info('All variables are loaded.') else: self.logger.error("Variables are not loaded. File '{}' does" " not exist.".format(file_path)) @contextmanager def test(self, file_name, namespace): '''Контекстный менеджер для тестирования.''' print('IMPORT: {}.{}'.format(namespace.get_fullname(), file_name)) try: yield self finally: print('IMPORTED FROM: {}.{}'.format(namespace.get_fullname(), file_name)) class CalculateIniSaver: '''Класс для сохранения значений переменных в указанные ini-файлы.''' def __init__(self, ini_parser=None): self.ini_parser = CalculateIniParser() self.operations = {Define.assign: '=', Define.append: '+=', Define.remove: '-='} file_loader = FileSystemLoader('calculate/variables') environment = Environment(loader=file_loader) self.ini_template = environment.get_template('ini_template') def save_to_ini(self, target_path, variables_to_save): '''Метод для сохранения переменных в указанный ini-файл.''' ini_file_text = read_file(target_path) ini_dictionary = self._parse_ini(ini_file_text) for namespace in variables_to_save: if namespace in ini_dictionary: ini_dictionary[namespace].update(variables_to_save[namespace]) else: ini_dictionary[namespace] = variables_to_save[namespace] ini_file_text = self._get_ini_text(ini_dictionary) with open(target_path, 'w') as ini_file: ini_file.write(ini_file_text) def _parse_ini(self, ini_file_text): '''Метод для разбора текста ini-файла в словарь, в который далее будут добавляться измененные переменные.''' current_namespace = None ini_dictionary = dict() for parsed_line in self.ini_parser.parse(ini_file_text): line_type = next(iter(parsed_line)) line_content = parsed_line[line_type] if (line_type == 'start_section' or line_type == 'start_table'): current_namespace = tuple(line_content[0]) if current_namespace not in ini_dictionary: ini_dictionary[current_namespace] = dict() elif (line_type == 'clear_section' or line_type == 'clear_table'): current_namespace = (*line_content[0], '') ini_dictionary[current_namespace] = dict() elif line_type == 'define_key': namespace = ini_dictionary[current_namespace] namespace.update({line_content[0]: (self.operations[line_content[2]], line_content[1])}) return ini_dictionary def _get_ini_text(self, ini_dictionary): '''Метод для получения текста ini файла, полученного в результате наложения изменений из тегов save в шаблонах.''' ini_text = self.ini_template.render(ini_dictionary=ini_dictionary) return ini_text.strip() class Datavars: '''Класс для хранения переменных и управления ими.''' def __init__(self, variables_path='calculate/vars', repository_map=None, logger=None): self._variables_path = variables_path self._available_packages = self._get_available_packages() if logger is not None: self.logger = logger else: logger = logging.getLogger("main") # stream_handler = logging.StreamHandler() # logger.addHandler(stream_handler) self.logger = logger self.root = NamespaceNode('') self._loader = VariableLoader(self, self._variables_path, repository_map=repository_map) Namespace.reset() Namespace.set_datavars(self) self._loader.load_from_profile() self._loader.load_user_variables() # Создаем словарь переменных, которые нужно сохранить потом в # ini-файлах. try: self.variables_to_save = {target: dict() for target in self.system.env_order if target in self.system.env_path} except VariableNotFoundError: self.variables_to_save = dict() def reset(self): '''Метод для сброса модуля переменных.''' self.root.clear() self.root = NamespaceNode('') self._available_packages.clear() self._available_packages = self._get_available_packages() Namespace.set_datavars(self) def _get_available_packages(self) -> dict: '''Метод для получения словаря с имеющимися пакетами переменных и путями к ним.''' available_packages = dict() for file_name in os.listdir(self._variables_path): if file_name.startswith('__'): continue file_path = os.path.join(self._variables_path, file_name) if os.path.isdir(file_path): available_packages.update({file_name: file_path}) return available_packages def _load_package(self, package_name): '''Метод для загрузки переменных содержащихся в указанном пакете.''' self.logger.info("Loading datavars package '{}'".format(package_name)) try: self._loader.load_variables_package(package_name) except Exception as error: raise VariableError("Can not load datavars package: {}". format(error)) @property def available_packages(self): packages = set(self._available_packages) packages.update({'custom'}) return packages def __getattr__(self, package_name: str): '''Метод возвращает ноду пространства имен, соответствующего искомому пакету.''' if package_name in self.root.namespaces: return self.root[package_name] elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return self.root[package_name] elif package_name not in self._available_packages: raise VariableNotFoundError("datavars package '{}' is not found". format(package_name)) else: self._load_package(package_name) return self.root[package_name] def __getitem__(self, package_name: str) -> None: '''Метод возвращает ноду пространства имен, соответствующего искомому пакету.''' if package_name in self.root: return self.root[package_name] elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return self.root[package_name] elif package_name == 'tasks': self.create_tasks() return self.root[package_name] elif package_name not in self._available_packages: raise VariableNotFoundError("variables package '{}' is not found". format(package_name)) else: self._load_package(package_name) return self.root[package_name] def __contains__(self, package_name): if package_name in self.root.namespaces: return True elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return True elif package_name == 'tasks': self.create_tasks() return True elif (package_name not in self._available_packages and package_name != 'custom'): return False else: self._load_package(package_name) return True def add_namespace(self, namespace_node): self.root.add_namespace(namespace_node) def create_tasks(self): '''Метод для создания всех необходимых пространств имен для работы задач.''' tasks = NamespaceNode('tasks') self.add_namespace(tasks) env = NamespaceNode('env') tasks.add_namespace(env) env.add_namespace('loop') @property def namespaces(self): return self.root.namespaces def save_variables(self): '''Метод для сохранения значений переменных в calculate.ini файлах.''' target_paths = self.system.env_path saver = CalculateIniSaver() for target in self.variables_to_save: if self.variables_to_save[target]: dict_to_save = self.variables_to_save[target] target_path = target_paths[target].value saver.save_to_ini(target_path, dict_to_save) self.logger.info("Variables for '{}' is saved in the" " file: {}".format(target, target_path))