You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

541 lines
25 KiB

4 years ago
# vim: fileencoding=utf-8
#
from .base_format import Format
4 years ago
from collections import OrderedDict
from pyparsing import originalTextFor, Literal, Word, printables, OneOrMore,\
alphanums, ParseException, restOfLine, nums,\
delimitedList, Optional, Keyword, SkipTo, Group, Regex
4 years ago
class LDAPFormat(Format):
FORMAT = 'ldap'
EXECUTABLE = False
_initialized = False
comment_symbol = '#'
def __new__(cls, *args, **kwargs):
if not cls._initialized:
cls._initialize_parser()
return super().__new__(cls)
def __init__(self, document_text: str,
template_path,
ignore_comments=False,
join_before=False,
add_header=False,
already_changed=False,
**kwargs):
4 years ago
processing_methods = [self._parse_comment_line,
self._parse_type_line,
self._parse_access_line,
self._parse_access_line_to_delete,
self._parse_syncrepl_line,
self._parse_syncrepl_line_to_delete,
self._parse_notunique_line,
self._parse_index_line,
self._parse_index_line_to_delete,
self._parse_plain_directive_line,
self._parse_plain_directive_line_to_delete]
super().__init__(processing_methods)
self._ignore_comments = ignore_comments
self._comments_processing = True
self._join_before = join_before
4 years ago
self._need_finish = True
if self._ignore_comments:
self._current_type_section = OrderedDict()
else:
self._current_type_section = OrderedDict({'#': []})
self._current_type = ('', 'global')
self._last_comments_list = []
if add_header and not ignore_comments:
self.header, document_text = self._get_header_and_document_text(
document_text,
template_path,
already_changed=already_changed)
else:
self.header = ''
document_text = document_text.strip()
4 years ago
if document_text == '':
self._document_dictionary = OrderedDict()
else:
document_lines = self._get_list_of_logic_lines(document_text)
self._lines_to_dictionary(document_lines)
@classmethod
def _initialize_parser(cls):
'''Метод для инициализации парсеров.'''
cls._comment_line = originalTextFor(
Literal(cls.comment_symbol)
+ Regex(r'.*'))('comment')
4 years ago
action_symbols = (Literal('!') | Literal('-'))
assignment = Literal('=')
# Для парсинга строк c директивами неуникальными для секции.
not_unique_directives = originalTextFor(
Keyword('include') |
Keyword('moduleload')
)
not_unique_value = originalTextFor(
OneOrMore(Word(printables))
)('value')
cls._not_unique_parser = (Optional(action_symbols,
default='')('action')
+ not_unique_directives
+ not_unique_value + restOfLine.suppress())
4 years ago
# Для выделения областей global, backend и database.
type_sections_keywords = originalTextFor(
Keyword('backend') |
Keyword('database')
)
type_value = originalTextFor(Word(alphanums))
cls._type_line = (Optional(action_symbols, default='')('action')
+ type_sections_keywords
+ type_value
+ restOfLine.suppress())
4 years ago
# Для парсинга конструкции syncrepl rid=<replica ID> <parameters>
content_without_spaces = Word(printables, excludeChars='"')
content_with_spaces = (Literal('"')
+ OneOrMore(Word(printables,
excludeChars='"'))
+ Literal('"'))
parameter_without_spaces = (Word(printables, excludeChars='"=')
+ assignment.suppress()
+ content_without_spaces)
parameter_with_spaces = (Word(printables, excludeChars='"=')
+ assignment.suppress()
+ content_with_spaces)
values = OneOrMore(originalTextFor(parameter_with_spaces |
parameter_without_spaces))('Values')
syncrepl_replica_id = originalTextFor(Literal('rid')
+ assignment.suppress()
+ Word(nums))('replicaID')
cls._syncrepl_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ Keyword('syncrepl')
+ syncrepl_replica_id)('name')
+ values('Values')
+ restOfLine.suppress())
cls._syncrepl_value_parser = (Group(Optional(action_symbols,
4 years ago
default='')('action')
+ originalTextFor(
4 years ago
Word(
printables,
excludeChars='"='
)
))('name')
+ assignment.suppress()
+ originalTextFor(
OneOrMore(
Word(printables)
)
)('value'))
cls._syncrepl_line_to_delete_parser = (Group(Optional(
4 years ago
action_symbols,
default=''
)('action')
+ Keyword('syncrepl')
+ syncrepl_replica_id)('name')
+ restOfLine.suppress())
4 years ago
# Для парсинга конструкции
# access to <what> by <who>|<access level>|<control>
access_keyword = originalTextFor(Literal('access to'))('keyword')
value = originalTextFor(parameter_with_spaces |
parameter_without_spaces |
content_without_spaces |
content_with_spaces)
cls._access_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ access_keyword
+ value)('name')
+ Keyword('by').suppress()
+ delimitedList(
4 years ago
originalTextFor(value +
SkipTo(
Keyword('by'),
include=False) |
restOfLine
),
delim='by'
)('Values'))
4 years ago
cls._access_value_parser = (Group(Optional(action_symbols,
default='')('action')
+ originalTextFor(value))('name')
+ originalTextFor(
4 years ago
Optional(Word(alphanums))
)('value'))
4 years ago
cls._access_line_to_delete_parser = (Group(action_symbols('action')
+ access_keyword
+ value
+ restOfLine.suppress())('name'))
4 years ago
# Для парсинга строк с директивами index.
cls._index_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ Keyword('index')
+ originalTextFor(Word(printables))
)('name')
+ originalTextFor(
4 years ago
OneOrMore(Word(printables))
)('value'))
cls._index_line_to_delete_parser = (Group(action_symbols('action')
+ Keyword('index')
+ originalTextFor(
4 years ago
Word(printables)
))('name'))
# Для парсинга остальных директив.
cls._directive_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ originalTextFor(
4 years ago
Word(printables)
))('name')
+ originalTextFor(
4 years ago
OneOrMore(Word(
printables
)
))('value'))
cls._directive_line_to_delete_parser = (action_symbols('action')
+ originalTextFor(
4 years ago
Word(printables)
))('name')
cls._initialized = True
4 years ago
def _get_list_of_logic_lines(self, text):
'''Метод для разбиения исходного документа на список логических строк,
то есть с учетом того, что строка ldap файла начинающаяся с отступа
является продолжением предыдущей.'''
4 years ago
list_of_lines = []
lines_to_join = []
for line in text.splitlines():
if line.strip() == '':
continue
if not line.startswith(' ') and not line.startswith('\t'):
joined_line = "".join(lines_to_join)
if joined_line != '':
list_of_lines.append(joined_line)
lines_to_join = []
line.strip()
else:
line = ' ' + line.strip()
lines_to_join.append(line)
joined_line = "".join(lines_to_join)
list_of_lines.append(joined_line)
return list_of_lines
def _parse_type_line(self, line):
'''Метод для парсинга строк с объявлением областей backend или database
'''
4 years ago
try:
self._item_to_add = OrderedDict()
parsing_result = self._type_line.parseString(line)
self._match = True
type_name = tuple(parsing_result.asList())
if self._current_type in self._document_dictionary.keys():
self._document_dictionary[self._current_type].update(
self._current_type_section
)
else:
self._item_to_add[self._current_type] =\
self._current_type_section
self._ready_to_update = True
# Если глобальная область пуста -- передаем ее комментарии
# следующей за ней области.
if self._current_type == ('', 'global') and\
list(self._current_type_section.keys()) == ['#']:
self._last_comments_list = self._current_type_section['#']
self._item_to_add[self._current_type] = OrderedDict()
self._current_type_section = OrderedDict()
self._current_type = type_name
if self._last_comments_list != []:
self._current_type_section['#'] = self._last_comments_list
self._last_comments_list = []
except ParseException:
return
def _parse_notunique_line(self, line):
'''Метод для парсинга строк c директивами неуникальными для секции.
Их приходится парсить полностью как ключ словаря.'''
4 years ago
try:
self._item_to_add = OrderedDict()
parsing_result = self._not_unique_parser.parseString(line)
self._match = True
parsing_result["value"] = parsing_result.value.strip()
4 years ago
not_unique_name = tuple(parsing_result.asList())
parameter_value = ['']
parameter_value = self._last_comments_list + parameter_value
self._last_comments_list = []
self._current_type_section[not_unique_name] = parameter_value
except ParseException:
return
def _parse_access_line(self, line):
'''Метод для парсинга строк содержащих конструкцию
access to <what> by <who>|<access level>|<control>.'''
4 years ago
try:
parsing_result = self._access_line_parser.parseString(line)
self._match = True
values = [value.strip() for value in
parsing_result.Values.asList()]
values.reverse()
parameter_name = tuple(parsing_result.name)
value_dictionary = OrderedDict()
if self._last_comments_list != []:
value_dictionary['#'] = self._last_comments_list
self._last_comments_list = []
for value in values:
try:
value_parsing = self._access_value_parser.\
parseString(value)
param_name = tuple(value_parsing.name)
param_value = value_parsing.value
value_dictionary[param_name] = [param_value]
except ParseException:
continue
parameter_value = value_dictionary
if parameter_name in self._current_type_section.keys():
self._current_type_section[parameter_name].update(
value_dictionary
)
else:
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_access_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление конструкций
access to, если указано только ее название и значение What.'''
4 years ago
try:
parsing_result = self._access_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = OrderedDict({'#': self._last_comments_list})
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_syncrepl_line(self, line):
'''Метод для парсинга строк содержащих конструкцию syncrepl
rep=<ReplicaID>.'''
4 years ago
try:
parsing_result = self._syncrepl_line_parser.parseString(line)
self._match = True
values = [value.strip() for value in
parsing_result.Values.asList()]
4 years ago
parameter_name = tuple(parsing_result.name.asList())
value_dictionary = OrderedDict()
if self._last_comments_list != []:
value_dictionary['#'] = self._last_comments_list
self._last_comments_list = []
for value in values:
try:
value_parsing = self._syncrepl_value_parser.parseString(
value
)
param_name = tuple(value_parsing.name.asList())
param_value = value_parsing.value
value_dictionary[param_name] = [param_value]
except ParseException:
continue
parameter_value = value_dictionary
if parameter_name in self._current_type_section.keys():
self._current_type_section[parameter_name].update(
value_dictionary
)
else:
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_syncrepl_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление конструкций
syncrepl rid=<ReplicaID>, если указано только ее название и значение
ReplicaID.'''
4 years ago
try:
parsing_result = self._syncrepl_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = OrderedDict({'#': self._last_comments_list})
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_index_line(self, line):
'''Метод для парсинга строк с директивами index.'''
4 years ago
try:
parsing_result = self._index_line_parser.parseString(line)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
parameter_value = parsing_result.value
parameter_value = self._last_comments_list + [parameter_value]
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_index_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление директив index,
если указано только из имя, но отсутвует значение.'''
4 years ago
try:
parsing_result = self._index_line_to_delete_parser.parseString(
line
)
4 years ago
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = self._last_comments_list
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_plain_directive_line(self, line):
'''Метод для парсинга строк с простыми уникальными для секции
директивами.'''
4 years ago
try:
parsing_result = self._directive_line_parser.parseString(line)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
parameter_value = parsing_result.value
parameter_value = self._last_comments_list + [parameter_value]
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_plain_directive_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление простых уникальных
директив, если указано только их имя, но отсутствует значение.'''
4 years ago
try:
parsing_result = self._directive_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.action == '-':
return
parameter_value = self._last_comments_list
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_comment_line(self, line):
'''Метод для парсинга строк с комментариями и добавления их в список
комментариев _last_comments_list, предназначенный для сбора
комментариев и последующего их присваивания параметрам и секциям.'''
4 years ago
try:
parsing_result = self._comment_line.parseString(line)
self._match = True
if not self._ignore_comments:
# До того, как первый элемент встречен -- все комментарии
# должны быть присвоены глобальной области.
if self._current_type == ('', 'global') and\
list(self._current_type_section.keys()) == ['#']:
self._current_type_section['#'].append(
parsing_result.comment
)
else:
self._last_comments_list.append(parsing_result.comment)
except ParseException:
return
def _finish_method(self):
'''Метод для завершения парсинга. В данном случае добавляет в итоговый
словарь последнюю разобранную область.'''
4 years ago
self._item_to_add = OrderedDict()
if self._current_type in self._document_dictionary.keys():
self._document_dictionary[self._current_type].update(
self._current_type_section
)
else:
self._item_to_add[self._current_type] = self._current_type_section
self._document_dictionary.update(self._item_to_add)
self._item_to_add = OrderedDict()
self._current_type_section = OrderedDict()