# vim: fileencoding=utf-8 # from .base_format import BaseFormat, FormatError from ..template_engine import ParametersContainer from jinja2 import Environment, PackageLoader from collections import OrderedDict from pyparsing import originalTextFor, Literal, ZeroOrMore, Word, printables,\ OneOrMore, alphanums, ParseException, pyparsing_unicode,\ Group, Optional, alphas, Keyword class ProFTPDFormat(BaseFormat): '''Класс формата ProFTPD. В отличие от других форматов, при разборе вложенных на несколько уровней блоков, в итоговый словарь добавляются не вложенные на несколько уровней словари, а отдельные объекты этих блоков, содержащие в названии последовательность имен тегов, к которым они относятся. ''' FORMAT = 'proftpd' EXECUTABLE = False _initialized = False comment_symbol = '#' def __new__(cls, *args, **kwargs): if not cls._initialized: cls._initialize_parser() return super().__new__(cls) def __init__(self, document_text: str, ignore_comments=False, join_before=False, parameters=ParametersContainer()): processing_methods = [self._parse_comment_line, self._parse_section_start_line, self._parse_section_end_line, self._parse_single_key_directive_line, self._parse_double_key_directive_line, self._parse_to_delete_double_key_directive_line, self._parse_full_key_directive_line, self._parse_plain_directive_line, self._parse_to_delete_plain_directive_line] super().__init__(processing_methods) self._ignore_comments = ignore_comments self._need_finish = True self._comments_processing = True self._join_before = join_before self._section_stack = [] self._actions_stack = [] self._last_comments_list = [] if document_text == '': self._document_dictionary = OrderedDict() else: document_lines = self._get_list_of_logic_lines(document_text) self._lines_to_dictionary(document_lines) @classmethod def _initialize_parser(cls): '''Метод для инициализации парсеров.''' left_angle_bracket = Literal('<') right_angle_bracket = Literal('>') slash = Literal('/') action_symbols = (Literal('!') | Literal('-')) directive = Word(alphas, alphanums) section_value = Word(printables, excludeChars='<>') directive_value = Word(printables) cls._section_start_parser = (left_angle_bracket.suppress() + Optional(action_symbols, default='')('action') + Group(directive('name') + originalTextFor( OneOrMore(section_value) )('value') )('directive') + right_angle_bracket.suppress()) cls._section_end_parser = (left_angle_bracket.suppress() + slash.suppress() + directive('directive') + right_angle_bracket.suppress()) cls._plain_directive_parser = (Optional(action_symbols)('action') + Group(directive('name') + originalTextFor( OneOrMore(directive_value) )('value') )('directive') ) cls._delete_plain_directive_parser = (action_symbols('action') + directive('directive')) single_key_directive = (Keyword('AllowAll') | Keyword('DenyAll') | Keyword('AccessDenyMsg') | Keyword('AccessGrantMsg') | Keyword('ByteRatioErrMsg') | Keyword('LeechRatioMsg') | Keyword('CwdRatioMsg') | Keyword('FileRatioErrMsg')) cls._single_key_directive_parser = (Optional(action_symbols)('action') + single_key_directive( 'directive' )) double_key_directive = (Keyword('AccessDenyMsg') | Keyword('AccessGrantMsg') | Keyword('ByteRatioErrMsg') | Keyword('Allow from') | Keyword('Allow') | Keyword('AllowFilter') | Keyword('AnonymousGroup') | Keyword('AuthPAMConfig') | Keyword('Bind') | Keyword('CDPath') | Keyword('Define') | Keyword('Deny from') | Keyword('Deny') | Keyword('DenyFilter') | Keyword('DisplayChdir') | Keyword('ExtendedLog') | Keyword('AnonRatio') | Keyword('GroupRatio') | Keyword('HideGroup') | Keyword('HideUser') | Keyword('HostRatio') | Keyword('Include') | Keyword('LDAPAttr') | Keyword('LeechRatioMsg') | Keyword('CwdRatioMsg') | Keyword('FileRatioErrMsg') | Keyword('LogFormat') | Keyword('MaxClientsPerClass') | Keyword('PIDFile') | Keyword('RewriteMap') | Keyword('RewriteRule') | Keyword('SetEnv') | Keyword('SQLConnectInfo') | Keyword('SQLGroupWhereClause') | Keyword('SQLLog') | Keyword('SQLNamedQuery') | Keyword('SQLShowInfo') | Keyword('SQLUserInfo') | Keyword('SQLUserWhereClause') | Keyword('LoadModule') | Keyword('LoadFile') | Keyword('TransferRate') | Keyword('UnsetEnv') | Keyword('UserPassword') | Keyword('UserRatio') | Keyword('ModuleControlsACLs') | Keyword('ControlsACLs')) cls._double_key_directive_parser = (Optional(action_symbols)('action') + Group((double_key_directive + directive_value )('name') + originalTextFor( ZeroOrMore( directive_value ) )('value') )('directive') ) cls._delete_double_key_directive_parser = (action_symbols('action') + Group( ( double_key_directive + directive_value )('name') )('directive') ) full_key_directive = (Keyword('AllowClass') | Keyword('AllowGroup') | Keyword('AllowUser') | Keyword('Class') | Keyword('DenyClass') | Keyword('DenyGroup') | Keyword('DenyUser') | Keyword('DirFakeGroup') | Keyword('DirFakeUser') | Keyword('HideFiles') | Keyword('MaxRetrieveFileSize') | Keyword('MaxStoreFileSize') | Keyword('RewriteCondition') | Keyword('RewriteLock') | Keyword('TimeoutSession') | Keyword('UserAlias')) cls._full_key_directive_parser = (Optional(action_symbols)('action') + Group(full_key_directive + OneOrMore( directive_value ) )('directive') ) cls._comment_line = originalTextFor( Literal(cls.comment_symbol) + ZeroOrMore(Word(printables + pyparsing_unicode.alphanums)) )('comment') cls._initialized = True def _parse_section_start_line(self, line): '''Метод для разбора тега открывающего секцию.''' try: parsing_result = self._section_start_parser.parseString(line) self._match = True section_name = tuple(parsing_result.directive.asList()) self._actions_stack.append(parsing_result.action) self._section_stack.append(section_name) except ParseException: return def _parse_section_end_line(self, line): '''Метод для разбора тега закрывающего секцию.''' try: parsing_result = self._section_end_parser.parseString(line) self._match = True current_section = self._section_stack.pop() self._actions_stack.pop() directive = tuple(parsing_result.directive) if not current_section[1] != directive: raise FormatError("incorrect end tag , expected ". format(directive, current_section)) except ParseException: return def _parse_plain_directive_line(self, line): '''Метод для разбора строк состоящих из имени параметра и его значения. ''' try: parsing_result = self._plain_directive_parser.parseString(line) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive.name, ) directive_value = [parsing_result.directive.value] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_to_delete_plain_directive_line(self, line): '''Метод для разбора строк с параметрами, подлежащими удалению, то есть содержащих имя параметра с символом ! и, опционально, его значение.''' try: parsing_result = self._delete_plain_directive_parser.parseString( line ) self._match = True action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive, ) directive_name = action + context + directive directive_value = self._last_comments_list + [''] self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_single_key_directive_line(self, line): '''Метод для разбора строк с директивами, состоящих лишь из одного имени директивы. ''' try: parsing_result = self._single_key_directive_parser.parseString( line ) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive, ) directive_value = [''] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_double_key_directive_line(self, line): '''Метод для разбора директив, добавление в словарь которых возможно посредством формирования ключа из названия директивы и последующего за ней значения.''' try: parsing_result = self._double_key_directive_parser.parseString( line ) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.name.asList()) directive_value = [parsing_result.directive.value] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_to_delete_double_key_directive_line(self, line): '''Метод для разбора директив, подлежащих удалению, ключи которых формируются из названия директивы и последующего за ней значения.''' try: parsing_result = self._delete_double_key_directive_parser.\ parseString(line) self._match = True action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.name.asList()) directive_name = action + context + directive directive_value = self._last_comments_list + [''] self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_full_key_directive_line(self, line): '''Метод для разбора строк с директивами, из имен которых невозможно составить уникальный ключ для итогового словаря. Такие директивы разбираем полностью как ключ для пустого значения в словаре.''' try: parsing_result = self._full_key_directive_parser.parseString(line) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.asList()) directive_value = [''] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_comment_line(self, line): '''Метод для разбора строк, содержащих комментарии.''' try: result = self._comment_line.parseString(line) self._match = True if not self._ignore_comments: self._last_comments_list.append(result.comment) except ParseException: return @property def document_text(self): file_loader = PackageLoader('calculate.templates.format', self.TEMPLATES_DIRECTORY) formats_environment = Environment(loader=file_loader) formats_environment.globals.update(zip=zip) formats_environment.add_extension('jinja2.ext.do') template = formats_environment.get_template(self.FORMAT) document_text = template.render( document_dictionary=self._document_dictionary ) return document_text