You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

615 lines
27 KiB

import os
import importlib
import importlib.util
from calculate.variables.datavars import NamespaceNode, VariableNode,\
ListType, IntegerType,\
FloatType, IniType, TableType,\
Namespace, VariableError, HashType
from calculate.utils.gentoo import ProfileWalker
from calculate.utils.files import read_file, FilesError
from pyparsing import Literal, Word, ZeroOrMore, Group, Optional, restOfLine,\
empty, printables, OneOrMore, lineno, line, col, SkipTo,\
LineEnd, Combine, nums
from enum import Enum
from contextlib import contextmanager
class Define(Enum):
assign = 0
append = 1
remove = 2
class CalculateIniParser:
'''Класс парсера calculate.ini файлов.'''
def __init__(self):
self._errors = []
self.operations = {"=": Define.assign,
"+=": Define.append,
"-=": Define.remove}
lbrack = Literal("[")
rbrack = Literal("]")
# comma = Literal(",").suppress()
comment_symbol = Literal(';') | Literal('#')
# Define = self.Define
value_operation = (Literal("=") | Combine(Literal("+") + Literal("="))
| Combine(Literal("-") + Literal("=")))
comment = comment_symbol + Optional(restOfLine)
section_name = (lbrack.suppress() + (~Word(nums)
+ Word(printables+'\t',
excludeChars='[]'))
+ rbrack.suppress())
value_name = Word(printables+'\t', excludeChars='=-+')
# non_comma = Word(printables+'\t', excludeChars=',')
clear_section = lbrack.suppress() + Group(empty) + rbrack.suppress()
row_index = lbrack.suppress() + Word(nums) + rbrack.suppress()
namespace_start = Group(OneOrMore(section_name)
+ (clear_section | ~lbrack)
+ LineEnd().suppress())
table_start = Group(OneOrMore(section_name)
+ (row_index | clear_section | ~lbrack)
+ LineEnd().suppress())
section_start = (namespace_start('namespace') |
table_start('table'))
# Если содержимое ini-файла не предваряется заголовком секции,
# значит эта строка ошибочна.
unexpected = Group(~section_start + SkipTo(LineEnd(),
include=True))("error")
unexpected.setParseAction(self._unexpected_token)
key_value = (~lbrack + value_name
+ value_operation + empty
+ restOfLine + LineEnd().suppress())
def strip_key_value(tokens):
tokens[0] = tokens[0].strip()
tokens[1] = tokens[1].strip()
key_value.setParseAction(strip_key_value)
self.ini_section_parser = (section_start
+ Group(ZeroOrMore(
Group(key_value | unexpected)))
| unexpected)
self.ini_section_parser.ignore(comment)
def parse(self, data: str):
for tokens, start, end in self.ini_section_parser.scanString(data):
if tokens.getName() == "error":
continue
section, defkeys = tokens
if section.getName() == 'namespace':
section_list = section.asList()
if section_list[-1] == []:
yield {'clear_section': (section_list[:-1], )}
else:
yield {'start_section': (section_list, )}
for defkey in defkeys:
if defkey.getName() == "error":
continue
yield {'define_key': (defkey[0], defkey[2],
self.operations[defkey[1]])}
else:
table_list = section.asList()
if table_list[-1] == []:
yield {'clear_table': (table_list[:-1], )}
else:
table_values = {}
for defkey in defkeys:
if defkey.getName() == "error":
continue
table_values.update({defkey[0]: defkey[2]})
yield {'start_table': (table_list, table_values)}
def _unexpected_token(self, string, location, tokens):
'''Метод вызываемый парсером, если обнаружена некорректная строка,
предназначен для получения некорректной строки и ее дальнейшего
разбора.'''
error_line = line(location, string).strip()
if error_line:
self._errors.append((error_line, lineno(location, string),
col(location, string)))
@property
def errors(self):
errors = self._errors
self._errors = []
return errors
class NamespaceIniFiller:
'''Класс, предназначенный для наполнения Namespace объекта переменными
из calculate.ini файла.'''
available_sections = {'custom'}
def __init__(self, restrict_creation=True):
self.ini_parser = CalculateIniParser()
self.restricted = restrict_creation
self.modify_only = False
def error(self, lineno, error_message):
self.errors.append(lineno, error_message)
def fill(self, namespace: NamespaceNode, ini_file_text: str) -> None:
'''Метод для разбора calculate.ini файла и добавления всех его
переменных в указанное пространство имен.'''
self.namespace = namespace
self.current_namespace = self.namespace
self.errors = []
for parsed_line in self.ini_parser.parse(ini_file_text):
self._line_processor(**parsed_line)
def _line_processor(self, start_section=None,
clear_section=None,
start_table=None,
clear_table=None,
define_key=None,
error=None, **kwargs):
'''Метод вызывающий обработку токенов, выдаваемых парсером в
зависимости от их типа.'''
if start_section is not None:
self.start_section(*start_section)
elif clear_section is not None:
self.clear_section(*clear_section)
elif start_table is not None:
self.start_table(*start_table)
elif clear_table is not None:
self.clear_table(*clear_table)
elif define_key is not None:
self.define_key(*define_key)
for error in self.ini_parser.errors:
self.set_error(*error)
def start_section(self, sections: str) -> None:
'''Метод для получения доступа и создания пространств имен.'''
print('SECTIONS: {}'.format(sections))
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
self.current_namespace = self.namespace
for section in sections:
print('START SECTION: {}'.format(section))
if isinstance(self.current_namespace, Datavars):
if section not in self.current_namespace:
raise VariableError("variables package '{}' is not found".
format(section))
elif isinstance(self.current_namespace, NamespaceNode):
if section not in self.current_namespace.namespaces:
if (section in self.current_namespace.variables and
self.current_namespace[section].variable_type
is HashType):
# Если секция является хэшем, используем ее.
self.current_namespace = self.current_namespace.\
variables[section]
return
else:
if not self.modify_only:
self.current_namespace.add_namespace(
NamespaceNode(section))
else:
self.current_namespace = None
return
self.current_namespace = self.current_namespace.namespaces[section]
def clear_section(self, sections: list) -> None:
'''Метод для очистки пространства имен.'''
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
current_namespace = self.namespace
for section in sections:
if (isinstance(current_namespace, Datavars) and
section in current_namespace):
current_namespace = current_namespace[section]
elif isinstance(current_namespace, NamespaceNode):
if section in current_namespace.namespaces:
current_namespace = current_namespace[section]
elif (section in current_namespace.variables and
current_namespace.variables[section].variable_type
is TableType):
table_variable = current_namespace.variables[section]
table_to_clear = table_variable.get_value()
table_to_clear.clear()
table_variable.source = table_to_clear
return
else:
return
if not self.modify_only:
current_namespace.clear()
def start_table(self, sections: str, row) -> None:
'''Метод для создания и модификации таблиц.'''
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
self.current_namespace = self.namespace
row_index = int(sections.pop())
table_name = sections.pop()
for section in sections:
if section not in self.current_namespace.namespaces:
if not self.modify_only:
self.current_namespace.add_namespace(
NamespaceNode(section))
else:
self.current_namespace = None
return
self.current_namespace = self.current_namespace.namespaces[section]
if table_name not in self.current_namespace.variables:
if not self.modify_only:
table_variable = VariableNode(table_name,
self.current_namespace,
variable_type=TableType,
source=[row])
else:
table_variable = self.current_namespace.variables[table_name]
table = table_variable.get_value()
if row_index < len(table):
table.change_row(row, row_index)
else:
table.add_row(row)
table_variable.source = table
def define_key(self, key: str, value: str, optype) -> None:
print('DEFINE KEY: {} -> {} TO {}'.format(key, value,
self.current_namespace))
if self.current_namespace is None:
return
if (isinstance(self.current_namespace, VariableNode) and
self.current_namespace.variable_type is HashType):
self.update_hash(key, value, optype)
else:
if optype == Define.assign:
if key not in self.current_namespace:
self.define_variable(key, value)
else:
self.change_value(key, value)
elif optype == Define.append:
if key not in self.current_namespace:
self.define_variable(key, value)
else:
self.append_value(key, value)
elif optype == Define.remove:
if key not in self.current_namespace:
self.define_variable(key, value)
else:
self.remove_value(key, value)
def change_value(self, key: str, value: str) -> None:
'''Метод для изменения значения переменной.'''
variable = self.current_namespace[key]
variable.source = value
def define_variable(self, key: str, value: str) -> None:
'''Метод для создания переменных в calculate.ini файле.'''
if not self.modify_only:
VariableNode(key, self.current_namespace, variable_type=IniType,
source=value)
else:
# TODO Какая-то обработка ошибки.
pass
def append_value(self, key: str, value: str) -> None:
'''Метод выполняющий действия возложенные на оператор +=.'''
variable = self.current_namespace[key]
variable_value = variable.get_value()
if variable.variable_type is IniType:
value_list = value.split(',')
variable_list = variable_value.split(',')
for item in value_list:
if item not in variable_list:
variable_list.append(item.strip())
variable_value = ','.join(variable_list)
elif variable.variable_type is ListType:
value_list = value.split(',')
for item in value_list:
if item not in variable_value:
variable_value.append(item.strip())
elif variable.variable_type is IntegerType:
variable_value += int(value)
elif variable.variable_type is FloatType:
variable_value += float(value)
variable.source = variable_value
def remove_value(self, key: str, value: str) -> None:
'''Метод выполняющий действия возложенные на оператор -=.'''
variable = self.current_namespace[key]
variable_value = variable.get_value()
if variable.variable_type is IniType:
value_list = value.split(',')
variable_list = [item.strip() for item in
variable_value.split(',')]
for item in value_list:
if item in variable_list:
variable_list.remove(item.strip())
variable_value = ','.join(variable_list)
elif variable.variable_type is ListType:
value_list = value.split(',')
for item in value_list:
if item in variable_value:
variable_value.remove(item.strip())
elif variable.variable_type is IntegerType:
variable_value -= int(value)
elif variable.variable_type is FloatType:
variable_value -= float(value)
variable.source = variable_value
def update_hash(self, key: str, value: str, optype):
'''Метод для изменения переменных хэшей через calculate.ini.'''
hash_to_update = self.current_namespace.get_value().get_hash()
if key not in hash_to_update:
# Если ключ отсутствует в хэше, то проверяем, является ли он
# фиксированным.
if self.current_namespace.fixed:
raise VariableError("key '{}' is unavailable for fixed"
" hash, available keys: '{}'".
format(key,
', '.join(self.current_namespace.
get_value()._fields)))
else:
hash_to_update.update({key: value})
elif optype == Define.assign:
hash_to_update.update({key: value})
elif optype == Define.append:
current_value = hash_to_update[key]
hash_to_update[key] = current_value + value
elif optype == Define.remove:
current_value = hash_to_update[key]
if (isinstance(current_value, int) or
isinstance(current_value, float)):
hash_to_update[key] = current_value - value
elif isinstance(current_value, str):
value_list = [item.strip() for item in value.split(',')]
current_value = [item.strip() for item in
current_value.split(',')]
for value_to_remove in value_list:
if value_to_remove in current_value:
current_value.remove(value_to_remove)
hash_to_update[key] = ','.join(current_value)
self.current_namespace.source = hash_to_update
def set_error(self, line, lineno, col):
'''Метод для добавления ошибки в лог.'''
self.error(lineno, "Syntax error: {}".format(line))
class VariableLoader:
'''Класс загрузчика переменных из python-файлов и из ini-файлов.'''
ini_basename = "calculate.ini"
def __init__(self, datavars, variables_path, repository_map=None):
self.datavars = datavars
self.ini_filler = NamespaceIniFiller()
self.variables_path = variables_path
self.variables_package = '.'.join(variables_path.split('/'))
self.repository_map = repository_map
def load_variables_package(self, package_name: str) -> None:
'''Метод для загрузки пакетов с переменными.'''
directory_path = os.path.join(self.variables_path, package_name)
package = '{}.{}'.format(self.variables_package, package_name)
package_namespace = NamespaceNode(package_name)
self.datavars.root.add_namespace(package_namespace)
self._fill_from_package(package_namespace, directory_path, package)
def load_from_profile(self):
'''Метод для загрузки переменных из calculate.ini профиля.'''
# Проверяем наличие таблицы репозиториев в переменных.
if self.repository_map == {}:
return
if self.repository_map is None:
repositories_variable_path = ['os', 'gentoo', 'repositories']
current_namespace = self.datavars
for section in repositories_variable_path:
if section in current_namespace:
current_namespace = current_namespace[section]
else:
# TODO детальнее продумать действия при отсутствии нужной
# переменной.
return
self.repository_map = self._get_repository_map(self.datavars)
# Проверяем наличие пути к профилю в переменных.
if ('profile' in self.datavars.os.gentoo and
'path' in self.datavars.os.gentoo.profile):
profile_path = self.datavars.os.gentoo.profile.path
else:
# TODO детальнее продумать действия при отсутствии нужной
# переменной.
return
self._fill_from_ini(profile_path)
def load_user_variables(self):
'''Метод для загрузки переменных из calculate.ini указанных в
переменных env_order и env_path.'''
if ('system' in self.datavars and 'env_order' in self.datavars.system
and 'env_path' in self.datavars.system):
env_order = self.datavars.system.env_order
env_path = self.datavars.system.env_path
for ini_file in env_order:
if ini_file in env_path:
self.fill_from_custom_ini(env_path[ini_file].value)
def _fill_from_package(self, current_namespace: NamespaceNode,
directory_path: str, package: str) -> None:
'''Метод для зaполнения переменных из python-файла.'''
file_nodes = []
directory_nodes = []
# Просматриваем директорию
for node in os.scandir(directory_path):
if node.is_dir():
directory_nodes.append(node)
elif node.is_file() and node.name.endswith('.py'):
file_nodes.append(node)
# Сначала загружаем переменные из файлов.
for file_node in file_nodes:
file_name = file_node.name[:-3]
Namespace.set_current_namespace(current_namespace)
with self.test(file_name, current_namespace):
# importlib.import_module('{}.{}'.format(package, file_name))
spec = importlib.util.spec_from_file_location(
'{}.{}'.format(package, file_name),
file_node.path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Обходим остальные директории.
for directory_node in directory_nodes:
namespace = NamespaceNode(directory_node.name)
current_namespace.add_namespace(namespace)
self._fill_from_package(namespace, directory_node.path,
'{}.{}'.format(package,
directory_node.name))
def _fill_from_ini(self, profile_path):
'''Метод для зaполнения переменных из ini-файла.'''
profile_walker = ProfileWalker(self.ini_basename,
self.repository_map)
for file_path in profile_walker.find(profile_path):
try:
ini_file_text = read_file(file_path)
self.ini_filler.fill(self.datavars, ini_file_text)
except FilesError:
# TODO продумать обработку ошибок.
pass
def _get_repository_map(self, datavars):
'''Метод для получения из переменной словаря с репозиториями и путями
к ним.'''
return {repo['name']: repo['path']
for repo in datavars.os.gentoo.repositories}
def fill_from_custom_ini(self, file_path: str):
'''Метод для заполнения переменных из конкретного указанного файла.'''
print('LOAD FROM INI: {}'.format(file_path))
if os.path.exists(file_path):
ini_file_text = read_file(file_path)
self.ini_filler.fill(self.datavars, ini_file_text)
@contextmanager
def test(self, file_name, namespace):
print('IMPORT: {}.{}'.format(namespace.get_fullname(), file_name))
try:
yield self
finally:
print('IMPORTED FROM: {}.{}'.format(namespace.get_fullname(),
file_name))
class Datavars:
'''Класс для хранения переменных и управления ими.'''
def __init__(self, variables_path='calculate/vars', repository_map=None):
self._variables_path = variables_path
self._available_packages = self._get_available_packages()
self.root = NamespaceNode('<root>')
self._loader = VariableLoader(self, self._variables_path,
repository_map=repository_map)
Namespace.reset()
Namespace.set_datavars(self)
self._loader.load_from_profile()
self._loader.load_user_variables()
def reset(self):
'''Метод для сброса модуля переменных.'''
self.root.clear()
self.root = NamespaceNode('<root>')
self._available_packages.clear()
self._available_packages = self._get_available_packages()
Namespace.set_datavars(self)
def _get_available_packages(self) -> dict:
'''Метод для получения словаря с имеющимися пакетами переменных
и путями к ним.'''
available_packages = dict()
for file_name in os.listdir(self._variables_path):
if file_name.startswith('__'):
continue
file_path = os.path.join(self._variables_path, file_name)
if os.path.isdir(file_path):
available_packages.update({file_name: file_path})
return available_packages
def __getattr__(self, package_name: str):
'''Метод возвращает ноду пространства имен, соответствующего искомому
пакету.'''
if package_name in self.root.namespaces:
return self.root[package_name]
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return self.root[package_name]
elif package_name not in self._available_packages:
raise VariableError("variables package '{}' is not found".
format(package_name))
else:
self._loader.load_variables_package(package_name)
return self.root[package_name]
def __getitem__(self, package_name: str) -> None:
'''Метод возвращает ноду пространства имен, соответствующего искомому
пакету.'''
if package_name in self.root:
return self.root[package_name]
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return self.root[package_name]
elif package_name not in self._available_packages:
raise VariableError("variables package '{}' is not found".
format(package_name))
else:
self._loader.load_variables_package(package_name)
return self.root[package_name]
def __contains__(self, package_name):
if package_name in self.root.namespaces:
return True
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return True
elif (package_name not in self._available_packages
and package_name != 'custom'):
return False
else:
self._loader.load_variables_package(package_name)
return True
@property
def namespaces(self):
return self.root.namespaces