|
|
|
|
# vim: fileencoding=utf-8
|
|
|
|
|
#
|
|
|
|
|
from collections import OrderedDict
|
|
|
|
|
from jinja2 import Environment, PackageLoader
|
|
|
|
|
from typing import Callable, List, Tuple, Union
|
|
|
|
|
from pprint import pprint
|
|
|
|
|
from copy import copy
|
|
|
|
|
import re
|
|
|
|
|
try:
|
|
|
|
|
from lxml.etree.ElementTree import fromstring
|
|
|
|
|
except ImportError:
|
|
|
|
|
from xml.etree.ElementTree import fromstring
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FormatError(Exception):
|
|
|
|
|
def __init__(self, message: str, executable: bool = False):
|
|
|
|
|
super().__init__(message)
|
|
|
|
|
self.executable: bool = executable
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Format:
|
|
|
|
|
FORMAT: str = 'none'
|
|
|
|
|
CALCULATE_VERSION: Union[str, None] = None
|
|
|
|
|
SHEBANG_PATTERN: str = r"^(?P<shebang>#!\s*[\w\d\/]+\n)"
|
|
|
|
|
|
|
|
|
|
def __init__(self, processing_methods: List[Callable]):
|
|
|
|
|
self._processing_methods: List[Callable] = processing_methods
|
|
|
|
|
self._document_dictionary: OrderedDict = OrderedDict()
|
|
|
|
|
self._item_to_add: OrderedDict = OrderedDict()
|
|
|
|
|
|
|
|
|
|
self.TEMPLATES_DIRECTORY: str = 'templates'
|
|
|
|
|
|
|
|
|
|
self._fatal_error_flag: bool = False
|
|
|
|
|
self._ready_to_update: bool = False
|
|
|
|
|
self._match: bool = False
|
|
|
|
|
|
|
|
|
|
self._need_finish: bool = False
|
|
|
|
|
self._comments_processing: bool = False
|
|
|
|
|
|
|
|
|
|
self._join_before: bool = False
|
|
|
|
|
self._join_before_in_areas: bool = False
|
|
|
|
|
|
|
|
|
|
# для отладки.
|
|
|
|
|
self._line_timer: int = 0
|
|
|
|
|
|
|
|
|
|
def _lines_to_dictionary(self, document_lines: List[str]) -> None:
|
|
|
|
|
'''Основной метод для парсинга документа. Принимает список строк,
|
|
|
|
|
к каждой строке применяет парсеры, определенные для некоторого формата.
|
|
|
|
|
Первый парсер, которому удается разобрать строку используется для
|
|
|
|
|
формирования словаря.'''
|
|
|
|
|
for line in document_lines:
|
|
|
|
|
for processing_method in self._processing_methods:
|
|
|
|
|
try:
|
|
|
|
|
processing_method(line)
|
|
|
|
|
except FormatError as error:
|
|
|
|
|
self._document_dictionary = OrderedDict()
|
|
|
|
|
raise FormatError("can not parse line: {}, reason: {}".
|
|
|
|
|
format(line, str(error)))
|
|
|
|
|
|
|
|
|
|
if self._is_match():
|
|
|
|
|
if self._is_ready_to_update():
|
|
|
|
|
self._document_dictionary.update(self._item_to_add)
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
# Действия если не удалось разобрать строку.
|
|
|
|
|
self._document_dictionary = OrderedDict()
|
|
|
|
|
raise FormatError('can not parse line: {}'.
|
|
|
|
|
format(line))
|
|
|
|
|
|
|
|
|
|
if self._need_finish:
|
|
|
|
|
self._finish_method()
|
|
|
|
|
|
|
|
|
|
def _parse_xml_to_dictionary(self, xml_document_text: str) -> None:
|
|
|
|
|
'''Метод для парсинга xml файлов.
|
|
|
|
|
Файлы xml предварительно не разбиваются на строки, а разбираются с
|
|
|
|
|
помощью модуля lxml. Перевод в словарь осуществляется методами формата,
|
|
|
|
|
рекурсивно вызывающимися в зависимости от типа тега.'''
|
|
|
|
|
root = fromstring(xml_document_text)
|
|
|
|
|
self._document_dictionary = self._processing_methods[root.tag](root)
|
|
|
|
|
|
|
|
|
|
def print_dictionary(self) -> None:
|
|
|
|
|
'''Метод для отладки.'''
|
|
|
|
|
pprint(self._document_dictionary)
|
|
|
|
|
|
|
|
|
|
def join_template(self, template: "Format"):
|
|
|
|
|
'''Метод запускающий наложение шаблона.'''
|
|
|
|
|
self._join(self._document_dictionary,
|
|
|
|
|
template._document_dictionary,
|
|
|
|
|
self._join_before)
|
|
|
|
|
|
|
|
|
|
def _get_list_of_logic_lines(self, text: str) -> List[str]:
|
|
|
|
|
'''Метод разбивающий документ на список логических строк -- то есть
|
|
|
|
|
учитывающий при разбиении возможность разбиение одной строки на
|
|
|
|
|
несколько с помощью бэкслеша. В некоторых форматах переопределен.'''
|
|
|
|
|
list_of_lines = []
|
|
|
|
|
lines_to_join = []
|
|
|
|
|
for line in text.splitlines():
|
|
|
|
|
line = line.lstrip()
|
|
|
|
|
if line.rstrip() == '':
|
|
|
|
|
continue
|
|
|
|
|
if not line.endswith("\\"):
|
|
|
|
|
lines_to_join.append(line)
|
|
|
|
|
joined_line = "".join(lines_to_join)
|
|
|
|
|
list_of_lines.append(joined_line)
|
|
|
|
|
lines_to_join = []
|
|
|
|
|
else:
|
|
|
|
|
lines_to_join.append(line[:-1])
|
|
|
|
|
return list_of_lines
|
|
|
|
|
|
|
|
|
|
def _join(self, original: OrderedDict,
|
|
|
|
|
template: OrderedDict, join_before: bool):
|
|
|
|
|
'''Основной метод для наложения шаблонов путем объединения их словарей
|
|
|
|
|
выполняемого рекурсивно.'''
|
|
|
|
|
if template == OrderedDict():
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if join_before:
|
|
|
|
|
forwarded_items = OrderedDict()
|
|
|
|
|
|
|
|
|
|
for key_value in template:
|
|
|
|
|
if key_value[0] == '!':
|
|
|
|
|
# Удаление соответствующего элемента из original.
|
|
|
|
|
# Сначала получаем ключ без символа действия.
|
|
|
|
|
if isinstance(key_value, tuple):
|
|
|
|
|
item_to_delete = ('',) + key_value[1:]
|
|
|
|
|
elif isinstance(key_value, str):
|
|
|
|
|
item_to_delete = key_value[1:]
|
|
|
|
|
|
|
|
|
|
# Удаляем соответствующий элемент, если он есть в оригинале.
|
|
|
|
|
if item_to_delete in original.keys():
|
|
|
|
|
del(original[item_to_delete])
|
|
|
|
|
elif key_value[0] == '-':
|
|
|
|
|
# Замена соответствующего элемента из original.
|
|
|
|
|
# Сначала получаем ключ без символа действия.
|
|
|
|
|
if isinstance(key_value, tuple):
|
|
|
|
|
item_to_replace = ('',) + key_value[1:]
|
|
|
|
|
elif isinstance(key_value, str):
|
|
|
|
|
item_to_replace = key_value[1:]
|
|
|
|
|
else:
|
|
|
|
|
# Сюда надо вставить исключение.
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# Если соответствующего элемента нет в оригинале -- пропускаем.
|
|
|
|
|
if item_to_replace not in original.keys():
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Если секция для замены в шаблоне пустая -- удаляем
|
|
|
|
|
# соответствующую секцию.
|
|
|
|
|
if isinstance(template[key_value], dict) and\
|
|
|
|
|
template[key_value] == OrderedDict():
|
|
|
|
|
original.pop(item_to_replace)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Если символ замены стоит перед параметром, а не перед
|
|
|
|
|
# секцией -- просто заменяем значение параметра.
|
|
|
|
|
if not isinstance(template[key_value], dict):
|
|
|
|
|
original[item_to_replace] = template[key_value]
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Если обработка комментариев включена -- сохраняем
|
|
|
|
|
# комментарии к заменяемой секции.
|
|
|
|
|
if self._comments_processing:
|
|
|
|
|
if '#' in original[item_to_replace]:
|
|
|
|
|
replacement = OrderedDict({'#':
|
|
|
|
|
original[item_to_replace]
|
|
|
|
|
['#']}
|
|
|
|
|
)
|
|
|
|
|
# накладываем словарь шаблона на пустой словарь, чтобы
|
|
|
|
|
# выполнить все управляющие элементы, которые
|
|
|
|
|
# могут туда попасть.
|
|
|
|
|
self._join(replacement,
|
|
|
|
|
template[key_value],
|
|
|
|
|
self._join_before_in_areas)
|
|
|
|
|
else:
|
|
|
|
|
replacement = OrderedDict()
|
|
|
|
|
self._join(replacement,
|
|
|
|
|
template[key_value],
|
|
|
|
|
self._join_before_in_areas)
|
|
|
|
|
|
|
|
|
|
# Если после наложения шаблона словарь замены оказался
|
|
|
|
|
# пустым -- удаляем соотвествующий элемент в оригинале.
|
|
|
|
|
if (replacement == OrderedDict() or
|
|
|
|
|
replacement.keys() == {'#'}):
|
|
|
|
|
del(original[item_to_replace])
|
|
|
|
|
else:
|
|
|
|
|
original[item_to_replace] = replacement
|
|
|
|
|
else:
|
|
|
|
|
original[item_to_replace] = OrderedDict()
|
|
|
|
|
self._join(original[item_to_replace],
|
|
|
|
|
template[key_value],
|
|
|
|
|
self._join_before_in_areas)
|
|
|
|
|
|
|
|
|
|
if (original[item_to_replace] == OrderedDict() or
|
|
|
|
|
original[item_to_replace].keys() == {'#'}):
|
|
|
|
|
del(original[item_to_replace])
|
|
|
|
|
|
|
|
|
|
elif key_value not in original.keys():
|
|
|
|
|
if isinstance(template[key_value], dict):
|
|
|
|
|
dictionary_to_add = OrderedDict()
|
|
|
|
|
self._join(dictionary_to_add,
|
|
|
|
|
template[key_value],
|
|
|
|
|
self._join_before_in_areas)
|
|
|
|
|
if dictionary_to_add != OrderedDict():
|
|
|
|
|
if not join_before:
|
|
|
|
|
original[key_value] = dictionary_to_add
|
|
|
|
|
else:
|
|
|
|
|
forwarded_items[key_value] = dictionary_to_add
|
|
|
|
|
else:
|
|
|
|
|
if not join_before:
|
|
|
|
|
original[key_value] = template[key_value]
|
|
|
|
|
else:
|
|
|
|
|
forwarded_items[key_value] = template[key_value]
|
|
|
|
|
else:
|
|
|
|
|
if isinstance(original[key_value], dict) and \
|
|
|
|
|
isinstance(template[key_value], dict):
|
|
|
|
|
self._join(original[key_value],
|
|
|
|
|
template[key_value],
|
|
|
|
|
self._join_before_in_areas)
|
|
|
|
|
else:
|
|
|
|
|
if self._comments_processing:
|
|
|
|
|
# Я пока еще не понял почему, но должно быть так:
|
|
|
|
|
if not original[key_value] and not template[key_value]:
|
|
|
|
|
continue
|
|
|
|
|
original[key_value][-1] = template[key_value][-1]
|
|
|
|
|
else:
|
|
|
|
|
original[key_value] = template[key_value]
|
|
|
|
|
if join_before:
|
|
|
|
|
for key_value in reversed(forwarded_items.keys()):
|
|
|
|
|
original[key_value] = forwarded_items[key_value]
|
|
|
|
|
original.move_to_end(key_value, last=False)
|
|
|
|
|
|
|
|
|
|
def make_template(self, template: "Format") -> "Format":
|
|
|
|
|
'''Метод для запуска генерации шаблонов путем сравнения пары исходных
|
|
|
|
|
файлов.'''
|
|
|
|
|
full_diff, set_to_check = self.compare_dictionaries(
|
|
|
|
|
self._document_dictionary,
|
|
|
|
|
template._document_dictionary
|
|
|
|
|
)
|
|
|
|
|
template_object = copy(self)
|
|
|
|
|
template_object._document_dictionary = full_diff
|
|
|
|
|
return template_object
|
|
|
|
|
|
|
|
|
|
def compare_dictionaries(self, dict_1: OrderedDict,
|
|
|
|
|
dict_2: OrderedDict
|
|
|
|
|
) -> Tuple[OrderedDict, set]:
|
|
|
|
|
'''Основной метод для генерации шаблонов путем сравнения пары исходных
|
|
|
|
|
файлов. Работает рекурсивно.'''
|
|
|
|
|
to_remove_dictionary = OrderedDict()
|
|
|
|
|
to_add_dictionary = OrderedDict()
|
|
|
|
|
to_replace_dictionary = OrderedDict()
|
|
|
|
|
unchanged_set = set()
|
|
|
|
|
|
|
|
|
|
to_remove = dict_1.keys() - dict_2.keys()
|
|
|
|
|
|
|
|
|
|
if '#' in to_remove:
|
|
|
|
|
to_remove.remove('#')
|
|
|
|
|
|
|
|
|
|
for key in dict_1:
|
|
|
|
|
if key in to_remove:
|
|
|
|
|
if isinstance(key, tuple):
|
|
|
|
|
new_key = ('!', *key[1:])
|
|
|
|
|
else:
|
|
|
|
|
new_key = '!{}'.format(key)
|
|
|
|
|
if isinstance(dict_1[key], dict):
|
|
|
|
|
to_remove_dictionary.update({new_key: dict_1[key]})
|
|
|
|
|
else:
|
|
|
|
|
if self._comments_processing:
|
|
|
|
|
to_remove_dictionary.update({new_key:
|
|
|
|
|
[dict_1[key][-1]]})
|
|
|
|
|
else:
|
|
|
|
|
to_remove_dictionary.update({new_key: dict_1[key]})
|
|
|
|
|
|
|
|
|
|
to_add = dict_2.keys() - dict_1.keys()
|
|
|
|
|
|
|
|
|
|
if '#' in to_add:
|
|
|
|
|
to_add.remove('#')
|
|
|
|
|
|
|
|
|
|
for key in dict_2:
|
|
|
|
|
if key in to_add:
|
|
|
|
|
if isinstance(dict_2[key], dict):
|
|
|
|
|
section = dict_2[key].copy()
|
|
|
|
|
if '#' in section:
|
|
|
|
|
section.remove('#')
|
|
|
|
|
to_add_dictionary.update({key: section})
|
|
|
|
|
else:
|
|
|
|
|
if self._comments_processing:
|
|
|
|
|
to_add_dictionary.update({key: [dict_2[key][-1]]})
|
|
|
|
|
else:
|
|
|
|
|
to_add_dictionary.update({key: dict_2[key]})
|
|
|
|
|
|
|
|
|
|
intersect = dict_1.keys() & dict_2.keys()
|
|
|
|
|
for key in intersect:
|
|
|
|
|
if (isinstance(dict_1[key], dict) and
|
|
|
|
|
isinstance(dict_2[key], dict) and
|
|
|
|
|
dict_1[key] != dict_2[key]):
|
|
|
|
|
diff, set_to_check = self.compare_dictionaries(dict_1[key],
|
|
|
|
|
dict_2[key])
|
|
|
|
|
if set_to_check:
|
|
|
|
|
to_add_dictionary.update({key: diff})
|
|
|
|
|
else:
|
|
|
|
|
if isinstance(key, tuple):
|
|
|
|
|
new_key = ('-', *key[1:])
|
|
|
|
|
else:
|
|
|
|
|
new_key = '-{}'.format(key)
|
|
|
|
|
to_replace_dictionary.update({new_key:
|
|
|
|
|
dict_2[key]})
|
|
|
|
|
elif dict_1[key] != dict_2[key]:
|
|
|
|
|
if self._comments_processing:
|
|
|
|
|
to_add_dictionary.update({key: [dict_2[key][-1]]})
|
|
|
|
|
else:
|
|
|
|
|
to_add_dictionary.update({key: dict_2[key]})
|
|
|
|
|
else:
|
|
|
|
|
unchanged_set.add(key)
|
|
|
|
|
|
|
|
|
|
full_diff = OrderedDict()
|
|
|
|
|
full_diff.update(**to_remove_dictionary,
|
|
|
|
|
**to_replace_dictionary,
|
|
|
|
|
**to_add_dictionary)
|
|
|
|
|
return full_diff, unchanged_set
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def document_text(self) -> str:
|
|
|
|
|
'''Метод для получения текста документа. Использует jinja2 для
|
|
|
|
|
рендеринга документа.'''
|
|
|
|
|
file_loader = PackageLoader('calculate.templates.format',
|
|
|
|
|
self.TEMPLATES_DIRECTORY)
|
|
|
|
|
# formats_environment = Environment(loader=file_loader,
|
|
|
|
|
# trim_blocks=True,
|
|
|
|
|
# lstrip_blocks=True)
|
|
|
|
|
formats_environment = Environment(loader=file_loader)
|
|
|
|
|
formats_environment.globals.update(zip=zip)
|
|
|
|
|
formats_environment.add_extension('jinja2.ext.do')
|
|
|
|
|
template = formats_environment.get_template(self.FORMAT)
|
|
|
|
|
document_text = template.render(
|
|
|
|
|
document_dictionary=self._document_dictionary
|
|
|
|
|
)
|
|
|
|
|
return '{}{}'.format(self.header, document_text)
|
|
|
|
|
|
|
|
|
|
def _finish_method(self):
|
|
|
|
|
'''Метод для выполнения заключительных действий парсинга.
|
|
|
|
|
Переопределяется в форматах. Вызывается при self._need_finish = True'''
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _is_ready_to_update(self) -> bool:
|
|
|
|
|
'''Метод для проверки флага self._ready_to_update, указывающего, что
|
|
|
|
|
сформированная форматом секция документа, находящаяся в
|
|
|
|
|
self._item_to_add, может быть добавлена в словарь документа.'''
|
|
|
|
|
is_ready, self._ready_to_update = self._ready_to_update, False
|
|
|
|
|
return is_ready
|
|
|
|
|
|
|
|
|
|
def _is_match(self) -> bool:
|
|
|
|
|
'''Метод для проверки флага self._is_match, указывающего что текущий
|
|
|
|
|
парсер, использованный форматом, смог распарсить строку и использовать
|
|
|
|
|
другие парсеры не нужно.'''
|
|
|
|
|
is_match, self._match = self._match, False
|
|
|
|
|
return is_match
|
|
|
|
|
|
|
|
|
|
def _get_header_and_document_text(self, input_text: str,
|
|
|
|
|
template_path: str,
|
|
|
|
|
already_changed: bool = False,
|
|
|
|
|
check_shebang: bool = False
|
|
|
|
|
) -> Tuple[str, str]:
|
|
|
|
|
'''Метод для создания заголовка измененного файла и удаления его из
|
|
|
|
|
текста исходного файла.'''
|
|
|
|
|
template_paths = []
|
|
|
|
|
|
|
|
|
|
if check_shebang:
|
|
|
|
|
# Удаление #!
|
|
|
|
|
shebang_regex = re.compile(self.SHEBANG_PATTERN)
|
|
|
|
|
shebang_result = shebang_regex.search(input_text)
|
|
|
|
|
if shebang_result is not None:
|
|
|
|
|
shebang = shebang_result.groupdict()['shebang']
|
|
|
|
|
input_text = shebang_regex.sub("", input_text)
|
|
|
|
|
else:
|
|
|
|
|
shebang = ""
|
|
|
|
|
|
|
|
|
|
header_pattern = self._get_header_pattern()
|
|
|
|
|
header_regex = re.compile(header_pattern)
|
|
|
|
|
parsing_result = header_regex.search(input_text)
|
|
|
|
|
|
|
|
|
|
if already_changed and self.comment_symbol and parsing_result:
|
|
|
|
|
for template in parsing_result.\
|
|
|
|
|
groupdict()['template_paths'].strip().split('\n'):
|
|
|
|
|
if template.startswith(self.comment_symbol):
|
|
|
|
|
template = template[len(self.comment_symbol):]
|
|
|
|
|
template_paths.append(template.strip())
|
|
|
|
|
|
|
|
|
|
template_paths.append(template_path)
|
|
|
|
|
header = self._make_header(template_paths)
|
|
|
|
|
document_text = re.sub(header_pattern, '', input_text)
|
|
|
|
|
|
|
|
|
|
if check_shebang:
|
|
|
|
|
return header, document_text, shebang
|
|
|
|
|
else:
|
|
|
|
|
return header, document_text
|
|
|
|
|
|
|
|
|
|
def _make_header(self, template_paths: list) -> str:
|
|
|
|
|
if not self.comment_symbol:
|
|
|
|
|
return ""
|
|
|
|
|
elif self.comment_symbol in ("xml", "XML"):
|
|
|
|
|
return ("<?xml version='1.0' encoding='UTF-8'?>\n" +
|
|
|
|
|
'<!--\n' +
|
|
|
|
|
'Modified by Calculate Utilities {}\n' +
|
|
|
|
|
'Processed template files:\n' +
|
|
|
|
|
'\n'.join(template_paths) + '\n' +
|
|
|
|
|
'-->\n').format(self.CALCULATE_VERSION)
|
|
|
|
|
else:
|
|
|
|
|
return ('{0}' + '-' * 79 + '\n' +
|
|
|
|
|
'{0} Modified by Calculate Utilities {1}\n' +
|
|
|
|
|
'{0} Processed template files:\n' +
|
|
|
|
|
'{0} ' + '\n{0} '.join(template_paths) + '\n' +
|
|
|
|
|
'{0}' + '-' * 79 + '\n').format(self.comment_symbol,
|
|
|
|
|
self.CALCULATE_VERSION)
|
|
|
|
|
|
|
|
|
|
def _get_header_pattern(self) -> str:
|
|
|
|
|
if self.comment_symbol in {"xml", "XML"}:
|
|
|
|
|
return (r'<!--\n' +
|
|
|
|
|
r'\s*Modified by Calculate Utilities [\d\w\.]*\n' +
|
|
|
|
|
r'\s*Processed template files:\n' +
|
|
|
|
|
r'\s*(?P<template_paths>(\s*[/\w\d\-_\.]*\n)+)' +
|
|
|
|
|
r'-->\n?')
|
|
|
|
|
else:
|
|
|
|
|
return (r'^{0}' + r'-' * 79 + r'\n' +
|
|
|
|
|
r'{0} Modified by Calculate Utilities [\d\w\.]*\n' +
|
|
|
|
|
r'{0} Processed template files:\n' +
|
|
|
|
|
r'(?P<template_paths>({0}\s*[/\w\d\-_\.]*\n)+)' +
|
|
|
|
|
r'{0}' + r'-' * 79 + r'\n?').format(self.comment_symbol)
|
|
|
|
|
|
|
|
|
|
def __bool__(self) -> bool:
|
|
|
|
|
return bool(self._document_dictionary)
|