# vim: fileencoding=utf-8 # from .base_format import BaseFormat from jinja2 import Environment, PackageLoader from collections import OrderedDict from pyparsing import originalTextFor, Literal, ZeroOrMore, Word, printables,\ OneOrMore, alphanums, ParseException, pyparsing_unicode,\ Group, Optional, alphas, Keyword class ProFTPDFormat(BaseFormat): FORMAT = 'proftpd' _initialized = False def __init__(self, document_text: str, ignore_comments=False, join_before=False, comment_symbol=''): processing_methods = [self._parse_comment_line, self._parse_section_start_line, self._parse_section_end_line, self._parse_single_key_directive_line, self._parse_double_key_directive_line, self._parse_to_delete_double_key_directive_line, self._parse_full_key_directive_line, self._parse_plain_directive_line, self._parse_to_delete_plain_directive_line] super().__init__(processing_methods) self._ignore_comments = ignore_comments self._need_finish = True self._comments_processing = True self._section_stack = [] self._actions_stack = [] self._last_comments_list = [] if not self._initialized: self._initialize_parser() if document_text == '': self._document_dictionary = OrderedDict() else: document_lines = self._get_list_of_logic_lines(document_text) self._lines_to_dictionary(document_lines) @classmethod def _initialize_parser(cls): left_angle_bracket = Literal('<') right_angle_bracket = Literal('>') slash = Literal('/') action_symbols = (Literal('!') | Literal('-')) directive = Word(alphas, alphanums) section_value = Word(printables, excludeChars='<>') directive_value = Word(printables) cls._section_start_parser = (left_angle_bracket.suppress() + Optional(action_symbols, default='')('action') + Group(directive('name') + originalTextFor( OneOrMore(section_value) )('value') )('directive') + right_angle_bracket.suppress()) cls._section_end_parser = (left_angle_bracket.suppress() + slash.suppress() + directive('directive') + right_angle_bracket.suppress()) cls._plain_directive_parser = (Optional(action_symbols)('action') + Group(directive('name') + originalTextFor( OneOrMore(directive_value) )('value') )('directive') ) cls._delete_plain_directive_parser = (action_symbols('action') + directive('directive')) single_key_directive = (Keyword('AllowAll') | Keyword('DenyAll') | Keyword('AccessDenyMsg') | Keyword('AccessGrantMsg') | Keyword('ByteRatioErrMsg') | Keyword('LeechRatioMsg') | Keyword('CwdRatioMsg') | Keyword('FileRatioErrMsg')) cls._single_key_directive_parser = (Optional(action_symbols)('action') + single_key_directive( 'directive' )) double_key_directive = (Keyword('AccessDenyMsg') | Keyword('AccessGrantMsg') | Keyword('ByteRatioErrMsg') | Keyword('Allow from') | Keyword('Allow') | Keyword('AllowFilter') | Keyword('AnonymousGroup') | Keyword('AuthPAMConfig') | Keyword('Bind') | Keyword('CDPath') | Keyword('Define') | Keyword('Deny from') | Keyword('Deny') | Keyword('DenyFilter') | Keyword('DisplayChdir') | Keyword('ExtendedLog') | Keyword('AnonRatio') | Keyword('GroupRatio') | Keyword('HideGroup') | Keyword('HideUser') | Keyword('HostRatio') | Keyword('Include') | Keyword('LDAPAttr') | Keyword('LeechRatioMsg') | Keyword('CwdRatioMsg') | Keyword('FileRatioErrMsg') | Keyword('LogFormat') | Keyword('MaxClientsPerClass') | Keyword('PIDFile') | Keyword('RewriteMap') | Keyword('RewriteRule') | Keyword('SetEnv') | Keyword('SQLConnectInfo') | Keyword('SQLGroupWhereClause') | Keyword('SQLLog') | Keyword('SQLNamedQuery') | Keyword('SQLShowInfo') | Keyword('SQLUserInfo') | Keyword('SQLUserWhereClause') | Keyword('LoadModule') | Keyword('LoadFile') | Keyword('TransferRate') | Keyword('UnsetEnv') | Keyword('UserPassword') | Keyword('UserRatio') | Keyword('ModuleControlsACLs') | Keyword('ControlsACLs')) cls._double_key_directive_parser = (Optional(action_symbols)('action') + Group((double_key_directive + directive_value )('name') + originalTextFor( ZeroOrMore( directive_value ) )('value') )('directive') ) cls._delete_double_key_directive_parser = (action_symbols('action') + Group( ( double_key_directive + directive_value )('name') )('directive') ) full_key_directive = (Keyword('AllowClass') | Keyword('AllowGroup') | Keyword('AllowUser') | Keyword('Class') | Keyword('DenyClass') | Keyword('DenyGroup') | Keyword('DenyUser') | Keyword('DirFakeGroup') | Keyword('DirFakeUser') | Keyword('HideFiles') | Keyword('MaxRetrieveFileSize') | Keyword('MaxStoreFileSize') | Keyword('RewriteCondition') | Keyword('RewriteLock') | Keyword('TimeoutSession') | Keyword('UserAlias')) cls._full_key_directive_parser = (Optional(action_symbols)('action') + Group(full_key_directive + OneOrMore( directive_value ) )('directive') ) cls._comment_line = originalTextFor( Literal('#') + ZeroOrMore(Word(printables + pyparsing_unicode.alphanums)) )('comment') cls._initialized = True def _parse_section_start_line(self, line): try: parsing_result = self._section_start_parser.parseString(line) self._match = True section_name = tuple(parsing_result.directive.asList()) self._actions_stack.append(parsing_result.action) self._section_stack.append(section_name) except ParseException: return def _parse_section_end_line(self, line): try: parsing_result = self._section_end_parser.parseString(line) self._match = True current_section = self._section_stack.pop() self._actions_stack.pop() directive = tuple(parsing_result.directive) if not current_section[1] != directive: # Здесь будет кидаться исключение. self._fatal_error_flag = True return except ParseException: return def _parse_plain_directive_line(self, line): try: parsing_result = self._plain_directive_parser.parseString(line) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive.name, ) directive_value = [parsing_result.directive.value] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_to_delete_plain_directive_line(self, line): try: parsing_result = self._delete_plain_directive_parser.parseString( line ) self._match = True action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive, ) directive_name = action + context + directive directive_value = self._last_comments_list + [''] self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_single_key_directive_line(self, line): try: parsing_result = self._single_key_directive_parser.parseString( line ) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = (parsing_result.directive, ) directive_value = [''] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_double_key_directive_line(self, line): try: parsing_result = self._double_key_directive_parser.parseString( line ) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.name.asList()) directive_value = [parsing_result.directive.value] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_to_delete_double_key_directive_line(self, line): try: parsing_result = self._delete_double_key_directive_parser.\ parseString(line) self._match = True action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.name.asList()) directive_name = action + context + directive directive_value = self._last_comments_list + [''] self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_full_key_directive_line(self, line): try: parsing_result = self._full_key_directive_parser.parseString(line) self._match = True for action_item in self._actions_stack: if not action_item == '': action = (action_item, ) break else: action = (parsing_result.action, ) if not self._section_stack == []: context = (tuple(self._section_stack), ) else: context = ('', ) directive = tuple(parsing_result.directive.asList()) directive_value = [''] directive_name = action + context + directive directive_value = self._last_comments_list + directive_value self._last_comments_list = [] self._item_to_add = OrderedDict({directive_name: directive_value}) self._ready_to_update = True except ParseException: return def _parse_comment_line(self, line): try: result = self._comment_line.parseString(line) self._match = True if not self._ignore_comments: self._last_comments_list.append(result.comment) except ParseException: return @property def document_text(self): file_loader = PackageLoader('calculate.templates.format', self.TEMPLATES_DIRECTORY) formats_environment = Environment(loader=file_loader) formats_environment.globals.update(zip=zip) formats_environment.add_extension('jinja2.ext.do') template = formats_environment.get_template(self.FORMAT) document_text = template.render( document_dictionary=self._document_dictionary ) return document_text