You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

523 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
from .base_format import BaseFormat
from collections import OrderedDict
from pyparsing import originalTextFor, Literal, ZeroOrMore, Word, printables,\
OneOrMore, alphanums, ParseException, restOfLine,\
pyparsing_unicode, nums, delimitedList, Optional,\
Keyword, SkipTo, Group
class LDAPFormat(BaseFormat):
def __init__(self, document_text: str, ignore_comments=False):
processing_methods = [self._parse_comment_line,
self._parse_type_line,
self._parse_access_line,
self._parse_access_line_to_delete,
self._parse_syncrepl_line,
self._parse_syncrepl_line_to_delete,
self._parse_notunique_line,
self._parse_index_line,
self._parse_index_line_to_delete,
self._parse_plain_directive_line,
self._parse_plain_directive_line_to_delete]
super().__init__(processing_methods)
self._ignore_comments = ignore_comments
self._comments_processing = True
self._need_finish = True
self._format = 'ldap'
if self._ignore_comments:
self._current_type_section = OrderedDict()
else:
self._current_type_section = OrderedDict({'#': []})
self._current_type = ('', 'global')
self._last_comments_list = []
self._initialize_parser()
if document_text == '':
self._document_dictionary = OrderedDict()
else:
document_lines = self._get_list_of_logic_lines(document_text)
self._lines_to_dictionary(document_lines)
def _initialize_parser(self):
self._comment_line = originalTextFor(
Literal('#')
+ ZeroOrMore(Word(printables
+ pyparsing_unicode.alphanums))
)('comment')
action_symbols = (Literal('!') | Literal('-'))
assignment = Literal('=')
# Для парсинга строк c директивами неуникальными для секции.
not_unique_directives = originalTextFor(
Keyword('include') |
Keyword('moduleload')
)
not_unique_value = originalTextFor(
OneOrMore(Word(printables))
)('value')
self._not_unique_parser = (Optional(action_symbols,
default='')('action')
+ not_unique_directives
+ not_unique_value + restOfLine.suppress())
# Для выделения областей global, backend и database.
type_sections_keywords = originalTextFor(
Keyword('backend') |
Keyword('database')
)
type_value = originalTextFor(Word(alphanums))
self._type_line = (Optional(action_symbols, default='')('action')
+ type_sections_keywords
+ type_value
+ restOfLine.suppress())
# Для парсинга конструкции syncrepl rid=<replica ID> <parameters>
content_without_spaces = Word(printables, excludeChars='"')
content_with_spaces = (Literal('"')
+ OneOrMore(Word(printables,
excludeChars='"'))
+ Literal('"'))
parameter_without_spaces = (Word(printables, excludeChars='"=')
+ assignment.suppress()
+ content_without_spaces)
parameter_with_spaces = (Word(printables, excludeChars='"=')
+ assignment.suppress()
+ content_with_spaces)
values = OneOrMore(originalTextFor(parameter_with_spaces |
parameter_without_spaces))('Values')
syncrepl_replica_id = originalTextFor(Literal('rid')
+ assignment.suppress()
+ Word(nums))('replicaID')
self._syncrepl_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ Keyword('syncrepl')
+ syncrepl_replica_id)('name')
+ values('Values')
+ restOfLine.suppress())
self._syncrepl_value_parser = (Group(Optional(action_symbols,
default='')('action')
+ originalTextFor(
Word(
printables,
excludeChars='"='
)
))('name')
+ assignment.suppress()
+ originalTextFor(
OneOrMore(
Word(printables)
)
)('value'))
self._syncrepl_line_to_delete_parser = (Group(Optional(
action_symbols,
default=''
)('action')
+ Keyword('syncrepl')
+ syncrepl_replica_id)('name')
+ restOfLine.suppress())
# Для парсинга конструкции
# access to <what> by <who>|<access level>|<control>
access_keyword = originalTextFor(Literal('access to'))('keyword')
value = originalTextFor(parameter_with_spaces |
parameter_without_spaces |
content_without_spaces |
content_with_spaces)
self._access_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ access_keyword
+ value)('name')
+ Keyword('by').suppress()
+ delimitedList(
originalTextFor(value +
SkipTo(
Keyword('by'),
include=False) |
restOfLine
),
delim='by'
)('Values'))
self._access_value_parser = (Group(Optional(action_symbols,
default='')('action')
+ originalTextFor(value))('name')
+ originalTextFor(
Optional(Word(alphanums))
)('value'))
self._access_line_to_delete_parser = (Group(action_symbols('action')
+ access_keyword
+ value
+ restOfLine.suppress())('name'))
# Для парсинга строк с директивами index.
self._index_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ Keyword('index')
+ originalTextFor(Word(printables))
)('name')
+ originalTextFor(
OneOrMore(Word(printables))
)('value'))
self._index_line_to_delete_parser = (Group(action_symbols('action')
+ Keyword('index')
+ originalTextFor(
Word(printables)
))('name'))
# Для парсинга остальных директив.
self._directive_line_parser = (Group(Optional(action_symbols,
default='')('action')
+ originalTextFor(
Word(printables)
))('name')
+ originalTextFor(
OneOrMore(Word(
printables
)
))('value'))
self._directive_line_to_delete_parser = (action_symbols('action')
+ originalTextFor(
Word(printables)
))('name')
def _get_list_of_logic_lines(self, text):
list_of_lines = []
lines_to_join = []
for line in text.splitlines():
if line.strip() == '':
continue
if not line.startswith(' ') and not line.startswith('\t'):
joined_line = "".join(lines_to_join)
if joined_line != '':
list_of_lines.append(joined_line)
lines_to_join = []
line.strip()
else:
line = ' ' + line.strip()
lines_to_join.append(line)
joined_line = "".join(lines_to_join)
list_of_lines.append(joined_line)
return list_of_lines
def _parse_type_line(self, line):
try:
self._item_to_add = OrderedDict()
parsing_result = self._type_line.parseString(line)
self._match = True
type_name = tuple(parsing_result.asList())
if self._current_type in self._document_dictionary.keys():
self._document_dictionary[self._current_type].update(
self._current_type_section
)
else:
self._item_to_add[self._current_type] =\
self._current_type_section
self._ready_to_update = True
# Если глобальная область пуста -- передаем ее комментарии
# следующей за ней области.
if self._current_type == ('', 'global') and\
list(self._current_type_section.keys()) == ['#']:
self._last_comments_list = self._current_type_section['#']
self._item_to_add[self._current_type] = OrderedDict()
self._current_type_section = OrderedDict()
self._current_type = type_name
if self._last_comments_list != []:
self._current_type_section['#'] = self._last_comments_list
self._last_comments_list = []
except ParseException:
return
def _parse_notunique_line(self, line):
'''Метод для парсинга строк c директивами неуникальными для секции.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
self._item_to_add = OrderedDict()
parsing_result = self._not_unique_parser.parseString(line)
self._match = True
parsing_result.value = parsing_result.value.strip()
not_unique_name = tuple(parsing_result.asList())
parameter_value = ['']
parameter_value = self._last_comments_list + parameter_value
self._last_comments_list = []
self._current_type_section[not_unique_name] = parameter_value
except ParseException:
return
def _parse_access_line(self, line):
'''Метод для парсинга строк содержащих конструкцию
access to <what> by <who>|<access level>|<control>.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._access_line_parser.parseString(line)
self._match = True
values = [value.strip() for value in
parsing_result.Values.asList()]
values.reverse()
parameter_name = tuple(parsing_result.name)
value_dictionary = OrderedDict()
if self._last_comments_list != []:
value_dictionary['#'] = self._last_comments_list
self._last_comments_list = []
for value in values:
try:
value_parsing = self._access_value_parser.\
parseString(value)
param_name = tuple(value_parsing.name)
param_value = value_parsing.value
value_dictionary[param_name] = [param_value]
except ParseException:
continue
parameter_value = value_dictionary
if parameter_name in self._current_type_section.keys():
self._current_type_section[parameter_name].update(
value_dictionary
)
else:
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_access_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление конструкций
access to, если указано только ее название и значение What.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._access_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = OrderedDict({'#': self._last_comments_list})
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_syncrepl_line(self, line):
'''Метод для парсинга строк содержащих конструкцию syncrepl
rep=<ReplicaID>.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._syncrepl_line_parser.parseString(line)
self._match = True
values = [value.strip() for value in parsing_result.Values.asList()]
parameter_name = tuple(parsing_result.name.asList())
value_dictionary = OrderedDict()
if self._last_comments_list != []:
value_dictionary['#'] = self._last_comments_list
self._last_comments_list = []
for value in values:
try:
value_parsing = self._syncrepl_value_parser.parseString(
value
)
param_name = tuple(value_parsing.name.asList())
param_value = value_parsing.value
value_dictionary[param_name] = [param_value]
except ParseException:
continue
parameter_value = value_dictionary
if parameter_name in self._current_type_section.keys():
self._current_type_section[parameter_name].update(
value_dictionary
)
else:
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_syncrepl_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление конструкций
syncrepl rid=<ReplicaID>, если указано только ее название и значение
ReplicaID.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._syncrepl_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = OrderedDict({'#': self._last_comments_list})
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_index_line(self, line):
'''Метод для парсинга строк с директивами index.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._index_line_parser.parseString(line)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
parameter_value = parsing_result.value
parameter_value = self._last_comments_list + [parameter_value]
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_index_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление директив index,
если указано только из имя, но отсутвует значение.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._index_line_to_delete_parser.parseString(line)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.name.action == '-':
return
parameter_value = self._last_comments_list
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_plain_directive_line(self, line):
'''Метод для парсинга строк с простыми уникальными для секции
директивами.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._directive_line_parser.parseString(line)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
parameter_value = parsing_result.value
parameter_value = self._last_comments_list + [parameter_value]
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_plain_directive_line_to_delete(self, line):
'''Метод для парсинга строк, предписывающих удаление простых уникальных
директив, если указано только их имя, но отсутствует значение.
Аргументы: line -- строка, которую нужно распарсить.
'''
try:
parsing_result = self._directive_line_to_delete_parser.parseString(
line
)
self._match = True
parameter_name = tuple(parsing_result.name.asList())
if parsing_result.action == '-':
return
parameter_value = self._last_comments_list
self._last_comments_list = []
self._current_type_section[parameter_name] = parameter_value
except ParseException:
return
def _parse_comment_line(self, line):
'''Метод для парсинга строк с комментариями и добавления их в список
комментариев _last_comments_list, предназначенный для сбора
комментариев и последующего их присваивания параметрам и секциям.
Аргументы: line -- строка, которую нужно распарсить.'''
try:
parsing_result = self._comment_line.parseString(line)
self._match = True
if not self._ignore_comments:
# До того, как первый элемент встречен -- все комментарии
# должны быть присвоены глобальной области.
if self._current_type == ('', 'global') and\
list(self._current_type_section.keys()) == ['#']:
self._current_type_section['#'].append(
parsing_result.comment
)
else:
self._last_comments_list.append(parsing_result.comment)
except ParseException:
return
def _finish_method(self):
self._item_to_add = OrderedDict()
if self._current_type in self._document_dictionary.keys():
self._document_dictionary[self._current_type].update(
self._current_type_section
)
else:
self._item_to_add[self._current_type] = self._current_type_section
self._document_dictionary.update(self._item_to_add)
self._item_to_add = OrderedDict()
self._current_type_section = OrderedDict()