No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

888 líneas
39 KiB

¡Este archivo contiene caracteres Unicode ambiguos!

Este archivo contiene caracteres Unicode ambiguos que pueden confundirse con otros en tu idioma actual. Si tu caso de uso es intencional y legítimo, puedes ignorar esta advertencia. Usa el botón de Escape para resaltar estos caracteres.

# vim: fileencoding=utf-8
#
import os
import logging
import importlib
import importlib.util
from jinja2 import Environment, PackageLoader
from calculate.variables.datavars import (
NamespaceNode,
VariableNode,
ListType,
IntegerType,
FloatType,
IniType,
TableType,
Namespace,
HashType,
VariableNotFoundError,
VariableError,
)
from calculate.utils.gentoo import ProfileWalker
from calculate.utils.files import read_file, FilesError
from calculate.utils.tools import Singleton
from pyparsing import (
Literal,
Word,
ZeroOrMore,
Group,
Optional,
restOfLine,
empty,
printables,
OneOrMore,
lineno,
line,
SkipTo,
LineEnd,
Combine,
nums,
)
from enum import Enum
from contextlib import contextmanager
class LoaderError(Exception):
'''Исключение выбрасываемое загрузчиком, если тот в принципе не может
загрузить переменные.'''
pass
class Define(Enum):
assign = 0
append = 1
remove = 2
class CalculateIniParser(metaclass=Singleton):
'''Класс парсера calculate.ini файлов.'''
def __init__(self):
self.operations = {"=": Define.assign,
"+=": Define.append,
"-=": Define.remove}
lbrack = Literal("[")
rbrack = Literal("]")
# comma = Literal(",").suppress()
comment_symbol = Literal(';') | Literal('#')
# Define = self.Define
value_operation = (Literal("=") | Combine(Literal("+") + Literal("="))
| Combine(Literal("-") + Literal("=")))
comment = comment_symbol + Optional(restOfLine)
section_name = (lbrack.suppress() + (~Word(nums)
+ Word(printables+'\t',
excludeChars='[]'))
+ rbrack.suppress())
value_name = Word(printables+'\t', excludeChars='=-+')
# non_comma = Word(printables+'\t', excludeChars=',')
clear_section = lbrack.suppress() + Group(empty) + rbrack.suppress()
row_index = lbrack.suppress() + Word(nums) + rbrack.suppress()
namespace_start = Group(OneOrMore(section_name)
+ (clear_section | ~lbrack)
+ LineEnd().suppress())
table_start = Group(OneOrMore(section_name)
+ (row_index | clear_section | ~lbrack)
+ LineEnd().suppress())
def add_lineno(string, location, tokens):
tokens.append(lineno(location, string))
section_start = (namespace_start('namespace') |
table_start('table'))
section_start.setParseAction(add_lineno)
# Если содержимое ini-файла не предваряется заголовком секции,
# значит эта строка ошибочна.
unexpected = Group(~section_start + SkipTo(LineEnd(),
include=True))("error")
unexpected.setParseAction(self._unexpected_token)
key_value = (~lbrack + value_name
+ value_operation + empty
+ restOfLine + LineEnd().suppress())
def process_key_value(string, location, tokens):
tokens[0] = tokens[0].strip()
tokens[1] = tokens[1].strip()
tokens.append(lineno(location, string))
key_value.setParseAction(process_key_value)
self.ini_section_parser = (section_start
+ Group(ZeroOrMore(
Group(key_value
| unexpected)))
| unexpected)
self.ini_section_parser.ignore(comment)
def parse(self, data: str):
for tokens, start, end in self.ini_section_parser.scanString(data):
if tokens.getName() == "error":
if tokens[1].strip():
yield({'error': (tokens[0], tokens[1])})
continue
section, section_lineno, defkeys = tokens
if section.getName() == 'namespace':
section_list = section.asList()
if section_list[-1] == []:
yield {'clear_section': (section_list[:-1],
section_lineno)}
else:
yield {'start_section': (section_list, section_lineno)}
for defkey in defkeys:
if defkey.getName() == "error":
if defkey[1].strip():
yield({'error': (defkey[0], defkey[1])})
continue
yield {'define_key': (defkey[0], defkey[2],
self.operations[defkey[1]],
defkey[3])}
else:
table_list = section.asList()
table_values = {}
for defkey in defkeys:
if defkey.getName() == "error":
if defkey[1].strip():
yield({'error': (defkey[0], defkey[1])})
continue
table_values.update({defkey[0]: defkey[2]})
yield {'start_table': (table_list, table_values,
section_lineno)}
def _unexpected_token(self, string, location, tokens):
'''Метод вызываемый парсером, если обнаружена некорректная строка,
предназначен для получения некорректной строки и ее дальнейшего
разбора.'''
return [lineno(location, string), line(location, string)]
class NamespaceIniFiller:
'''Класс, предназначенный для наполнения Namespace объекта переменными
из calculate.ini файла.'''
available_sections = {'custom'}
def __init__(self, restrict_creation=True):
self.ini_parser = CalculateIniParser()
self._errors = []
# Флаги, определяющие возможность создания новых переменных и новых
# пространств имен в данном пространстве имен.
self.restricted = restrict_creation
self.modify_only = False
def fill(self, namespace: NamespaceNode, ini_file_text: str) -> None:
'''Метод для разбора calculate.ini файла и добавления всех его
переменных в указанное пространство имен.'''
self.namespace = namespace
self.current_namespace = self.namespace
self._errors = []
for parsed_line in self.ini_parser.parse(ini_file_text):
self._line_processor(**parsed_line)
def _line_processor(self, start_section=None,
clear_section=None,
start_table=None,
define_key=None,
error=None, **kwargs):
'''Метод вызывающий обработку токенов, выдаваемых парсером в
зависимости от их типа.'''
if start_section is not None:
self.start_section(*start_section)
elif clear_section is not None:
self.clear_section(*clear_section)
elif start_table is not None:
self.start_table(*start_table)
elif define_key is not None:
self.define_key(*define_key)
elif error is not None:
self._set_error(error[0], 'SyntaxError', error[1])
def start_section(self, sections: str, lineno) -> None:
'''Метод для получения доступа и создания пространств имен.'''
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
self.current_namespace = self.namespace
for section in sections:
if isinstance(self.current_namespace, Datavars):
if section not in self.current_namespace:
self._set_error(lineno, 'VariableError',
"variables package '{}' is not found.".
format(section))
self.current_namespace = None
return
elif isinstance(self.current_namespace, NamespaceNode):
if section not in self.current_namespace._namespaces:
if (section in self.current_namespace._variables and
self.current_namespace[section].variable_type
is HashType):
# Если секция является хэшем, используем ее.
self.current_namespace = self.current_namespace.\
_variables[section]
return
elif not self.modify_only:
self.current_namespace.add_namespace(
NamespaceNode(section))
else:
self._set_error(lineno, 'VariableError',
"can not create namespace '{}.{}' in"
" not 'custom' namespace.".format(
self.current_namespace.get_fullname(),
section))
self.current_namespace = None
return
self.current_namespace = self.current_namespace.\
_namespaces[section]
def clear_section(self, sections: list, lineno) -> None:
'''Метод для очистки пространства имен.'''
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
current_namespace = self.namespace
for section in sections:
if isinstance(current_namespace, Datavars):
if section in current_namespace:
current_namespace = current_namespace[section]
else:
self._set_error(lineno, 'VariableError',
"can not clear"" namespace '{}'. Variables"
" package '{}' is not found.".format(
".".join(sections),
section))
return
elif isinstance(current_namespace, NamespaceNode):
if section in current_namespace._namespaces:
current_namespace = current_namespace[section]
elif (section in current_namespace._variables and
current_namespace._variables[section].variable_type
is TableType):
table_variable = current_namespace._variables[section]
table_to_clear = table_variable.get_value()
table_to_clear.clear()
table_variable.source = table_to_clear
return
else:
self._set_error(lineno, 'VariableError',
"can not clear namespace '{}'. Namespace"
" is not found.".format(
".".join(sections)))
return
if not self.modify_only:
current_namespace.clear()
else:
self._set_error(lineno, 'VariableError',
"can not clear namespace '{}' from not 'custom'"
" namespace.".format(current_namespace.
get_fullname()))
def start_table(self, sections: str, row, lineno) -> None:
'''Метод для создания и модификации таблиц.'''
if self.restricted:
self.modify_only = sections[0] not in self.available_sections
self.current_namespace = self.namespace
row_index = int(sections.pop())
table_name = sections.pop()
for section in sections:
if section not in self.current_namespace._namespaces:
if not self.modify_only:
self.current_namespace.add_namespace(
NamespaceNode(section))
else:
self._set_error(lineno, 'VariableError',
"can not create table '{}.{}', namespace"
" '{}' is not found.".format(
".".join(sections),
table_name, section))
self.current_namespace = None
return
self.current_namespace = self.current_namespace.\
_namespaces[section]
if table_name not in self.current_namespace._variables:
if not self.modify_only:
table_variable = VariableNode(table_name,
self.current_namespace,
variable_type=TableType,
source=[row])
else:
self._set_error(lineno, 'VariableError',
"can not create table '{}.{}' in not 'custom'"
" namespace.".format(self.current_namespace.
get_fullname(),
table_name))
else:
table_variable = self.current_namespace._variables[table_name]
table = table_variable.get_value()
if row_index < len(table):
table.change_row(row, row_index)
else:
table.add_row(row)
table_variable.source = table
def define_key(self, key: str, value: str, optype, lineno) -> None:
'''Метод для создания и модификации переменных.'''
if self.current_namespace is None:
return
if (isinstance(self.current_namespace, VariableNode) and
self.current_namespace.variable_type is HashType):
self.update_hash(key, value, optype, lineno)
else:
if optype == Define.assign:
if key not in self.current_namespace:
self.define_variable(key, value, lineno)
else:
self.change_value(key, value, lineno)
elif optype == Define.append:
if key not in self.current_namespace:
self.define_variable(key, value, lineno)
else:
self.append_value(key, value, lineno)
elif optype == Define.remove:
if key not in self.current_namespace:
self.define_variable(key, value, lineno)
else:
self.remove_value(key, value, lineno)
def change_value(self, key: str, value: str, lineno) -> None:
'''Метод для изменения значения переменной.'''
variable = self.current_namespace[key]
if variable.readonly:
self._set_error(lineno, 'VariableError',
"can not change readonly variable "
f"'{self.current_namespace.get_fullname()}.{key}'")
return
variable.source = value
def define_variable(self, key: str, value: str, lineno) -> None:
'''Метод для создания переменных в calculate.ini файле.'''
if not self.modify_only:
VariableNode(key, self.current_namespace, variable_type=IniType,
source=value)
else:
self._set_error(lineno, 'VariableError',
"can not create variable '{}.{}' in not 'custom'"
" namespace.".format(
self.current_namespace.get_fullname(),
key))
def append_value(self, key: str, value: str, lineno) -> None:
'''Метод выполняющий действия возложенные на оператор +=.'''
variable = self.current_namespace[key]
if variable.readonly:
self._set_error(lineno, 'VariableError',
"can not change readonly variable "
f"'{self.current_namespace.get_fullname()}.{key}'")
return
variable_value = variable.get_value()
if variable.variable_type is IniType:
value_list = value.split(',')
variable_list = variable_value.split(',')
for item in value_list:
if item not in variable_list:
variable_list.append(item.strip())
variable_value = ','.join(variable_list)
elif variable.variable_type is ListType:
value_list = value.split(',')
for item in value_list:
if item not in variable_value:
variable_value.append(item.strip())
elif variable.variable_type is IntegerType:
variable_value += int(value)
elif variable.variable_type is FloatType:
variable_value += float(value)
variable.source = variable_value
def remove_value(self, key: str, value: str, lineno) -> None:
'''Метод выполняющий действия возложенные на оператор -=.'''
variable = self.current_namespace[key]
if variable.readonly:
self._set_error(lineno, 'VariableError',
"can not change readonly variable "
f"'{self.current_namespace.get_fullname()}.{key}'")
return
variable_value = variable.get_value()
if variable.variable_type is IniType:
value_list = value.split(',')
variable_list = [item.strip() for item in
variable_value.split(',')]
for item in value_list:
if item in variable_list:
variable_list.remove(item.strip())
variable_value = ','.join(variable_list)
elif variable.variable_type is ListType:
value_list = value.split(',')
for item in value_list:
if item in variable_value:
variable_value.remove(item.strip())
elif variable.variable_type is IntegerType:
variable_value -= int(value)
elif variable.variable_type is FloatType:
variable_value -= float(value)
variable.source = variable_value
def update_hash(self, key: str, value: str, optype, lineno):
'''Метод для изменения переменных хэшей через calculate.ini.'''
if self.current_namespace.readonly:
self._set_error(lineno, 'VariableError',
"can not change readonly hash variable "
f"'{self.current_namespace.get_fullname()}'")
return
hash_to_update = self.current_namespace.get_value().get_hash()
if key not in hash_to_update:
# Если ключ отсутствует в хэше, то проверяем, является ли он
# фиксированным.
if self.current_namespace.fixed:
self._set_error(lineno, 'VariableError',
"key '{}' is unavailable for fixed hash '{}',"
" available keys: '{}'.".format(
key,
self.current_namespace.get_fullname(),
', '.join(self.current_namespace.
get_value()._fields)))
return
else:
hash_to_update.update({key: value})
elif optype == Define.assign:
hash_to_update.update({key: value})
elif optype == Define.append:
current_value = hash_to_update[key]
hash_to_update[key] = current_value + value
elif optype == Define.remove:
current_value = hash_to_update[key]
if (isinstance(current_value, int) or
isinstance(current_value, float)):
hash_to_update[key] = current_value - value
elif isinstance(current_value, str):
value_list = [item.strip() for item in value.split(',')]
current_value = [item.strip() for item in
current_value.split(',')]
for value_to_remove in value_list:
if value_to_remove in current_value:
current_value.remove(value_to_remove)
hash_to_update[key] = ','.join(current_value)
self.current_namespace.source = hash_to_update
def _set_error(self, lineno, error_type, line):
'''Метод для добавления ошибки в лог.'''
self._errors.append("{}:{}: {}".format(error_type, lineno, line))
@property
def errors(self):
errors = self._errors
self._errors = []
return errors
class VariableLoader:
'''Класс загрузчика переменных из python-файлов и из ini-файлов.'''
ini_basename = "calculate.ini"
def __init__(self, datavars, variables_path, repository_map=None):
self.datavars = datavars
self.logger = datavars.logger
self.ini_filler = NamespaceIniFiller()
self.variables_path = os.path.join(
__file__[:-len("calculate/variables/loader.py")],
variables_path)
self.variables_package = '.'.join(os.path.normpath(
variables_path).split("/"))
self.repository_map = repository_map
def load_variables_package(self, package_name: str) -> None:
'''Метод для загрузки пакетов с переменными.'''
directory_path = os.path.join(self.variables_path, package_name)
package = '{}.{}'.format(self.variables_package, package_name)
package_namespace = NamespaceNode(package_name)
self.datavars.root.add_namespace(package_namespace)
self._fill_from_package(package_namespace, directory_path, package)
def load_from_profile(self):
'''Метод для загрузки переменных из calculate.ini профиля.'''
# Проверяем наличие таблицы репозиториев в переменных.
if self.repository_map == {}:
return
if self.repository_map is None:
repositories_variable_path = ['os', 'gentoo', 'repositories']
current_namespace = self.datavars
for section in repositories_variable_path:
if section in current_namespace:
current_namespace = current_namespace[section]
else:
self.logger.error("Variable 'os.gentoo.repositories'"
" is not found. Can not load profile"
" variables.")
return
self.repository_map = self._get_repository_map(self.datavars)
# Проверяем наличие пути к профилю в переменных.
if ('profile' in self.datavars.os.gentoo and
'path' in self.datavars.os.gentoo.profile):
profile_path = self.datavars.os.gentoo.profile.path
else:
self.logger.error("Variable 'os.gentoo.profile.path'"
" is not found. Can not load profile"
" variables.")
return
self.logger.info("Load variables from profile: '{}'.".format(
profile_path))
self._fill_from_profile_ini(profile_path)
def load_user_variables(self):
'''Метод для загрузки переменных из calculate.ini указанных в
переменных env_order и env_path.'''
try:
env_order = self.datavars.main.cl.system.env_order
env_path = self.datavars.main.cl.system.env_path
except VariableNotFoundError as error:
self.logger.warning("Can not load additional variables: {}".
format(str(error)))
return
for ini_file in env_order:
self.logger.info("Loading variables from file: '{}'".format(
ini_file))
if ini_file in env_path:
self.fill_from_custom_ini(env_path[ini_file].value)
self.logger.info("Variables from '{}' are loaded".format(
ini_file))
else:
self.logger.warning("File '{}' is not found. Variables are"
" not loaded".format(ini_file))
def _fill_from_package(self, current_namespace: NamespaceNode,
directory_path: str, package: str) -> None:
'''Метод для зaполнения переменных из python-файла.'''
file_nodes = []
directory_nodes = []
# Просматриваем директорию
for node in os.scandir(directory_path):
if node.is_dir():
directory_nodes.append(node)
elif node.is_file() and node.name.endswith('.py'):
file_nodes.append(node)
# Сначала загружаем переменные из файлов.
for file_node in file_nodes:
if not file_node.name.endswith('.py'):
continue
file_name = file_node.name[:-3]
Namespace.set_current_namespace(current_namespace)
# with self.test(file_name, current_namespace):
# importlib.import_module('{}.{}'.format(package, file_name))
spec = importlib.util.spec_from_file_location(
'{}.{}'.format(package, file_name),
file_node.path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, 'import_variables'):
module.import_variables()
# Обходим остальные директории.
for directory_node in directory_nodes:
namespace = NamespaceNode(directory_node.name)
current_namespace.add_namespace(namespace)
self._fill_from_package(namespace, directory_node.path,
'{}.{}'.format(package,
directory_node.name))
def _fill_from_profile_ini(self, profile_path):
'''Метод для зaполнения переменных из ini-файла.'''
profile_walker = ProfileWalker(self.ini_basename,
self.repository_map)
for file_path in profile_walker.find(profile_path):
try:
ini_file_text = read_file(file_path)
self.ini_filler.fill(self.datavars, ini_file_text)
except FilesError:
self.logger.error("Can not load profile variables from"
" unexisting file: {}".format(file_path))
def _get_repository_map(self, datavars):
'''Метод для получения из переменной словаря с репозиториями и путями
к ним.'''
return {repo['name']: repo['path']
for repo in datavars.os.gentoo.repositories}
def fill_from_custom_ini(self, file_path: str):
'''Метод для заполнения переменных из конкретного указанного файла.'''
if os.path.exists(file_path):
ini_file_text = read_file(file_path)
self.ini_filler.fill(self.datavars, ini_file_text)
parsing_errors = self.ini_filler.errors
if parsing_errors:
for error in parsing_errors:
self.logger.error(error)
self.logger.warning('Some variables was not loaded.')
else:
self.logger.info('All variables are loaded.')
else:
self.logger.error("Variables are not loaded. File '{}' does"
" not exist.".format(file_path))
@contextmanager
def test(self, file_name, namespace):
'''Контекстный менеджер для тестирования.'''
print('IMPORT: {}.{}'.format(namespace.get_fullname(), file_name))
try:
yield self
finally:
print('IMPORTED FROM: {}.{}'.format(namespace.get_fullname(),
file_name))
class CalculateIniSaver:
'''Класс для сохранения значений переменных в указанные ini-файлы.'''
def __init__(self, ini_parser=None):
self.ini_parser = CalculateIniParser()
self.operations = {Define.assign: '=',
Define.append: '+=',
Define.remove: '-='}
file_loader = PackageLoader('calculate', 'variables')
environment = Environment(loader=file_loader)
self.ini_template = environment.get_template('ini_template')
def save_to_ini(self, target_path, variables_to_save):
'''Метод для сохранения переменных в указанный ini-файл.'''
ini_file_text = read_file(target_path)
ini_dictionary = self._parse_ini(ini_file_text)
for namespace in variables_to_save:
if namespace in ini_dictionary:
ini_dictionary[namespace].update(variables_to_save[namespace])
else:
ini_dictionary[namespace] = variables_to_save[namespace]
ini_file_text = self._get_ini_text(ini_dictionary)
with open(target_path, 'w') as ini_file:
ini_file.write(ini_file_text)
def _parse_ini(self, ini_file_text):
'''Метод для разбора текста ini-файла в словарь, в который далее будут
добавляться измененные переменные.'''
current_namespace = None
ini_dictionary = dict()
for parsed_line in self.ini_parser.parse(ini_file_text):
line_type = next(iter(parsed_line))
line_content = parsed_line[line_type]
if (line_type == 'start_section' or
line_type == 'start_table'):
current_namespace = tuple(line_content[0])
if current_namespace not in ini_dictionary:
ini_dictionary[current_namespace] = dict()
elif (line_type == 'clear_section' or
line_type == 'clear_table'):
current_namespace = (*line_content[0], '')
ini_dictionary[current_namespace] = dict()
elif line_type == 'define_key':
namespace = ini_dictionary[current_namespace]
namespace.update({line_content[0]:
(self.operations[line_content[2]],
line_content[1])})
return ini_dictionary
def _get_ini_text(self, ini_dictionary):
'''Метод для получения текста ini файла, полученного в результате
наложения изменений из тегов save в шаблонах.'''
ini_text = self.ini_template.render(ini_dictionary=ini_dictionary)
return ini_text
class Datavars:
'''Класс для хранения переменных и управления ими.'''
def __init__(self, variables_path='calculate/vars', repository_map=None,
logger=None):
self._variables_path = variables_path
self._available_packages = self._get_available_packages()
if logger is not None:
self.logger = logger
else:
logger = logging.getLogger("main")
# stream_handler = logging.StreamHandler()
# logger.addHandler(stream_handler)
self.logger = logger
self.root = NamespaceNode('<root>')
self._loader = VariableLoader(self, self._variables_path,
repository_map=repository_map)
Namespace.reset()
Namespace.set_datavars(self)
self._loader.load_from_profile()
self._loader.load_user_variables()
# Создаем словарь переменных, которые нужно сохранить потом в
# ini-файлах.
try:
self.variables_to_save = {target: dict() for target in
self.main.cl.system.env_order
if target in
self.main.cl.system.env_path}
except VariableNotFoundError:
self.variables_to_save = dict()
def reset(self):
'''Метод для сброса модуля переменных.'''
self.root.clear()
self.root = NamespaceNode('<root>')
self._available_packages.clear()
self._available_packages = self._get_available_packages()
Namespace.set_datavars(self)
def _get_available_packages(self) -> dict:
'''Метод для получения словаря с имеющимися пакетами переменных
и путями к ним.'''
variables_path = os.path.join(
__file__[:-len("calculate/variables/loader.py")],
self._variables_path)
available_packages = dict()
for file_name in os.listdir(variables_path):
if file_name.startswith('__'):
continue
file_path = os.path.join(variables_path, file_name)
if os.path.isdir(file_path):
available_packages.update({file_name: file_path})
return available_packages
def _load_package(self, package_name):
'''Метод для загрузки переменных содержащихся в указанном пакете.'''
self.logger.info("Loading datavars package '{}'".format(package_name))
try:
self._loader.load_variables_package(package_name)
except Exception as error:
raise VariableError("Can not load datavars package: {}".
format(error))
@property
def available_packages(self):
packages = set(self._available_packages)
packages.update({'custom'})
return packages
def __getattr__(self, package_name: str):
'''Метод возвращает ноду пространства имен, соответствующего искомому
пакету.'''
if package_name in self.root._namespaces:
return self.root[package_name]
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return self.root[package_name]
elif package_name not in self._available_packages:
raise VariableNotFoundError("datavars package '{}' is not found".
format(package_name))
else:
self._load_package(package_name)
return self.root[package_name]
def __getitem__(self, package_name: str) -> None:
'''Метод возвращает ноду пространства имен, соответствующего искомому
пакету.'''
if package_name in self.root:
return self.root[package_name]
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return self.root[package_name]
elif package_name == 'tasks':
self.create_tasks()
return self.root[package_name]
elif package_name not in self._available_packages:
raise VariableNotFoundError("variables package '{}' is not found".
format(package_name))
else:
self._load_package(package_name)
return self.root[package_name]
def __contains__(self, package_name):
if package_name in self.root._namespaces:
return True
elif package_name == 'custom':
custom_namespace = NamespaceNode('custom')
self.root.add_namespace(custom_namespace)
return True
elif package_name == 'tasks':
self.create_tasks()
return True
elif (package_name not in self._available_packages
and package_name != 'custom'):
return False
else:
self._load_package(package_name)
return True
def add_namespace(self, namespace_node):
self.root.add_namespace(namespace_node)
def create_tasks(self):
'''Метод для создания всех необходимых пространств имен для работы
задач.'''
tasks = NamespaceNode('tasks')
self.add_namespace(tasks)
env = NamespaceNode('env')
tasks.add_namespace(env)
env.add_namespace('loop')
@property
def _namespaces(self):
return self.root._namespaces
def save_variables(self):
'''Метод для сохранения значений переменных в calculate.ini файлах.'''
target_paths = self.main.cl.system.env_path
saver = CalculateIniSaver()
for target in self.variables_to_save:
if self.variables_to_save[target]:
dict_to_save = self.variables_to_save[target]
target_path = target_paths[target].value
saver.save_to_ini(target_path, dict_to_save)
self.logger.info("Variables for '{}' is saved in the"
" file: {}".format(target, target_path))