# vim: fileencoding=utf-8 # import os import logging import importlib import importlib.util from jinja2 import Environment, FileSystemLoader from calculate.variables.datavars import NamespaceNode, VariableNode,\ ListType, IntegerType,\ FloatType, IniType, TableType,\ Namespace, HashType,\ VariableNotFoundError, VariableError from calculate.utils.gentoo import ProfileWalker from calculate.utils.files import read_file, FilesError from calculate.utils.tools import Singleton from pyparsing import Literal, Word, ZeroOrMore, Group, Optional, restOfLine,\ empty, printables, OneOrMore, lineno, line, SkipTo,\ LineEnd, Combine, nums from enum import Enum from contextlib import contextmanager class LoaderError(Exception): '''Исключение выбрасываемое загрузчиком, если тот в принципе не может загрузить переменные.''' pass class Define(Enum): assign = 0 append = 1 remove = 2 class CalculateIniParser(metaclass=Singleton): '''Класс парсера calculate.ini файлов.''' def __init__(self): self.operations = {"=": Define.assign, "+=": Define.append, "-=": Define.remove} lbrack = Literal("[") rbrack = Literal("]") # comma = Literal(",").suppress() comment_symbol = Literal(';') | Literal('#') # Define = self.Define value_operation = (Literal("=") | Combine(Literal("+") + Literal("=")) | Combine(Literal("-") + Literal("="))) comment = comment_symbol + Optional(restOfLine) section_name = (lbrack.suppress() + (~Word(nums) + Word(printables+'\t', excludeChars='[]')) + rbrack.suppress()) value_name = Word(printables+'\t', excludeChars='=-+') # non_comma = Word(printables+'\t', excludeChars=',') clear_section = lbrack.suppress() + Group(empty) + rbrack.suppress() row_index = lbrack.suppress() + Word(nums) + rbrack.suppress() namespace_start = Group(OneOrMore(section_name) + (clear_section | ~lbrack) + LineEnd().suppress()) table_start = Group(OneOrMore(section_name) + (row_index | clear_section | ~lbrack) + LineEnd().suppress()) def add_lineno(string, location, tokens): tokens.append(lineno(location, string)) section_start = (namespace_start('namespace') | table_start('table')) section_start.setParseAction(add_lineno) # Если содержимое ini-файла не предваряется заголовком секции, # значит эта строка ошибочна. unexpected = Group(~section_start + SkipTo(LineEnd(), include=True))("error") unexpected.setParseAction(self._unexpected_token) key_value = (~lbrack + value_name + value_operation + empty + restOfLine + LineEnd().suppress()) def process_key_value(string, location, tokens): tokens[0] = tokens[0].strip() tokens[1] = tokens[1].strip() tokens.append(lineno(location, string)) key_value.setParseAction(process_key_value) self.ini_section_parser = (section_start + Group(ZeroOrMore( Group(key_value | unexpected))) | unexpected) self.ini_section_parser.ignore(comment) def parse(self, data: str): for tokens, start, end in self.ini_section_parser.scanString(data): if tokens.getName() == "error": if tokens[1].strip(): yield({'error': (tokens[0], tokens[1])}) continue section, section_lineno, defkeys = tokens if section.getName() == 'namespace': section_list = section.asList() if section_list[-1] == []: yield {'clear_section': (section_list[:-1], section_lineno)} else: yield {'start_section': (section_list, section_lineno)} for defkey in defkeys: if defkey.getName() == "error": if defkey[1].strip(): yield({'error': (defkey[0], defkey[1])}) continue yield {'define_key': (defkey[0], defkey[2], self.operations[defkey[1]], defkey[3])} else: table_list = section.asList() table_values = {} for defkey in defkeys: if defkey.getName() == "error": if defkey[1].strip(): yield({'error': (defkey[0], defkey[1])}) continue table_values.update({defkey[0]: defkey[2]}) yield {'start_table': (table_list, table_values, section_lineno)} def _unexpected_token(self, string, location, tokens): '''Метод вызываемый парсером, если обнаружена некорректная строка, предназначен для получения некорректной строки и ее дальнейшего разбора.''' return [lineno(location, string), line(location, string)] class NamespaceIniFiller: '''Класс, предназначенный для наполнения Namespace объекта переменными из calculate.ini файла.''' available_sections = {'custom'} def __init__(self, restrict_creation=True): self.ini_parser = CalculateIniParser() self._errors = [] # Флаги, определяющие возможность создания новых переменных и новых # пространств имен в данном пространстве имен. self.restricted = restrict_creation self.modify_only = False def fill(self, namespace: NamespaceNode, ini_file_text: str) -> None: '''Метод для разбора calculate.ini файла и добавления всех его переменных в указанное пространство имен.''' self.namespace = namespace self.current_namespace = self.namespace self._errors = [] for parsed_line in self.ini_parser.parse(ini_file_text): self._line_processor(**parsed_line) def _line_processor(self, start_section=None, clear_section=None, start_table=None, define_key=None, error=None, **kwargs): '''Метод вызывающий обработку токенов, выдаваемых парсером в зависимости от их типа.''' if start_section is not None: self.start_section(*start_section) elif clear_section is not None: self.clear_section(*clear_section) elif start_table is not None: self.start_table(*start_table) elif define_key is not None: self.define_key(*define_key) elif error is not None: self._set_error(error[0], 'SyntaxError', error[1]) def start_section(self, sections: str, lineno) -> None: '''Метод для получения доступа и создания пространств имен.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections self.current_namespace = self.namespace for section in sections: if isinstance(self.current_namespace, Datavars): if section not in self.current_namespace: self._set_error(lineno, 'VariableError', "variables package '{}' is not found.". format(section)) self.current_namespace = None return elif isinstance(self.current_namespace, NamespaceNode): if section not in self.current_namespace.namespaces: if (section in self.current_namespace.variables and self.current_namespace[section].variable_type is HashType): # Если секция является хэшем, используем ее. self.current_namespace = self.current_namespace.\ variables[section] return elif not self.modify_only: self.current_namespace.add_namespace( NamespaceNode(section)) else: self._set_error(lineno, 'VariableError', "can not create namespace '{}.{}' in" " not 'custom' namespace.".format( self.current_namespace.get_fullname(), section)) self.current_namespace = None return self.current_namespace = self.current_namespace.namespaces[section] def clear_section(self, sections: list, lineno) -> None: '''Метод для очистки пространства имен.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections current_namespace = self.namespace for section in sections: if isinstance(current_namespace, Datavars): if section in current_namespace: current_namespace = current_namespace[section] else: self._set_error(lineno, 'VariableError', "can not clear"" namespace '{}'. Variables" " package '{}' is not found.".format( ".".join(sections), section)) return elif isinstance(current_namespace, NamespaceNode): if section in current_namespace.namespaces: current_namespace = current_namespace[section] elif (section in current_namespace.variables and current_namespace.variables[section].variable_type is TableType): table_variable = current_namespace.variables[section] table_to_clear = table_variable.get_value() table_to_clear.clear() table_variable.source = table_to_clear return else: self._set_error(lineno, 'VariableError', "can not clear namespace '{}'. Namespace" " is not found.".format( ".".join(sections))) return if not self.modify_only: current_namespace.clear() else: self._set_error(lineno, 'VariableError', "can not clear namespace '{}' from not 'custom'" " namespace.".format(current_namespace. get_fullname())) def start_table(self, sections: str, row, lineno) -> None: '''Метод для создания и модификации таблиц.''' if self.restricted: self.modify_only = sections[0] not in self.available_sections self.current_namespace = self.namespace row_index = int(sections.pop()) table_name = sections.pop() for section in sections: if section not in self.current_namespace.namespaces: if not self.modify_only: self.current_namespace.add_namespace( NamespaceNode(section)) else: self._set_error(lineno, 'VariableError', "can not create table '{}.{}', namespace" " '{}' is not found.".format( ".".join(sections), table_name, section)) self.current_namespace = None return self.current_namespace = self.current_namespace.namespaces[section] if table_name not in self.current_namespace.variables: if not self.modify_only: table_variable = VariableNode(table_name, self.current_namespace, variable_type=TableType, source=[row]) else: self._set_error(lineno, 'VariableError', "can not create table '{}.{}' in not 'custom'" " namespace.".format(self.current_namespace. get_fullname(), table_name)) else: table_variable = self.current_namespace.variables[table_name] table = table_variable.get_value() if row_index < len(table): table.change_row(row, row_index) else: table.add_row(row) table_variable.source = table def define_key(self, key: str, value: str, optype, lineno) -> None: '''Метод для создания и модификации переменных.''' if self.current_namespace is None: return if (isinstance(self.current_namespace, VariableNode) and self.current_namespace.variable_type is HashType): self.update_hash(key, value, optype, lineno) else: if optype == Define.assign: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.change_value(key, value, lineno) elif optype == Define.append: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.append_value(key, value, lineno) elif optype == Define.remove: if key not in self.current_namespace: self.define_variable(key, value, lineno) else: self.remove_value(key, value, lineno) def change_value(self, key: str, value: str, lineno) -> None: '''Метод для изменения значения переменной.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable.source = value def define_variable(self, key: str, value: str, lineno) -> None: '''Метод для создания переменных в calculate.ini файле.''' if not self.modify_only: VariableNode(key, self.current_namespace, variable_type=IniType, source=value) else: self._set_error(lineno, 'VariableError', "can not create variable '{}.{}' in not 'custom'" " namespace.".format( self.current_namespace.get_fullname(), key)) def append_value(self, key: str, value: str, lineno) -> None: '''Метод выполняющий действия возложенные на оператор +=.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable_value = variable.get_value() if variable.variable_type is IniType: value_list = value.split(',') variable_list = variable_value.split(',') for item in value_list: if item not in variable_list: variable_list.append(item.strip()) variable_value = ','.join(variable_list) elif variable.variable_type is ListType: value_list = value.split(',') for item in value_list: if item not in variable_value: variable_value.append(item.strip()) elif variable.variable_type is IntegerType: variable_value += int(value) elif variable.variable_type is FloatType: variable_value += float(value) variable.source = variable_value def remove_value(self, key: str, value: str, lineno) -> None: '''Метод выполняющий действия возложенные на оператор -=.''' variable = self.current_namespace[key] if variable.readonly: self._set_error(lineno, 'VariableError', "can not change readonly variable " f"'{self.current_namespace.get_fullname()}.{key}'") return variable_value = variable.get_value() if variable.variable_type is IniType: value_list = value.split(',') variable_list = [item.strip() for item in variable_value.split(',')] for item in value_list: if item in variable_list: variable_list.remove(item.strip()) variable_value = ','.join(variable_list) elif variable.variable_type is ListType: value_list = value.split(',') for item in value_list: if item in variable_value: variable_value.remove(item.strip()) elif variable.variable_type is IntegerType: variable_value -= int(value) elif variable.variable_type is FloatType: variable_value -= float(value) variable.source = variable_value def update_hash(self, key: str, value: str, optype, lineno): '''Метод для изменения переменных хэшей через calculate.ini.''' if self.current_namespace.readonly: self._set_error(lineno, 'VariableError', "can not change readonly hash variable " f"'{self.current_namespace.get_fullname()}'") return hash_to_update = self.current_namespace.get_value().get_hash() if key not in hash_to_update: # Если ключ отсутствует в хэше, то проверяем, является ли он # фиксированным. if self.current_namespace.fixed: self._set_error(lineno, 'VariableError', "key '{}' is unavailable for fixed hash '{}'," " available keys: '{}'.".format( key, self.current_namespace.get_fullname(), ', '.join(self.current_namespace. get_value()._fields))) return else: hash_to_update.update({key: value}) elif optype == Define.assign: hash_to_update.update({key: value}) elif optype == Define.append: current_value = hash_to_update[key] hash_to_update[key] = current_value + value elif optype == Define.remove: current_value = hash_to_update[key] if (isinstance(current_value, int) or isinstance(current_value, float)): hash_to_update[key] = current_value - value elif isinstance(current_value, str): value_list = [item.strip() for item in value.split(',')] current_value = [item.strip() for item in current_value.split(',')] for value_to_remove in value_list: if value_to_remove in current_value: current_value.remove(value_to_remove) hash_to_update[key] = ','.join(current_value) self.current_namespace.source = hash_to_update def _set_error(self, lineno, error_type, line): '''Метод для добавления ошибки в лог.''' self._errors.append("{}:{}: {}".format(error_type, lineno, line)) @property def errors(self): errors = self._errors self._errors = [] return errors class VariableLoader: '''Класс загрузчика переменных из python-файлов и из ini-файлов.''' ini_basename = "calculate.ini" def __init__(self, datavars, variables_path, repository_map=None): self.datavars = datavars self.logger = datavars.logger self.ini_filler = NamespaceIniFiller() self.variables_path = variables_path self.variables_package = '.'.join(variables_path.split('/')) self.repository_map = repository_map def load_variables_package(self, package_name: str) -> None: '''Метод для загрузки пакетов с переменными.''' directory_path = os.path.join(self.variables_path, package_name) package = '{}.{}'.format(self.variables_package, package_name) package_namespace = NamespaceNode(package_name) self.datavars.root.add_namespace(package_namespace) self._fill_from_package(package_namespace, directory_path, package) def load_from_profile(self): '''Метод для загрузки переменных из calculate.ini профиля.''' # Проверяем наличие таблицы репозиториев в переменных. if self.repository_map == {}: return if self.repository_map is None: repositories_variable_path = ['os', 'gentoo', 'repositories'] current_namespace = self.datavars for section in repositories_variable_path: if section in current_namespace: current_namespace = current_namespace[section] else: self.logger.error("Variable 'os.gentoo.repositories'" " is not found. Can not load profile" " variables.") return self.repository_map = self._get_repository_map(self.datavars) # Проверяем наличие пути к профилю в переменных. if ('profile' in self.datavars.os.gentoo and 'path' in self.datavars.os.gentoo.profile): profile_path = self.datavars.os.gentoo.profile.path else: self.logger.error("Variable 'os.gentoo.profile.path'" " is not found. Can not load profile" " variables.") return self.logger.info("Load variables from profile: '{}'.".format( profile_path)) self._fill_from_profile_ini(profile_path) def load_user_variables(self): '''Метод для загрузки переменных из calculate.ini указанных в переменных env_order и env_path.''' try: env_order = self.datavars.system.env_order env_path = self.datavars.system.env_path except VariableNotFoundError as error: self.logger.warning("Can not load additional variables: {}". format(str(error))) return for ini_file in env_order: self.logger.info("Loading variables from file: '{}'".format( ini_file)) if ini_file in env_path: self.fill_from_custom_ini(env_path[ini_file].value) self.logger.info("Variables from '{}' are loaded".format( ini_file)) else: self.logger.warning("File '{}' is not found. Variables are" " not loaded".format(ini_file)) def _fill_from_package(self, current_namespace: NamespaceNode, directory_path: str, package: str) -> None: '''Метод для зaполнения переменных из python-файла.''' file_nodes = [] directory_nodes = [] # Просматриваем директорию for node in os.scandir(directory_path): if node.is_dir(): directory_nodes.append(node) elif node.is_file() and node.name.endswith('.py'): file_nodes.append(node) # Сначала загружаем переменные из файлов. for file_node in file_nodes: if not file_node.name.endswith('.py'): continue file_name = file_node.name[:-3] Namespace.set_current_namespace(current_namespace) # with self.test(file_name, current_namespace): # importlib.import_module('{}.{}'.format(package, file_name)) spec = importlib.util.spec_from_file_location( '{}.{}'.format(package, file_name), file_node.path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Обходим остальные директории. for directory_node in directory_nodes: namespace = NamespaceNode(directory_node.name) current_namespace.add_namespace(namespace) self._fill_from_package(namespace, directory_node.path, '{}.{}'.format(package, directory_node.name)) def _fill_from_profile_ini(self, profile_path): '''Метод для зaполнения переменных из ini-файла.''' profile_walker = ProfileWalker(self.ini_basename, self.repository_map) for file_path in profile_walker.find(profile_path): try: ini_file_text = read_file(file_path) self.ini_filler.fill(self.datavars, ini_file_text) except FilesError: self.logger.error("Can not load profile variables from" " unexisting file: {}".format(file_path)) def _get_repository_map(self, datavars): '''Метод для получения из переменной словаря с репозиториями и путями к ним.''' return {repo['name']: repo['path'] for repo in datavars.os.gentoo.repositories} def fill_from_custom_ini(self, file_path: str): '''Метод для заполнения переменных из конкретного указанного файла.''' if os.path.exists(file_path): ini_file_text = read_file(file_path) self.ini_filler.fill(self.datavars, ini_file_text) parsing_errors = self.ini_filler.errors if parsing_errors: for error in parsing_errors: self.logger.error(error) self.logger.warning('Some variables was not loaded.') else: self.logger.info('All variables are loaded.') else: self.logger.error("Variables are not loaded. File '{}' does" " not exist.".format(file_path)) @contextmanager def test(self, file_name, namespace): '''Контекстный менеджер для тестирования.''' print('IMPORT: {}.{}'.format(namespace.get_fullname(), file_name)) try: yield self finally: print('IMPORTED FROM: {}.{}'.format(namespace.get_fullname(), file_name)) class CalculateIniSaver: '''Класс для сохранения значений переменных в указанные ini-файлы.''' def __init__(self, ini_parser=None): self.ini_parser = CalculateIniParser() self.operations = {Define.assign: '=', Define.append: '+=', Define.remove: '-='} file_loader = FileSystemLoader('calculate/variables') environment = Environment(loader=file_loader) self.ini_template = environment.get_template('ini_template') def save_to_ini(self, target_path, variables_to_save): '''Метод для сохранения переменных в указанный ini-файл.''' ini_file_text = read_file(target_path) ini_dictionary = self._parse_ini(ini_file_text) for namespace in variables_to_save: if namespace in ini_dictionary: ini_dictionary[namespace].update(variables_to_save[namespace]) else: ini_dictionary[namespace] = variables_to_save[namespace] ini_file_text = self._get_ini_text(ini_dictionary) with open(target_path, 'w') as ini_file: ini_file.write(ini_file_text) def _parse_ini(self, ini_file_text): '''Метод для разбора текста ini-файла в словарь, в который далее будут добавляться измененные переменные.''' current_namespace = None ini_dictionary = dict() for parsed_line in self.ini_parser.parse(ini_file_text): line_type = next(iter(parsed_line)) line_content = parsed_line[line_type] if (line_type == 'start_section' or line_type == 'start_table'): current_namespace = tuple(line_content[0]) if current_namespace not in ini_dictionary: ini_dictionary[current_namespace] = dict() elif (line_type == 'clear_section' or line_type == 'clear_table'): current_namespace = (*line_content[0], '') ini_dictionary[current_namespace] = dict() elif line_type == 'define_key': namespace = ini_dictionary[current_namespace] namespace.update({line_content[0]: (self.operations[line_content[2]], line_content[1])}) return ini_dictionary def _get_ini_text(self, ini_dictionary): '''Метод для получения текста ini файла, полученного в результате наложения изменений из тегов save в шаблонах.''' ini_text = self.ini_template.render(ini_dictionary=ini_dictionary) return ini_text.strip() class Datavars: '''Класс для хранения переменных и управления ими.''' def __init__(self, variables_path='calculate/vars', repository_map=None, logger=None): self._variables_path = variables_path self._available_packages = self._get_available_packages() if logger is not None: self.logger = logger else: logger = logging.getLogger("main") # stream_handler = logging.StreamHandler() # logger.addHandler(stream_handler) self.logger = logger self.root = NamespaceNode('') self._loader = VariableLoader(self, self._variables_path, repository_map=repository_map) Namespace.reset() Namespace.set_datavars(self) self._loader.load_from_profile() self._loader.load_user_variables() # Создаем словарь переменных, которые нужно сохранить потом в # ini-файлах. try: self.variables_to_save = {target: dict() for target in self.system.env_order if target in self.system.env_path} except VariableNotFoundError: self.variables_to_save = dict() def reset(self): '''Метод для сброса модуля переменных.''' self.root.clear() self.root = NamespaceNode('') self._available_packages.clear() self._available_packages = self._get_available_packages() Namespace.set_datavars(self) def _get_available_packages(self) -> dict: '''Метод для получения словаря с имеющимися пакетами переменных и путями к ним.''' available_packages = dict() for file_name in os.listdir(self._variables_path): if file_name.startswith('__'): continue file_path = os.path.join(self._variables_path, file_name) if os.path.isdir(file_path): available_packages.update({file_name: file_path}) return available_packages def _load_package(self, package_name): '''Метод для загрузки переменных содержащихся в указанном пакете.''' self.logger.info("Loading datavars package '{}'".format(package_name)) try: self._loader.load_variables_package(package_name) except Exception as error: raise VariableError("Can not load datavars package: {}". format(error)) @property def available_packages(self): packages = set(self._available_packages) packages.update({'custom'}) return packages def __getattr__(self, package_name: str): '''Метод возвращает ноду пространства имен, соответствующего искомому пакету.''' if package_name in self.root.namespaces: return self.root[package_name] elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return self.root[package_name] elif package_name not in self._available_packages: raise VariableNotFoundError("datavars package '{}' is not found". format(package_name)) else: self._load_package(package_name) return self.root[package_name] def __getitem__(self, package_name: str) -> None: '''Метод возвращает ноду пространства имен, соответствующего искомому пакету.''' if package_name in self.root: return self.root[package_name] elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return self.root[package_name] elif package_name == 'tasks': self.create_tasks() return self.root[package_name] elif package_name not in self._available_packages: raise VariableNotFoundError("variables package '{}' is not found". format(package_name)) else: self._load_package(package_name) return self.root[package_name] def __contains__(self, package_name): if package_name in self.root.namespaces: return True elif package_name == 'custom': custom_namespace = NamespaceNode('custom') self.root.add_namespace(custom_namespace) return True elif package_name == 'tasks': self.create_tasks() return True elif (package_name not in self._available_packages and package_name != 'custom'): return False else: self._load_package(package_name) return True def add_namespace(self, namespace_node): self.root.add_namespace(namespace_node) def create_tasks(self): '''Метод для создания всех необходимых пространств имен для работы задач.''' tasks = NamespaceNode('tasks') self.add_namespace(tasks) env = NamespaceNode('env') tasks.add_namespace(env) env.add_namespace('loop') @property def namespaces(self): return self.root.namespaces def save_variables(self): '''Метод для сохранения значений переменных в calculate.ini файлах.''' target_paths = self.system.env_path saver = CalculateIniSaver() for target in self.variables_to_save: if self.variables_to_save[target]: dict_to_save = self.variables_to_save[target] target_path = target_paths[target].value saver.save_to_ini(target_path, dict_to_save) self.logger.info("Variables for '{}' is saved in the" " file: {}".format(target, target_path))