"X" value is available in symbolic chmod values now. fixed #65

master
root 3 years ago
parent 0d269bbadd
commit dca33902d9

@ -47,9 +47,7 @@ class Format:
к каждой строке применяет парсеры, определенные для некоторого формата.
Первый парсер, которому удается разобрать строку используется для
формирования словаря.'''
# print('Lines processing...')
for line in document_lines:
# print(self._line_timer, '\t', line)
for processing_method in self._processing_methods:
try:
processing_method(line)

@ -6,9 +6,9 @@
from .base_format import Format
from ..template_engine import ParametersContainer
from collections import OrderedDict
from pyparsing import originalTextFor, Literal, Word, printables,\
OneOrMore, alphanums, ParseException, Regex,\
Group, Optional, alphas, lineEnd, lineStart, Keyword
from pyparsing import originalTextFor, Literal, Word, printables, alphanums,\
ParseException, Regex, Group, Optional, alphas, lineEnd,\
lineStart, Keyword
class DovecotFormat(Format):
@ -25,12 +25,12 @@ class DovecotFormat(Format):
return super().__new__(cls)
def __init__(self, document_text: str,
template_path,
ignore_comments=False,
join_before=False,
add_header=False,
already_changed=False,
parameters=ParametersContainer()):
template_path: str,
ignore_comments: bool = False,
join_before: bool = False,
add_header: bool = False,
already_changed: bool = False,
parameters: ParametersContainer = ParametersContainer()):
processing_methods = [self._parse_comment_line,
self._parse_section_start_line,
self._parse_include_line,
@ -39,16 +39,17 @@ class DovecotFormat(Format):
self._parse_parameter_to_delete_line]
super().__init__(processing_methods)
self._ignore_comments = ignore_comments
self._comments_processing = True
self._need_finish = True
self._join_before = join_before
self._ignore_comments: bool = ignore_comments
self._comments_processing: bool = True
self._need_finish: bool = True
self._join_before: bool = join_before
self._section_stack = OrderedDict()
self._current_section_name = ''
self._section_stack: OrderedDict = OrderedDict()
self._current_section_name: str = ''
self._last_comments_list = []
self._last_comments_list: list = []
self.header: str
if add_header and not ignore_comments:
self.header, document_text = self._get_header_and_document_text(
document_text,

@ -510,7 +510,6 @@ class LDAPFormat(Format):
try:
parsing_result = self._comment_line.parseString(line)
self._match = True
print('comment is found')
if not self._ignore_comments:
# До того, как первый элемент встречен -- все комментарии

@ -18,8 +18,12 @@ import copy
import re
import os
import stat
from typing import Union, Any, List
from typing import (
Union,
Any,
List,
Tuple
)
from ..utils.package import (
PackageAtomName,
PackageAtomParser,
@ -151,7 +155,7 @@ class ParametersProcessor:
format_is_inspected: bool = False
chmod_value_regular = re.compile(
r'([r-][w-][x-])([r-][w-][x-])([r-][w-][x-])')
r'([r-][w-][Xx-])([r-][w-][Xx-])([r-][w-][Xx-])')
def __init__(self,
parameters_container: Union["ParametersContainer",
@ -398,17 +402,17 @@ class ParametersProcessor:
raise IncorrectParameter(
"'restart' parameter value is not correct")
def check_stop_parameter(self, parameter_value):
def check_stop_parameter(self, parameter_value: Any):
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'stop' parameter value is empty")
return parameter_value
def check_start_parameter(self, parameter_value):
def check_start_parameter(self, parameter_value: Any):
if not parameter_value and isinstance(parameter_value, bool):
raise IncorrectParameter("'start' parameter value is empty")
return parameter_value
def check_run_parameter(self, parameter_value):
def check_run_parameter(self, parameter_value: Any):
if self.template_type == DIR:
raise IncorrectParameter("'run' parameter is not available in"
" directory templates")
@ -421,7 +425,7 @@ class ParametersProcessor:
" found")
return interpreter_path
def check_exec_parameter(self, parameter_value):
def check_exec_parameter(self, parameter_value: Any):
if self.template_type == DIR:
raise IncorrectParameter("'exec' parameter is not available in"
" directory templates")
@ -434,38 +438,47 @@ class ParametersProcessor:
" found")
return interpreter_path
def check_chown_parameter(self, parameter_value):
def check_chown_parameter(self, parameter_value: Any):
if not parameter_value or isinstance(parameter_value, bool):
raise IncorrectParameter("'chown' parameter value is empty.")
parameter_value = self.get_chown_values(parameter_value)
return parameter_value
def check_chmod_parameter(self, parameter_value):
print('check chmod value:', parameter_value)
def check_chmod_parameter(self, parameter_value: Any) -> Union[int, tuple]:
result = self.chmod_value_regular.search(parameter_value)
if result:
print("chmod is alphas")
parameter_value = ''
for group_number in range(3):
current_group = result.groups()[group_number]
num = ''
for sym_number in range(3):
if current_group[sym_number] != '-':
num = num + '1'
else:
num = num + '0'
parameter_value = parameter_value + num
return int(parameter_value, 2)
return self._translate_symbol_chmod(result)
elif parameter_value.isdigit():
parameter_value = int(parameter_value, 8)
print("chmod is digits:", parameter_value)
return parameter_value
else:
raise IncorrectParameter("'chmod' parameter value is not correct")
def check_source_parameter(self, parameter_value):
def _translate_symbol_chmod(self, result) -> Tuple[int, int]:
'''Метод для перевода буквенного значения chmod в числовое.
Возвращает кортеж (chmod, x_mask):
chmod -- число, полученное из последовательности битов, где
"r", "w" и "x" -> 1, "-" и "X" -> 0;
x_mask -- маска, полученная из последовательности битов, где
"X" -> 1, "r", "w", "-" и "x" -> 0. Она необходима для получения
значения chmod для файлов.'''
chmod = ''
x_mask = ''
for group_index in range(3):
group = result.groups()[group_index]
for sym_index in range(3):
if group[sym_index] in {'-', 'X'}:
chmod = chmod + '0'
else:
chmod = chmod + '1'
if group[sym_index] == 'X':
x_mask = x_mask + "1"
else:
x_mask = x_mask + "0"
return (int(chmod, 2), int(x_mask, 2))
def check_source_parameter(self, parameter_value: Any):
if not parameter_value or isinstance(parameter_value, bool):
raise IncorrectParameter("'source' parameter value is empty")
@ -497,7 +510,7 @@ class ParametersProcessor:
return os.path.normpath(real_path)
def check_env_parameter(self, parameter_value):
def check_env_parameter(self, parameter_value: Any):
env_set = set()
for env_value in parameter_value.split(','):
@ -525,20 +538,20 @@ class ParametersProcessor:
return env_set
def check_force_parameter(self, parameter_value):
def check_force_parameter(self, parameter_value: Any):
if isinstance(parameter_value, bool):
return parameter_value
else:
raise IncorrectParameter("'force' parameter value is not bool")
def check_autoupdate_parameter(self, parameter_value):
def check_autoupdate_parameter(self, parameter_value: Any):
if isinstance(parameter_value, bool):
return parameter_value
else:
raise IncorrectParameter(
"'autoupdate' parameter value is not bool.")
def check_format_parameter(self, parameter_value):
def check_format_parameter(self, parameter_value: Any):
if self.template_type == DIR:
raise IncorrectParameter("'format' parameter is redundant for"
" directory templates.")
@ -551,13 +564,13 @@ class ParametersProcessor:
raise IncorrectParameter("'format' parameter must be string value not"
f" {type(parameter_value)}.")
def check_handler_parameter(self, parameter_value):
def check_handler_parameter(self, parameter_value: Any):
if not isinstance(parameter_value, str):
raise IncorrectParameter("'handler' parameter must be string"
f" value not {type(parameter_value)}.")
return parameter_value
def check_notify_parameter(self, parameter_value):
def check_notify_parameter(self, parameter_value: Any):
if isinstance(parameter_value, list):
return parameter_value
elif isinstance(parameter_value, str):
@ -627,11 +640,11 @@ class ParametersProcessor:
def check_postparse_handler(self, parameter_value):
if self._parameters_container.merge:
raise IncorrectParameter("'merge' parameter is not available"
" in handler templates.")
" in handler templates")
elif (self._parameters_container.package and
not self._parameters_container.is_inherited('package')):
raise IncorrectParameter("'package' parameter is not available"
" in handler templates.")
" in handler templates")
def check_postparse_package(self, parameter_value):
groups = []

@ -405,7 +405,7 @@ class TemplateWrapper:
and not self.parameters.handler):
raise TemplateCollisionError(
"'package' parameter is not defined for"
" template with 'append' parameter.")
" template with 'append' parameter")
else:
return
elif parameter_package is None:
@ -1707,7 +1707,11 @@ class TemplateExecutor:
'''Метод для смены прав доступа к директории.'''
try:
if os.path.exists(target_path):
os.chmod(target_path, chmod_value)
if isinstance(chmod_value, int):
os.chmod(target_path, chmod_value)
else:
chmod_value = self._use_chmod_x_mask(chmod_value)
os.chmod(target_path, chmod_value)
else:
raise TemplateExecutorError(
'The target directory does not exist: {0}'.
@ -1743,13 +1747,32 @@ class TemplateExecutor:
raise TemplateExecutorError(
'The target file does not exist: {0}'.
format(target_path))
os.chmod(target_path, chmod_value)
if isinstance(chmod_value, int):
os.chmod(target_path, chmod_value)
else:
chmod_value = self._use_chmod_x_mask(
chmod_value,
current_mode=self._get_file_mode(
target_path))
os.chmod(target_path, chmod_value)
except (OSError, Exception) as error:
if not self._check_os_error(error, target_path):
raise TemplateExecutorError(
'Can not chmod file: {0}, reason: {1}'.
format(target_path, str(error)))
def _use_chmod_x_mask(self, chmod: Tuple[int, int],
current_mode: Union[int, None] = None) -> int:
'''Метод для наложения X-маски, необходимой для получения значения
chmod, c учетом возможности наличия в нем значения "X".'''
if not chmod[1]:
return chmod[0]
if current_mode is None:
return chmod[0] ^ chmod[1]
else:
return chmod[0] ^ (current_mode & chmod[1])
def _get_file_mode(self, file_path: str) -> int:
'''Метод для получения прав доступа для указанного файла.'''
if not os.path.exists(file_path):
@ -2794,15 +2817,15 @@ class DirectoryProcessor:
return False
except TemplateExecutorError as error:
self.output.set_error('Template execution error: {} Template: {}'.
self.output.set_error('Template execution error: {}. Template: {}'.
format(str(error),
template_path))
return False
except FormatError as error:
if error.executable:
msg = 'Format execution error: {} Template: {}'
msg = 'Format execution error: {}. Template: {}'
else:
msg = 'Format joining error: {} Template: {}'
msg = 'Format joining error: {}. Template: {}'
self.output.set_error(msg.format(str(error), template_path))
return False

@ -637,6 +637,9 @@ class TestDirectoryProcessor:
'/etc/dir_11'))
assert not os.path.exists(join_paths(CHROOT_PATH,
'/etc/dir_11/file_0'))
print("MESSAGES:")
for msg_type, msg in io_module.messages:
print(f"{msg_type} -> {msg}")
assert ('error', error_message) in io_module.messages
assert directory_processor.template_executor.\
@ -1568,7 +1571,7 @@ class TestDirectoryProcessor:
assert io.messages[-1] ==\
("error",
"Format execution error: correction failed Template:"
"Format execution error: correction failed. Template:"
f" {join_paths(CHROOT_PATH, 'templates_53/install/patch')}")
def test_templates_with_append_link_force_and_source_paths_to_an_unexisting_file_or_directory(self):
@ -1639,8 +1642,8 @@ class TestDirectoryProcessor:
datavars.main['cl_template_path'] = os.path.join(CHROOT_PATH,
'templates_58')
mode = os.stat(join_paths(CHROOT_PATH, '/etc/dir_76')).st_mode
if mode != 0o100755:
os.chmod(join_paths(CHROOT_PATH, '/etc/dir_76'), 0o100755)
if mode != 0o40755:
os.chmod(join_paths(CHROOT_PATH, '/etc/dir_76'), 0o40755)
directory_processor = DirectoryProcessor('install',
datavars_module=datavars
@ -1679,6 +1682,39 @@ class TestDirectoryProcessor:
"Eluveitie -- Siraxta\n")
assert (stat.st_uid, stat.st_gid) == (0, 0)
def test_different_symbol_chmod_values_for_dirs_and_files(self):
datavars.main['cl_template_path'] = os.path.join(CHROOT_PATH,
'templates_60')
mode = os.stat(join_paths(CHROOT_PATH, '/etc/dir_78')).st_mode
if mode != 0o40755:
os.chmod(join_paths(CHROOT_PATH, '/etc/dir_78'), 0o40755)
mode = os.stat(join_paths(CHROOT_PATH,
'/etc/dir_78/file_0')).st_mode
if mode != 0o100744:
os.chmod(join_paths(CHROOT_PATH, '/etc/dir_78/file_0'), 0o100744)
mode = os.stat(join_paths(CHROOT_PATH,
'/etc/dir_78/file_1')).st_mode
if mode != 0o100555:
os.chmod(join_paths(CHROOT_PATH, '/etc/dir_78/file_1'), 0o100555)
directory_processor = DirectoryProcessor('install',
datavars_module=datavars
)
directory_processor.process_template_directories()
mode = os.stat(join_paths(CHROOT_PATH, '/etc/dir_78')).st_mode
assert int(mode) == 0o40655
mode = os.stat(join_paths(CHROOT_PATH,
'/etc/dir_78/file_0')).st_mode
assert int(mode) == 0o100645
mode = os.stat(join_paths(CHROOT_PATH,
'/etc/dir_78/file_1')).st_mode
assert int(mode) == 0o100655
def test_view_tree(self):
list_path = join_paths(CHROOT_PATH, '/etc')
show_tree(list_path)

@ -241,11 +241,28 @@ class TestTemplateParameters:
FILE, 1)
parameters_processor.check_postparse_parameters()
def test_if_TemplateParameters_object_is_intialized_using_dictionary_with_correct_chmod_parameter__the_object_will_be_initialized_successfully(self):
def test_if_TemplateParameters_object_is_intialized_using_dictionary_with_correct_symbolic_chmod_parameter_without_X_value__the_object_will_be_initialized_successfully(self):
parameters = {'chmod': 'rw-r--r--'}
parameters_object = ParametersContainer()
parameters_processor.set_parameters_container(parameters_object)
try:
for parameter_name, parameter_value in parameters.items():
parameters_processor.check_template_parameter(parameter_name,
parameter_value,
DIR, 1)
parameters_processor.check_postparse_parameters()
except Exception as error:
pytest.fail('Unexpected exception: {0}'.
format(str(error)))
assert parameters_object.chmod == (0o644, 0)
def test_if_TemplateParameters_object_is_intialized_using_dictionary_with_correct_symbolic_chmod_parameter_with_X_value__the_object_will_be_initialized_successfully(self):
parameters = {'chmod': 'rwXr--r-X'}
parameters_object = ParametersContainer()
parameters_processor.set_parameters_container(parameters_object)
try:
for parameter_name, parameter_value in parameters.items():
parameters_processor.check_template_parameter(parameter_name,
@ -256,7 +273,7 @@ class TestTemplateParameters:
pytest.fail('Unexpected exception: {0}'.
format(str(error)))
assert parameters_object.chmod == 0o644
assert parameters_object.chmod == (0o644, 0b1000001)
def test_if_TemplateParameters_object_is_intialized_using_dictionary_with_correct_chmod_parameter_in_its_digital_form__the_object_will_be_initialized_successfully(self):
parameters = {'chmod': '600'}

@ -0,0 +1 @@
{% calculate chmod = "rw-r-Xr-x", package = "test-category/test-package" %}

@ -0,0 +1,2 @@
{% calculate append = "join" %}
Eluveitie -- Slanias song

@ -0,0 +1,2 @@
{% calculate append = "join" %}
Eluveitie -- Thousandfold

@ -12,3 +12,6 @@ obj /etc/dir_76
obj /etc/dir_76/file_0 3fd436479300b04370b97f4bcfdc90f7 1592574626
obj /etc/dir_77
obj /etc/dir_77/file_0 3fd436479300b04370b97f4bcfdc90f7 1592574626
obj /etc/dir_78
obj /etc/dir_78/file_0 3fd436479300b04370b97f4bcfdc90f7 1592574626
obj /etc/dir_78/file_1 3fd436479300b04370b97f4bcfdc90f7 1592574626

Loading…
Cancel
Save