You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

462 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# vim: fileencoding=utf-8
#
from .base_format import Format, FormatError
from jinja2 import Environment, PackageLoader
from collections import OrderedDict
from pyparsing import originalTextFor, Literal, Word, printables, Regex,\
OneOrMore, alphanums, ParseException, Group, Optional,\
alphas, Keyword, ZeroOrMore
class ProFTPDFormat(Format):
'''Класс формата ProFTPD. В отличие от других форматов, при разборе
вложенных на несколько уровней блоков, в итоговый словарь добавляются не
вложенные на несколько уровней словари, а отдельные объекты этих блоков,
содержащие в названии последовательность имен тегов, к которым они
относятся.
'''
FORMAT = 'proftpd'
EXECUTABLE = False
_initialized = False
comment_symbol = '#'
def __new__(cls, *args, **kwargs):
if not cls._initialized:
cls._initialize_parser()
return super().__new__(cls)
def __init__(self, document_text: str,
template_path,
ignore_comments=False,
join_before=False,
add_header=False,
already_changed=False,
**kwargs):
processing_methods = [self._parse_comment_line,
self._parse_section_start_line,
self._parse_section_end_line,
self._parse_single_key_directive_line,
self._parse_double_key_directive_line,
self._parse_to_delete_double_key_directive_line,
self._parse_full_key_directive_line,
self._parse_plain_directive_line,
self._parse_to_delete_plain_directive_line]
super().__init__(processing_methods)
self._ignore_comments = ignore_comments
self._need_finish = True
self._comments_processing = True
self._join_before = join_before
self._section_stack = []
self._actions_stack = []
self._last_comments_list = []
if add_header and not ignore_comments:
self.header, document_text = self._get_header_and_document_text(
document_text,
template_path,
already_changed=already_changed)
else:
self.header = ''
document_text = document_text.strip()
if document_text == '':
self._document_dictionary = OrderedDict()
else:
document_lines = self._get_list_of_logic_lines(document_text)
self._lines_to_dictionary(document_lines)
@classmethod
def _initialize_parser(cls):
'''Метод для инициализации парсеров.'''
left_angle_bracket = Literal('<')
right_angle_bracket = Literal('>')
slash = Literal('/')
action_symbols = (Literal('!') |
Literal('-'))
directive = Word(alphas, alphanums)
section_value = Word(printables, excludeChars='<>')
directive_value = Word(printables)
cls._section_start_parser = (left_angle_bracket.suppress()
+ Optional(action_symbols,
default='')('action')
+ Group(directive('name')
+ originalTextFor(
OneOrMore(section_value)
)('value')
)('directive')
+ right_angle_bracket.suppress())
cls._section_end_parser = (left_angle_bracket.suppress()
+ slash.suppress()
+ directive('directive')
+ right_angle_bracket.suppress())
cls._plain_directive_parser = (Optional(action_symbols)('action')
+ Group(directive('name')
+ originalTextFor(
OneOrMore(directive_value)
)('value')
)('directive')
)
cls._delete_plain_directive_parser = (action_symbols('action')
+ directive('directive'))
single_key_directive = (Keyword('AllowAll') |
Keyword('DenyAll') |
Keyword('AccessDenyMsg') |
Keyword('AccessGrantMsg') |
Keyword('ByteRatioErrMsg') |
Keyword('LeechRatioMsg') |
Keyword('CwdRatioMsg') |
Keyword('FileRatioErrMsg'))
cls._single_key_directive_parser = (Optional(action_symbols)('action')
+ single_key_directive(
'directive'
))
double_key_directive = (Keyword('AccessDenyMsg') |
Keyword('AccessGrantMsg') |
Keyword('ByteRatioErrMsg') |
Keyword('Allow from') | Keyword('Allow') |
Keyword('AllowFilter') |
Keyword('AnonymousGroup') |
Keyword('AuthPAMConfig') |
Keyword('Bind') |
Keyword('CDPath') |
Keyword('Define') |
Keyword('Deny from') | Keyword('Deny') |
Keyword('DenyFilter') |
Keyword('DisplayChdir') |
Keyword('ExtendedLog') |
Keyword('AnonRatio') |
Keyword('GroupRatio') |
Keyword('HideGroup') |
Keyword('HideUser') |
Keyword('HostRatio') |
Keyword('Include') |
Keyword('LDAPAttr') |
Keyword('LeechRatioMsg') |
Keyword('CwdRatioMsg') |
Keyword('FileRatioErrMsg') |
Keyword('LogFormat') |
Keyword('MaxClientsPerClass') |
Keyword('PIDFile') |
Keyword('RewriteMap') |
Keyword('RewriteRule') |
Keyword('SetEnv') |
Keyword('SQLConnectInfo') |
Keyword('SQLGroupWhereClause') |
Keyword('SQLLog') |
Keyword('SQLNamedQuery') |
Keyword('SQLShowInfo') |
Keyword('SQLUserInfo') |
Keyword('SQLUserWhereClause') |
Keyword('LoadModule') |
Keyword('LoadFile') |
Keyword('TransferRate') |
Keyword('UnsetEnv') |
Keyword('UserPassword') |
Keyword('UserRatio') |
Keyword('ModuleControlsACLs') |
Keyword('ControlsACLs'))
cls._double_key_directive_parser = (Optional(action_symbols)('action')
+ Group((double_key_directive
+ directive_value
)('name')
+ originalTextFor(
ZeroOrMore(
directive_value
)
)('value')
)('directive')
)
cls._delete_double_key_directive_parser = (action_symbols('action')
+ Group(
(
double_key_directive
+ directive_value
)('name')
)('directive')
)
full_key_directive = (Keyword('AllowClass') |
Keyword('AllowGroup') |
Keyword('AllowUser') |
Keyword('Class') |
Keyword('DenyClass') |
Keyword('DenyGroup') |
Keyword('DenyUser') |
Keyword('DirFakeGroup') |
Keyword('DirFakeUser') |
Keyword('HideFiles') |
Keyword('MaxRetrieveFileSize') |
Keyword('MaxStoreFileSize') |
Keyword('RewriteCondition') |
Keyword('RewriteLock') |
Keyword('TimeoutSession') |
Keyword('UserAlias'))
cls._full_key_directive_parser = (Optional(action_symbols)('action')
+ Group(full_key_directive
+ OneOrMore(
directive_value
)
)('directive')
)
cls._comment_line = originalTextFor(
Literal(cls.comment_symbol)
+ Regex(r'.*'))('comment')
cls._initialized = True
def _parse_section_start_line(self, line):
'''Метод для разбора тега открывающего секцию.'''
try:
parsing_result = self._section_start_parser.parseString(line)
self._match = True
section_name = tuple(parsing_result.directive.asList())
self._actions_stack.append(parsing_result.action)
self._section_stack.append(section_name)
except ParseException:
return
def _parse_section_end_line(self, line):
'''Метод для разбора тега закрывающего секцию.'''
try:
parsing_result = self._section_end_parser.parseString(line)
self._match = True
current_section = self._section_stack.pop()
self._actions_stack.pop()
directive = tuple(parsing_result.directive)
if not current_section[1] != directive:
raise FormatError("incorrect end tag </{}>, expected </{}>".
format(directive, current_section))
except ParseException:
return
def _parse_plain_directive_line(self, line):
'''Метод для разбора строк состоящих из имени параметра и его значения.
'''
try:
parsing_result = self._plain_directive_parser.parseString(line)
self._match = True
for action_item in self._actions_stack:
if not action_item == '':
action = (action_item, )
break
else:
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = (parsing_result.directive.name, )
directive_value = [parsing_result.directive.value]
directive_name = action + context + directive
directive_value = self._last_comments_list + directive_value
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_to_delete_plain_directive_line(self, line):
'''Метод для разбора строк с параметрами, подлежащими удалению, то есть
содержащих имя параметра с символом ! и, опционально, его значение.'''
try:
parsing_result = self._delete_plain_directive_parser.parseString(
line
)
self._match = True
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = (parsing_result.directive, )
directive_name = action + context + directive
directive_value = self._last_comments_list + ['']
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_single_key_directive_line(self, line):
'''Метод для разбора строк с директивами, состоящих лишь из одного
имени директивы.
'''
try:
parsing_result = self._single_key_directive_parser.parseString(
line
)
self._match = True
for action_item in self._actions_stack:
if not action_item == '':
action = (action_item, )
break
else:
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = (parsing_result.directive, )
directive_value = ['']
directive_name = action + context + directive
directive_value = self._last_comments_list + directive_value
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_double_key_directive_line(self, line):
'''Метод для разбора директив, добавление в словарь которых возможно
посредством формирования ключа из названия директивы и последующего за
ней значения.'''
try:
parsing_result = self._double_key_directive_parser.parseString(
line
)
self._match = True
for action_item in self._actions_stack:
if not action_item == '':
action = (action_item, )
break
else:
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = tuple(parsing_result.directive.name.asList())
directive_value = [parsing_result.directive.value]
directive_name = action + context + directive
directive_value = self._last_comments_list + directive_value
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_to_delete_double_key_directive_line(self, line):
'''Метод для разбора директив, подлежащих удалению, ключи которых
формируются из названия директивы и последующего за ней значения.'''
try:
parsing_result = self._delete_double_key_directive_parser.\
parseString(line)
self._match = True
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = tuple(parsing_result.directive.name.asList())
directive_name = action + context + directive
directive_value = self._last_comments_list + ['']
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_full_key_directive_line(self, line):
'''Метод для разбора строк с директивами, из имен которых невозможно
составить уникальный ключ для итогового словаря. Такие директивы
разбираем полностью как ключ для пустого значения в словаре.'''
try:
parsing_result = self._full_key_directive_parser.parseString(line)
self._match = True
for action_item in self._actions_stack:
if not action_item == '':
action = (action_item, )
break
else:
action = (parsing_result.action, )
if not self._section_stack == []:
context = (tuple(self._section_stack), )
else:
context = ('', )
directive = tuple(parsing_result.directive.asList())
directive_value = ['']
directive_name = action + context + directive
directive_value = self._last_comments_list + directive_value
self._last_comments_list = []
self._item_to_add = OrderedDict({directive_name: directive_value})
self._ready_to_update = True
except ParseException:
return
def _parse_comment_line(self, line):
'''Метод для разбора строк, содержащих комментарии.'''
try:
result = self._comment_line.parseString(line)
self._match = True
if not self._ignore_comments:
self._last_comments_list.append(result.comment)
except ParseException:
return
@property
def document_text(self):
file_loader = PackageLoader('calculate.templates.format',
self.TEMPLATES_DIRECTORY)
formats_environment = Environment(loader=file_loader)
formats_environment.globals.update(zip=zip)
formats_environment.add_extension('jinja2.ext.do')
template = formats_environment.get_template(self.FORMAT)
document_text = template.render(
document_dictionary=self._document_dictionary
)
return '{}{}'.format(self.header, document_text)