You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
420 lines
18 KiB
420 lines
18 KiB
# vim: fileencoding=utf-8
|
|
#
|
|
from .base_format import BaseFormat
|
|
from jinja2 import Environment, PackageLoader
|
|
from collections import OrderedDict
|
|
from pyparsing import originalTextFor, Literal, ZeroOrMore, Word, printables,\
|
|
OneOrMore, alphanums, ParseException, pyparsing_unicode,\
|
|
Group, Optional, alphas, Keyword
|
|
|
|
|
|
class ProFTPDFormat(BaseFormat):
|
|
_initialized = False
|
|
|
|
def __init__(self, document_text: str,
|
|
ignore_comments=False,
|
|
join_before=False,
|
|
comment_symbol=''):
|
|
processing_methods = [self._parse_comment_line,
|
|
self._parse_section_start_line,
|
|
self._parse_section_end_line,
|
|
self._parse_single_key_directive_line,
|
|
self._parse_double_key_directive_line,
|
|
self._parse_to_delete_double_key_directive_line,
|
|
self._parse_full_key_directive_line,
|
|
self._parse_plain_directive_line,
|
|
self._parse_to_delete_plain_directive_line]
|
|
|
|
super().__init__(processing_methods)
|
|
self._ignore_comments = ignore_comments
|
|
self._need_finish = True
|
|
self._comments_processing = True
|
|
self._format = 'proftpd'
|
|
|
|
self._section_stack = []
|
|
self._actions_stack = []
|
|
|
|
self._last_comments_list = []
|
|
if not self._initialized:
|
|
self._initialize_parser()
|
|
|
|
if document_text == '':
|
|
self._document_dictionary = OrderedDict()
|
|
else:
|
|
document_lines = self._get_list_of_logic_lines(document_text)
|
|
self._lines_to_dictionary(document_lines)
|
|
|
|
@classmethod
|
|
def _initialize_parser(cls):
|
|
left_angle_bracket = Literal('<')
|
|
right_angle_bracket = Literal('>')
|
|
slash = Literal('/')
|
|
|
|
action_symbols = (Literal('!') |
|
|
Literal('-'))
|
|
|
|
directive = Word(alphas, alphanums)
|
|
|
|
section_value = Word(printables, excludeChars='<>')
|
|
|
|
directive_value = Word(printables)
|
|
|
|
cls._section_start_parser = (left_angle_bracket.suppress()
|
|
+ Optional(action_symbols,
|
|
default='')('action')
|
|
+ Group(directive('name')
|
|
+ originalTextFor(
|
|
OneOrMore(section_value)
|
|
)('value')
|
|
)('directive')
|
|
+ right_angle_bracket.suppress())
|
|
|
|
cls._section_end_parser = (left_angle_bracket.suppress()
|
|
+ slash.suppress()
|
|
+ directive('directive')
|
|
+ right_angle_bracket.suppress())
|
|
|
|
cls._plain_directive_parser = (Optional(action_symbols)('action')
|
|
+ Group(directive('name')
|
|
+ originalTextFor(
|
|
OneOrMore(directive_value)
|
|
)('value')
|
|
)('directive')
|
|
)
|
|
|
|
cls._delete_plain_directive_parser = (action_symbols('action')
|
|
+ directive('directive'))
|
|
|
|
single_key_directive = (Keyword('AllowAll') |
|
|
Keyword('DenyAll') |
|
|
Keyword('AccessDenyMsg') |
|
|
Keyword('AccessGrantMsg') |
|
|
Keyword('ByteRatioErrMsg') |
|
|
Keyword('LeechRatioMsg') |
|
|
Keyword('CwdRatioMsg') |
|
|
Keyword('FileRatioErrMsg'))
|
|
|
|
cls._single_key_directive_parser = (Optional(action_symbols)('action')
|
|
+ single_key_directive(
|
|
'directive'
|
|
))
|
|
|
|
double_key_directive = (Keyword('AccessDenyMsg') |
|
|
Keyword('AccessGrantMsg') |
|
|
Keyword('ByteRatioErrMsg') |
|
|
Keyword('Allow from') | Keyword('Allow') |
|
|
Keyword('AllowFilter') |
|
|
Keyword('AnonymousGroup') |
|
|
Keyword('AuthPAMConfig') |
|
|
Keyword('Bind') |
|
|
Keyword('CDPath') |
|
|
Keyword('Define') |
|
|
Keyword('Deny from') | Keyword('Deny') |
|
|
Keyword('DenyFilter') |
|
|
Keyword('DisplayChdir') |
|
|
Keyword('ExtendedLog') |
|
|
Keyword('AnonRatio') |
|
|
Keyword('GroupRatio') |
|
|
Keyword('HideGroup') |
|
|
Keyword('HideUser') |
|
|
Keyword('HostRatio') |
|
|
Keyword('Include') |
|
|
Keyword('LDAPAttr') |
|
|
Keyword('LeechRatioMsg') |
|
|
Keyword('CwdRatioMsg') |
|
|
Keyword('FileRatioErrMsg') |
|
|
Keyword('LogFormat') |
|
|
Keyword('MaxClientsPerClass') |
|
|
Keyword('PIDFile') |
|
|
Keyword('RewriteMap') |
|
|
Keyword('RewriteRule') |
|
|
Keyword('SetEnv') |
|
|
Keyword('SQLConnectInfo') |
|
|
Keyword('SQLGroupWhereClause') |
|
|
Keyword('SQLLog') |
|
|
Keyword('SQLNamedQuery') |
|
|
Keyword('SQLShowInfo') |
|
|
Keyword('SQLUserInfo') |
|
|
Keyword('SQLUserWhereClause') |
|
|
Keyword('LoadModule') |
|
|
Keyword('LoadFile') |
|
|
Keyword('TransferRate') |
|
|
Keyword('UnsetEnv') |
|
|
Keyword('UserPassword') |
|
|
Keyword('UserRatio') |
|
|
Keyword('ModuleControlsACLs') |
|
|
Keyword('ControlsACLs'))
|
|
|
|
cls._double_key_directive_parser = (Optional(action_symbols)('action')
|
|
+ Group((double_key_directive
|
|
+ directive_value
|
|
)('name')
|
|
+ originalTextFor(
|
|
ZeroOrMore(
|
|
directive_value
|
|
)
|
|
)('value')
|
|
)('directive')
|
|
)
|
|
|
|
cls._delete_double_key_directive_parser = (action_symbols('action')
|
|
+ Group(
|
|
(
|
|
double_key_directive
|
|
+ directive_value
|
|
)('name')
|
|
)('directive')
|
|
)
|
|
|
|
full_key_directive = (Keyword('AllowClass') |
|
|
Keyword('AllowGroup') |
|
|
Keyword('AllowUser') |
|
|
Keyword('Class') |
|
|
Keyword('DenyClass') |
|
|
Keyword('DenyGroup') |
|
|
Keyword('DenyUser') |
|
|
Keyword('DirFakeGroup') |
|
|
Keyword('DirFakeUser') |
|
|
Keyword('HideFiles') |
|
|
Keyword('MaxRetrieveFileSize') |
|
|
Keyword('MaxStoreFileSize') |
|
|
Keyword('RewriteCondition') |
|
|
Keyword('RewriteLock') |
|
|
Keyword('TimeoutSession') |
|
|
Keyword('UserAlias'))
|
|
|
|
cls._full_key_directive_parser = (Optional(action_symbols)('action')
|
|
+ Group(full_key_directive
|
|
+ OneOrMore(
|
|
directive_value
|
|
)
|
|
)('directive')
|
|
)
|
|
|
|
cls._comment_line = originalTextFor(
|
|
Literal('#')
|
|
+ ZeroOrMore(Word(printables
|
|
+ pyparsing_unicode.alphanums))
|
|
)('comment')
|
|
cls._initialized = True
|
|
|
|
def _parse_section_start_line(self, line):
|
|
try:
|
|
parsing_result = self._section_start_parser.parseString(line)
|
|
self._match = True
|
|
|
|
section_name = tuple(parsing_result.directive.asList())
|
|
self._actions_stack.append(parsing_result.action)
|
|
self._section_stack.append(section_name)
|
|
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_section_end_line(self, line):
|
|
try:
|
|
parsing_result = self._section_end_parser.parseString(line)
|
|
self._match = True
|
|
|
|
current_section = self._section_stack.pop()
|
|
self._actions_stack.pop()
|
|
directive = tuple(parsing_result.directive)
|
|
|
|
if not current_section[1] != directive:
|
|
# Здесь будет кидаться исключение.
|
|
self._fatal_error_flag = True
|
|
return
|
|
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_plain_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._plain_directive_parser.parseString(line)
|
|
self._match = True
|
|
|
|
for action_item in self._actions_stack:
|
|
if not action_item == '':
|
|
action = (action_item, )
|
|
break
|
|
else:
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = (parsing_result.directive.name, )
|
|
directive_value = [parsing_result.directive.value]
|
|
|
|
directive_name = action + context + directive
|
|
directive_value = self._last_comments_list + directive_value
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_to_delete_plain_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._delete_plain_directive_parser.parseString(
|
|
line
|
|
)
|
|
self._match = True
|
|
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = (parsing_result.directive, )
|
|
directive_name = action + context + directive
|
|
|
|
directive_value = self._last_comments_list + ['']
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_single_key_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._single_key_directive_parser.parseString(
|
|
line
|
|
)
|
|
self._match = True
|
|
|
|
for action_item in self._actions_stack:
|
|
if not action_item == '':
|
|
action = (action_item, )
|
|
break
|
|
else:
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = (parsing_result.directive, )
|
|
directive_value = ['']
|
|
|
|
directive_name = action + context + directive
|
|
directive_value = self._last_comments_list + directive_value
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_double_key_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._double_key_directive_parser.parseString(
|
|
line
|
|
)
|
|
self._match = True
|
|
|
|
for action_item in self._actions_stack:
|
|
if not action_item == '':
|
|
action = (action_item, )
|
|
break
|
|
else:
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = tuple(parsing_result.directive.name.asList())
|
|
directive_value = [parsing_result.directive.value]
|
|
|
|
directive_name = action + context + directive
|
|
directive_value = self._last_comments_list + directive_value
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_to_delete_double_key_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._delete_double_key_directive_parser.\
|
|
parseString(line)
|
|
self._match = True
|
|
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = tuple(parsing_result.directive.name.asList())
|
|
directive_name = action + context + directive
|
|
|
|
directive_value = self._last_comments_list + ['']
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_full_key_directive_line(self, line):
|
|
try:
|
|
parsing_result = self._full_key_directive_parser.parseString(line)
|
|
self._match = True
|
|
|
|
for action_item in self._actions_stack:
|
|
if not action_item == '':
|
|
action = (action_item, )
|
|
break
|
|
else:
|
|
action = (parsing_result.action, )
|
|
|
|
if not self._section_stack == []:
|
|
context = (tuple(self._section_stack), )
|
|
else:
|
|
context = ('', )
|
|
|
|
directive = tuple(parsing_result.directive.asList())
|
|
directive_value = ['']
|
|
|
|
directive_name = action + context + directive
|
|
directive_value = self._last_comments_list + directive_value
|
|
self._last_comments_list = []
|
|
|
|
self._item_to_add = OrderedDict({directive_name: directive_value})
|
|
self._ready_to_update = True
|
|
except ParseException:
|
|
return
|
|
|
|
def _parse_comment_line(self, line):
|
|
try:
|
|
result = self._comment_line.parseString(line)
|
|
self._match = True
|
|
|
|
if not self._ignore_comments:
|
|
self._last_comments_list.append(result.comment)
|
|
except ParseException:
|
|
return
|
|
|
|
def get_document_text(self):
|
|
file_loader = PackageLoader('calculate.templates.format',
|
|
self.TEMPLATES_DIRECTORY)
|
|
formats_environment = Environment(loader=file_loader)
|
|
formats_environment.globals.update(zip=zip)
|
|
formats_environment.add_extension('jinja2.ext.do')
|
|
template = formats_environment.get_template(self._format)
|
|
document_text = template.render(
|
|
document_dictionary=self._document_dictionary
|
|
)
|
|
return document_text
|