The methods for checking of the packages collision is added. Started developing of the ._cfg logic.

packages
Иванов Денис 4 years ago
parent a2dfb57e81
commit 49a97a87a7

@ -12,6 +12,7 @@ from pyparsing import originalTextFor, OneOrMore, Word, alphanums, Literal,\
class BINDFormat(BaseFormat):
FORMAT = 'bind'
EXECUTABLE = False
def __init__(self, document_text: str,
ignore_comments=False,

@ -9,6 +9,7 @@ from pyparsing import originalTextFor, Literal, ZeroOrMore, Word, printables,\
class CompizFormat(BaseFormat):
FORMAT = 'compiz'
EXECUTABLE = False
_initialized = False
_comment_symbol = ''

@ -9,6 +9,7 @@ from pyparsing import Literal, Regex, Word, nums, alphanums, Optional,\
class ContentsFormat(BaseFormat):
FORMAT = 'contents'
EXECUTABLE = False
_initialized = False

@ -8,6 +8,7 @@ from os import path
class DiffFormat(BaseFormat):
FORMAT = 'diff'
EXECUTABLE = True
def __init__(self, document_text: str, comment_symbol=''):
self._patch_text = document_text

@ -26,6 +26,6 @@ class JSONFormat(BaseFormat):
object_pairs_hook=OrderedDict)
@property
def get_document_text(self):
def document_text(self):
json_file_text = json.dumps(self._document_dictionary, indent=4)
return json_file_text

@ -27,6 +27,10 @@ class VersionError(Exception):
pass
class PackageNotFound(Exception):
pass
class Version:
'''Временный класс для работы со значениями версий.'''
def __init__(self, version_value=None):
@ -201,6 +205,10 @@ class PackageAtomName:
def category(self):
return os.path.basename(os.path.dirname(self._package_directory))
@property
def atom(self):
return "{}/{}".format(self.name, self.category)
@property
def version(self):
return self._version
@ -253,12 +261,17 @@ class PackageAtomName:
class PackageAtomParser:
package_name_pattern =\
r'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-\d[^\s:]*)?'
atom_regex = re.compile(r'''(?P<category>[^\s/]*)/
(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)
(?P<version>-\d[^\s:]*)?
{0}
(?P<slot>:[^\s\[]*)?\s*
(?P<use_flags>\[\S*(?:\s+\S*)*\])?''',
(?P<use_flags>\[\S*(?:\s+\S*)*\])?'''.format(
package_name_pattern
),
re.VERBOSE)
package_name_regex = re.compile(package_name_pattern)
def __init__(self, pkg_path='/var/db/pkg',
chroot_path='/'):
@ -429,7 +442,7 @@ class PackageAtomParser:
category)):
yield path
def get_file_package(self, file_path, with_slot=False, with_uses=False):
def get_file_package(self, file_path):
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
@ -439,14 +452,24 @@ class PackageAtomParser:
with open(contents_path, 'r') as contents_file:
for file_line in contents_file.readlines():
contents_name = file_line.split(' ')[1].strip()
if contents_name == file_path:
package = '/'.join(os.path.dirname(
contents_path).split('/')[-2:])
return package
package_path = os.path.dirname(contents_path)
package_name = os.path.basename(package_path)
parsing_result = self.package_name_regex.\
search(package_name)
version = parsing_result.groupdict()['version']
version = Version(version)
package_atom = PackageAtomName(
{'pkg_path': package_path,
'version': version})
return package_atom
except (OSError, IOError):
continue
else:
raise PackageAtomError("The file does not belong to any package")
raise PackageNotFound("The file does not belong to any package")
@property
def atom_dictionary(self):
@ -522,7 +545,7 @@ class Package:
package_atom)
return os.path.join(atom_name.pkg_path,
'CONTENTS')
if isinstance(package_atom, PackageAtomName):
elif isinstance(package_atom, PackageAtomName):
return os.path.join(package_atom.pkg_path,
'CONTENTS')
else:
@ -544,6 +567,7 @@ class Package:
contents_text = read_file(self.contents_file_path)
except FilesError as error:
raise PackageError(str(error))
if contents_text:
contents_format = ContentsFormat(contents_text,
template_parser=False)
@ -558,7 +582,7 @@ class Package:
def render_contents_file(self):
contents_format = ContentsFormat('', template_parser=False)
contents_format._document_dictionary = self.contents_dictionary
return contents_format.get_document_text()
return contents_format.document_text
def add_dir(self, file_name):
file_name = self.remove_cfg_prefix(file_name)
@ -635,5 +659,19 @@ class Package:
def remove_empty_directories(self):
pass
def check_file_md5(self, file_path):
pass
def check_md5(self, file_path):
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
if file_path not in self.contents_dictionary:
return False
try:
file_text = read_file(file_path).encode()
except FilesError as error:
raise PackageError(str(error))
file_md5 = hashlib.md5(file_text).hexdigest()
contents_md5 = self.contents_dictionary[file_path]['md5']
return file_md5 == contents_md5

@ -1,8 +1,8 @@
from calculate.templates.template_engine import TemplateEngine, Variables,\
FILE, DIR, LINK,\
ParametersProcessor
FILE, DIR, ParametersProcessor
from calculate.templates.template_processor import TemplateAction
from calculate.utils.package import PackageAtomParser
from calculate.utils.package import PackageAtomParser, Package, PackageNotFound
import glob
import shutil
import os
@ -42,25 +42,40 @@ class TemplateTypeConflict(Exception):
pass
class TemplateCollisionError(Exception):
pass
class TemplateWrapper:
type_checks = {DIR: os.path.isdir,
FILE: os.path.isfile}
chroot_path = '/'
package_atom_parser = PackageAtomParser(chroot_path = chroot_path)
package_atom_parser = PackageAtomParser(chroot_path=chroot_path)
_protected_is_set = False
_protected_set = {'/etc'}
_unprotected_set = set()
def __init__(self, target_file_path, parameters, template_type,
template_text=''):
self._target_path = target_file_path
self.target_path = target_file_path
self.target_package_name = None
self.parameters = parameters
self.template_type = template_type
self.template_text = template_text
self.remove_original = False
if self.parameters.format:
self.format_class = ParametersProcessor.\
available_formats[self.parameters.format]
else:
# Здесь будет детектор форматов.
pass
# Если по этому пути что-то есть -- проверяем конфликты.
if os.path.exists(target_file_path):
for file_type, checker in self.type_checks.items():
@ -71,7 +86,19 @@ class TemplateWrapper:
else:
self.target_type = None
self.check_conflicts(self)
self.check_conflicts()
self.check_package_collision()
# Если целью является файл -- проверяем наличие ._cfg0000_filename
# файлов.
if self.target_type is FILE:
self._cfg_pattern = os.path.join(
os.path.dirname(self.target_path),
"._cfg????_{}".format(
os.path.basename(self.target_path)))
self._cfg_list = glob.glob(self._cfg_pattern)
def check_conflicts(self):
'''Проверка конфликтов типов.'''
@ -81,8 +108,8 @@ class TemplateWrapper:
return
elif self.target_type == DIR:
raise TemplateTypeConflict(
"The target is a directory while the"
" template has append = 'link'.")
"The target is a directory while the"
" template has append = 'link'.")
else:
raise TemplateTypeConflict(
"The target is a file while the"
@ -96,7 +123,7 @@ class TemplateWrapper:
raise TemplateTypeConflict("The target is a file while the"
" template is a directory.")
elif self.target_is_link and self.target_type == DIR:
self._target_path = os.readlink(self._target_path)
self.target_path = os.readlink(self.target_path)
return
else:
return
@ -107,7 +134,7 @@ class TemplateWrapper:
self.remove_original = True
return
elif self.target_is_link and self.target_type == FILE:
self._target_path = os.readlink(self._target_path)
self.target_path = os.readlink(self.target_path)
return
else:
return
@ -117,25 +144,87 @@ class TemplateWrapper:
raise TemplateTypeConflict("The target file is a directory"
" while the template is a file.")
def check_original_file(self):
def check_package_collision(self):
'''Проверка на предмет коллизии, то есть конфликта пакета шаблона и
целевого файла.'''
if self.parameters.package:
package_atom = self.parameters.package
self.target_package_name = self.parameters.package
if self.target_type is None:
return
try:
file_package = self.package_atom_parser.get_file_package(
self.target_path)
except PackageNotFound:
return
if self.target_package_name is None:
self.target_package_name = file_package
elif file_package != self.target_package_name:
raise TemplateCollisionError(
"The template package is {0} while target"
" file package is {1}").format(
self.target_package_name.atom,
file_package.atom
)
self.target_package = Package(self.target_package_name,
chroot_path=self.chroot_path)
def check_user_changes(self):
if (self.template_type is None or
self.target_package is None or
self.target_type != FILE):
return
if self.target_package.check_md5(self.target_path):
pass
else:
# пока что определяем значение package для всех файлов.
package_atom = package_atom_parser.get_file_package()
pass
def join_template(self):
if self.template_type is not None:
original_file_text = ''
def _update_contents(self):
pass
def use_format(self):
if self.format_class.EXECUTABLE:
# Для исполняемых форматов.
return
original_file = open(self._target_path, 'w')
if self.target_type is None:
original_file_text = ''
else:
original_file = open(self._target_path, 'r')
original_file = open(target_path, 'r')
original_file_text = original_file.read()
original_file.close()
original_file = open(self._target_path, 'w')
original_file = open(target_path, 'w')
original_object = self.format_class(original_file_text)
template_object = self.format_class(self.template_text)
original_object.join_template(template_object)
original_file.write(original_object.document_text)
original_file.close()
def accept_changes(self):
pass
@classmethod
def _set_protected(cls):
if cls._protected_is_set:
return
config_protect_env = os.environ.get('CONFIG_PROTECT', False)
if config_protect_env:
for protected_path in config_protect_env.split():
cls._protected_set.add(protected_path.strip())
config_protect_mask_env = os.environ.get('CONFIG_PROTECT_MASK', False)
if config_protect_mask_env:
for unprotected_path in config_protect_mask_env.split():
cls._unprotected_set.add(protected_path.strip())
cls._protected_is_set = True
class TemplateExecutor:
@ -157,6 +246,7 @@ class TemplateExecutor:
self.file_appends = {'join': self._append_join_file}
self.formats_classes = ParametersProcessor.available_formats
TemplateWrapper.chroot_path = self.chroot_path
def use_template(self, target_path, parameters, template_type,
template_text=''):

@ -359,7 +359,7 @@ options {
with open('./tests/templates/format/testfiles/bind_result.conf', 'r') as result_file:
result_text = result_file.read()
assert bind_original_object.get_document_text() == result_text
assert bind_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''
@ -402,4 +402,4 @@ options {
document_2_object = BINDFormat(document_2)
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -254,7 +254,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/compiz_result', 'r') as result_file:
result_text = result_file.read()
assert compiz_original_object.get_document_text() == result_text
assert compiz_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''[Added Associations]
@ -293,4 +293,4 @@ video/vnd.mpegurl=smplayer.desktop;
document_2_object = CompizFormat(document_2)
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -79,4 +79,4 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/contents_result', 'r') as result_file:
result_text = result_file.read()
assert contents_original_object.get_document_text() == result_text
assert contents_original_object.document_text == result_text

@ -252,7 +252,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/dovecot_result.conf', 'r') as result_file:
result_text = result_file.read()
assert dovecot_original_object.get_document_text() == result_text
assert dovecot_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''auth_default_realm = domain.com
@ -306,4 +306,4 @@ namespace inbox {
document_2_object = DovecotFormat(document_2)
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -41,7 +41,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/json_result.json', 'r') as resultFile:
resultText = resultFile.read()
assert jsonOriginalObject.get_document_text() == resultText
assert jsonOriginalObject.document_text == resultText
def test_make_template(self):
document_1 = '''
@ -80,4 +80,4 @@ class TestParsingMethods:
document_2_object = JSONFormat(document_2)
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -219,7 +219,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/kde_result', 'r') as result_file:
result_text = result_file.read()
assert kde_original_object.get_document_text() == result_text
assert kde_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''[PlasmaViews][Panel 69][Horizontal 1024]
@ -257,4 +257,4 @@ Width 1920=747
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -109,7 +109,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/kernel_result', 'r') as result_file:
result_text = result_file.read()
assert kernel_original_object.get_document_text() == result_text
assert kernel_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''
@ -134,4 +134,4 @@ CONFIG_EXT3_FS_SECURITY=n
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -361,4 +361,4 @@ database bdb
with open('./tests/templates/format/testfiles/ldap_result.conf', 'r') as result_file:
result_text = result_file.read()
assert ldap_original_object.get_document_text() == result_text
assert ldap_original_object.document_text == result_text

@ -143,7 +143,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/openrc_result', 'r') as result_file:
result_text = result_file.read()
assert openrc_original_object.get_document_text() == result_text
assert openrc_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''rc_tty_number=12
@ -193,4 +193,4 @@ rc_send_sighup="YES"
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -121,7 +121,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/postfix_result', 'r') as result_file:
result_text = result_file.read()
assert postfix_original_object.get_document_text() == result_text
assert postfix_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''
@ -176,4 +176,4 @@ mail_owner = postfix
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -127,7 +127,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/procmail_result', 'r') as result_file:
result_text = result_file.read()
assert procmail_original_object.get_document_text() == result_text
assert procmail_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''# port for HTTP (descriptions, SOAP, media transfer) traffic
@ -159,4 +159,4 @@ net.ipv4.icmp_echo_ignore_broadcasts=1
template = document_1_object.make_template(document_2_object)
document_1_object.join_template(template)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -234,4 +234,4 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/proftpd_result.conf', 'r') as result_file:
result_text = result_file.read()
assert proftpd_original_object.get_document_text() == result_text
assert proftpd_original_object.document_text == result_text

@ -226,7 +226,7 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/samba_result', 'r') as result_file:
result_text = result_file.read()
assert samba_original_object.get_document_text() == result_text
assert samba_original_object.document_text == result_text
def test_make_template(self):
document_1 = '''
@ -255,4 +255,4 @@ class TestParsingMethods:
document_2_object = SambaFormat(document_2)
template_object = document_1_object.make_template(document_2_object)
document_1_object.join_template(template_object)
assert document_1_object.get_document_text() == document_2
assert document_1_object.document_text == document_2

@ -86,4 +86,4 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/xml_gconf_result.xml', 'r') as result_file:
result_text = result_file.read()
assert xml_gconf_original_object.get_document_text() == result_text
assert xml_gconf_original_object.document_text == result_text

@ -49,4 +49,4 @@ class TestParsingMethods:
with open('./tests/templates/format/testfiles/xml_xfce_result.xml', 'r') as result_file:
result_text = result_file.read()
assert xml_xfce_original_object.get_document_text() == result_text
assert xml_xfce_original_object.document_text == result_text

@ -1,10 +1,9 @@
import pytest
import os
from calculate.utils.package import Package, PackageAtomParser,\
PackageAtomError, Version
PackageAtomError, Version, PackageNotFound
from collections import OrderedDict
from calculate.utils.files import join_paths
import re
CHROOT_PATH = os.path.join(os.getcwd(), 'tests/utils/testfiles/')
@ -224,15 +223,14 @@ sym /etc/test_dir_2/symlink -> file_2.cfg 1587117567
'/var/db/pkg/dev-lang/python-3.6.9'))
def test_if_the_get_file_package_method_of_the_PackageAtom_object_is_called_with_a_name_of_a_file_that_belongs_to_any_package__the_PackageAtom_object_contains_dictionary_with_an_owner_package(self):
package_atom_regex = re.compile(r'dev-lang/python-3(\.\d+)+')
try:
file_package = package_atom.get_file_package('/usr/bin/python3.6')
assert package_atom_regex.search(file_package)
package_atom_name = package_atom.get_file_package('/usr/bin/python3.6')
assert package_atom_name.version > '2.7.16'
except Exception as error:
pytest.fail('Unexpected exception: {0}'.
format(str(error)))
def test_if_the_get_file_package_method_of_the_PackageAtom_object_is_called_with_a_name_of_a_file_that_does_not_belong_to_any_package__the_PackageAtom_object_throws_the_PackageAtomError_exception(self):
with pytest.raises(PackageAtomError):
with pytest.raises(PackageNotFound):
file_package = package_atom.get_file_package('/etc/shadow')
print('package = {}'.format(file_package))

Loading…
Cancel
Save