# vim: fileencoding=utf-8 # from .base_format import Format from collections import OrderedDict from pyparsing import originalTextFor, Literal, Word, printables, OneOrMore,\ alphanums, ParseException, restOfLine, nums,\ delimitedList, Optional, Keyword, SkipTo, Group, Regex class LDAPFormat(Format): FORMAT = 'ldap' EXECUTABLE = False _initialized = False comment_symbol = '#' def __new__(cls, *args, **kwargs): if not cls._initialized: cls._initialize_parser() return super().__new__(cls) def __init__(self, document_text: str, template_path, ignore_comments=False, join_before=False, add_header=False, already_changed=False, **kwargs): processing_methods = [self._parse_comment_line, self._parse_type_line, self._parse_access_line, self._parse_access_line_to_delete, self._parse_syncrepl_line, self._parse_syncrepl_line_to_delete, self._parse_notunique_line, self._parse_index_line, self._parse_index_line_to_delete, self._parse_plain_directive_line, self._parse_plain_directive_line_to_delete] super().__init__(processing_methods) self._ignore_comments = ignore_comments self._comments_processing = True self._join_before = join_before self._need_finish = True if self._ignore_comments: self._current_type_section = OrderedDict() else: self._current_type_section = OrderedDict({'#': []}) self._current_type = ('', 'global') self._last_comments_list = [] if add_header and not ignore_comments: self.header, document_text = self._get_header_and_document_text( document_text, template_path, already_changed=already_changed) else: self.header = '' document_text = document_text.strip() if document_text == '': self._document_dictionary = OrderedDict() else: document_lines = self._get_list_of_logic_lines(document_text) self._lines_to_dictionary(document_lines) @classmethod def _initialize_parser(cls): '''Метод для инициализации парсеров.''' cls._comment_line = originalTextFor( Literal(cls.comment_symbol) + Regex(r'.*'))('comment') action_symbols = (Literal('!') | Literal('-')) assignment = Literal('=') # Для парсинга строк c директивами неуникальными для секции. not_unique_directives = originalTextFor( Keyword('include') | Keyword('moduleload') ) not_unique_value = originalTextFor( OneOrMore(Word(printables)) )('value') cls._not_unique_parser = (Optional(action_symbols, default='')('action') + not_unique_directives + not_unique_value + restOfLine.suppress()) # Для выделения областей global, backend и database. type_sections_keywords = originalTextFor( Keyword('backend') | Keyword('database') ) type_value = originalTextFor(Word(alphanums)) cls._type_line = (Optional(action_symbols, default='')('action') + type_sections_keywords + type_value + restOfLine.suppress()) # Для парсинга конструкции syncrepl rid= content_without_spaces = Word(printables, excludeChars='"') content_with_spaces = (Literal('"') + OneOrMore(Word(printables, excludeChars='"')) + Literal('"')) parameter_without_spaces = (Word(printables, excludeChars='"=') + assignment.suppress() + content_without_spaces) parameter_with_spaces = (Word(printables, excludeChars='"=') + assignment.suppress() + content_with_spaces) values = OneOrMore(originalTextFor(parameter_with_spaces | parameter_without_spaces))('Values') syncrepl_replica_id = originalTextFor(Literal('rid') + assignment.suppress() + Word(nums))('replicaID') cls._syncrepl_line_parser = (Group(Optional(action_symbols, default='')('action') + Keyword('syncrepl') + syncrepl_replica_id)('name') + values('Values') + restOfLine.suppress()) cls._syncrepl_value_parser = (Group(Optional(action_symbols, default='')('action') + originalTextFor( Word( printables, excludeChars='"=' ) ))('name') + assignment.suppress() + originalTextFor( OneOrMore( Word(printables) ) )('value')) cls._syncrepl_line_to_delete_parser = (Group(Optional( action_symbols, default='' )('action') + Keyword('syncrepl') + syncrepl_replica_id)('name') + restOfLine.suppress()) # Для парсинга конструкции # access to by || access_keyword = originalTextFor(Literal('access to'))('keyword') value = originalTextFor(parameter_with_spaces | parameter_without_spaces | content_without_spaces | content_with_spaces) cls._access_line_parser = (Group(Optional(action_symbols, default='')('action') + access_keyword + value)('name') + Keyword('by').suppress() + delimitedList( originalTextFor(value + SkipTo( Keyword('by'), include=False) | restOfLine ), delim='by' )('Values')) cls._access_value_parser = (Group(Optional(action_symbols, default='')('action') + originalTextFor(value))('name') + originalTextFor( Optional(Word(alphanums)) )('value')) cls._access_line_to_delete_parser = (Group(action_symbols('action') + access_keyword + value + restOfLine.suppress())('name')) # Для парсинга строк с директивами index. cls._index_line_parser = (Group(Optional(action_symbols, default='')('action') + Keyword('index') + originalTextFor(Word(printables)) )('name') + originalTextFor( OneOrMore(Word(printables)) )('value')) cls._index_line_to_delete_parser = (Group(action_symbols('action') + Keyword('index') + originalTextFor( Word(printables) ))('name')) # Для парсинга остальных директив. cls._directive_line_parser = (Group(Optional(action_symbols, default='')('action') + originalTextFor( Word(printables) ))('name') + originalTextFor( OneOrMore(Word( printables ) ))('value')) cls._directive_line_to_delete_parser = (action_symbols('action') + originalTextFor( Word(printables) ))('name') cls._initialized = True def _get_list_of_logic_lines(self, text): '''Метод для разбиения исходного документа на список логических строк, то есть с учетом того, что строка ldap файла начинающаяся с отступа является продолжением предыдущей.''' list_of_lines = [] lines_to_join = [] for line in text.splitlines(): if line.strip() == '': continue if not line.startswith(' ') and not line.startswith('\t'): joined_line = "".join(lines_to_join) if joined_line != '': list_of_lines.append(joined_line) lines_to_join = [] line.strip() else: line = ' ' + line.strip() lines_to_join.append(line) joined_line = "".join(lines_to_join) list_of_lines.append(joined_line) return list_of_lines def _parse_type_line(self, line): '''Метод для парсинга строк с объявлением областей backend или database ''' try: self._item_to_add = OrderedDict() parsing_result = self._type_line.parseString(line) self._match = True type_name = tuple(parsing_result.asList()) if self._current_type in self._document_dictionary.keys(): self._document_dictionary[self._current_type].update( self._current_type_section ) else: self._item_to_add[self._current_type] =\ self._current_type_section self._ready_to_update = True # Если глобальная область пуста -- передаем ее комментарии # следующей за ней области. if self._current_type == ('', 'global') and\ list(self._current_type_section.keys()) == ['#']: self._last_comments_list = self._current_type_section['#'] self._item_to_add[self._current_type] = OrderedDict() self._current_type_section = OrderedDict() self._current_type = type_name if self._last_comments_list != []: self._current_type_section['#'] = self._last_comments_list self._last_comments_list = [] except ParseException: return def _parse_notunique_line(self, line): '''Метод для парсинга строк c директивами неуникальными для секции. Их приходится парсить полностью как ключ словаря.''' try: self._item_to_add = OrderedDict() parsing_result = self._not_unique_parser.parseString(line) self._match = True parsing_result["value"] = parsing_result.value.strip() not_unique_name = tuple(parsing_result.asList()) parameter_value = [''] parameter_value = self._last_comments_list + parameter_value self._last_comments_list = [] self._current_type_section[not_unique_name] = parameter_value except ParseException: return def _parse_access_line(self, line): '''Метод для парсинга строк содержащих конструкцию access to by ||.''' try: parsing_result = self._access_line_parser.parseString(line) self._match = True values = [value.strip() for value in parsing_result.Values.asList()] values.reverse() parameter_name = tuple(parsing_result.name) value_dictionary = OrderedDict() if self._last_comments_list != []: value_dictionary['#'] = self._last_comments_list self._last_comments_list = [] for value in values: try: value_parsing = self._access_value_parser.\ parseString(value) param_name = tuple(value_parsing.name) param_value = value_parsing.value value_dictionary[param_name] = [param_value] except ParseException: continue parameter_value = value_dictionary if parameter_name in self._current_type_section.keys(): self._current_type_section[parameter_name].update( value_dictionary ) else: self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_access_line_to_delete(self, line): '''Метод для парсинга строк, предписывающих удаление конструкций access to, если указано только ее название и значение What.''' try: parsing_result = self._access_line_to_delete_parser.parseString( line ) self._match = True parameter_name = tuple(parsing_result.name.asList()) if parsing_result.name.action == '-': return parameter_value = OrderedDict({'#': self._last_comments_list}) self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_syncrepl_line(self, line): '''Метод для парсинга строк содержащих конструкцию syncrepl rep=.''' try: parsing_result = self._syncrepl_line_parser.parseString(line) self._match = True values = [value.strip() for value in parsing_result.Values.asList()] parameter_name = tuple(parsing_result.name.asList()) value_dictionary = OrderedDict() if self._last_comments_list != []: value_dictionary['#'] = self._last_comments_list self._last_comments_list = [] for value in values: try: value_parsing = self._syncrepl_value_parser.parseString( value ) param_name = tuple(value_parsing.name.asList()) param_value = value_parsing.value value_dictionary[param_name] = [param_value] except ParseException: continue parameter_value = value_dictionary if parameter_name in self._current_type_section.keys(): self._current_type_section[parameter_name].update( value_dictionary ) else: self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_syncrepl_line_to_delete(self, line): '''Метод для парсинга строк, предписывающих удаление конструкций syncrepl rid=, если указано только ее название и значение ReplicaID.''' try: parsing_result = self._syncrepl_line_to_delete_parser.parseString( line ) self._match = True parameter_name = tuple(parsing_result.name.asList()) if parsing_result.name.action == '-': return parameter_value = OrderedDict({'#': self._last_comments_list}) self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_index_line(self, line): '''Метод для парсинга строк с директивами index.''' try: parsing_result = self._index_line_parser.parseString(line) self._match = True parameter_name = tuple(parsing_result.name.asList()) parameter_value = parsing_result.value parameter_value = self._last_comments_list + [parameter_value] self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_index_line_to_delete(self, line): '''Метод для парсинга строк, предписывающих удаление директив index, если указано только из имя, но отсутвует значение.''' try: parsing_result = self._index_line_to_delete_parser.parseString( line ) self._match = True parameter_name = tuple(parsing_result.name.asList()) if parsing_result.name.action == '-': return parameter_value = self._last_comments_list self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_plain_directive_line(self, line): '''Метод для парсинга строк с простыми уникальными для секции директивами.''' try: parsing_result = self._directive_line_parser.parseString(line) self._match = True parameter_name = tuple(parsing_result.name.asList()) parameter_value = parsing_result.value parameter_value = self._last_comments_list + [parameter_value] self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_plain_directive_line_to_delete(self, line): '''Метод для парсинга строк, предписывающих удаление простых уникальных директив, если указано только их имя, но отсутствует значение.''' try: parsing_result = self._directive_line_to_delete_parser.parseString( line ) self._match = True parameter_name = tuple(parsing_result.name.asList()) if parsing_result.action == '-': return parameter_value = self._last_comments_list self._last_comments_list = [] self._current_type_section[parameter_name] = parameter_value except ParseException: return def _parse_comment_line(self, line): '''Метод для парсинга строк с комментариями и добавления их в список комментариев _last_comments_list, предназначенный для сбора комментариев и последующего их присваивания параметрам и секциям.''' try: parsing_result = self._comment_line.parseString(line) self._match = True if not self._ignore_comments: # До того, как первый элемент встречен -- все комментарии # должны быть присвоены глобальной области. if self._current_type == ('', 'global') and\ list(self._current_type_section.keys()) == ['#']: self._current_type_section['#'].append( parsing_result.comment ) else: self._last_comments_list.append(parsing_result.comment) except ParseException: return def _finish_method(self): '''Метод для завершения парсинга. В данном случае добавляет в итоговый словарь последнюю разобранную область.''' self._item_to_add = OrderedDict() if self._current_type in self._document_dictionary.keys(): self._document_dictionary[self._current_type].update( self._current_type_section ) else: self._item_to_add[self._current_type] = self._current_type_section self._document_dictionary.update(self._item_to_add) self._item_to_add = OrderedDict() self._current_type_section = OrderedDict()