Extended syntax for the package parameter is available now. fixed #28

master
Иванов Денис 3 years ago
parent 9e885e3045
commit 5b76fdfcd9

@ -4,6 +4,7 @@ import os
import re
import glob
from collections import OrderedDict
from typing import Union
from .files import read_file, read_link, join_paths, FilesError
from pyparsing import (
Literal,
@ -149,9 +150,9 @@ class Version:
return version_value
def _compare_lists(self, lversion, rversion, filler=0):
'''Метод для сравнения двух списков, даже если если они не одинаковы.
Возвращает 0, если списки равны, 1 если lversion больше, -1 если
lversion меньше.'''
'''Метод для сравнения двух списков, даже если если они не одинаковы
по длине. Возвращает 0, если списки равны, 1 если lversion больше, -1
если lversion меньше.'''
if lversion == rversion:
return 0
@ -193,7 +194,7 @@ class Version:
return self._revision < other_version['revision']
def __le__(self, other):
def __le__(self, other: Version) -> bool:
'''Перегрузка x <= y.'''
other_version = self._get_version_value(other)
if other_version is None:
@ -212,7 +213,7 @@ class Version:
return cmp_res < 0
return self._revision <= other_version['revision']
def __eq__(self, other):
def __eq__(self, other: Version, ignore_revision: bool = False) -> bool:
'''Перегрузка x == y.'''
other_version = self._get_version_value(other)
if other_version is None:
@ -230,17 +231,21 @@ class Version:
filler=(4, 0))
if cmp_res != 0:
return False
return self._revision == other_version['revision']
def __ne__(self, other):
if ignore_revision:
return True
else:
return self._revision == other_version['revision']
def __ne__(self, other: Version) -> bool:
'''Перегрузка x != y.'''
return not self.__eq__(other)
def __gt__(self, other):
def __gt__(self, other: Version) -> bool:
'''Перегрузка x > y.'''
return not self.__le__(other)
def __ge__(self, other):
def __ge__(self, other: Version) -> bool:
'''Перегрузка x >= y.'''
return not self.__lt__(other)
@ -456,10 +461,11 @@ class PackageAtomParser:
package_name_pattern =\
fr'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-{_version_pattern})?'
atom_name_pattern = r'''(?P<category>[^\s/]*)/
atom_name_pattern = r'''(?P<condition>[=~><])?
(?P<category>[^\s/]*)/
{0}
(?P<slot>:[^\[\s]*)?
(?P<use_flags>\[\S*(?:,\S*)*\])?'''.format(
(?P<use_flags>\[\S*(?:\s+\S*)*\])?'''.format(
package_name_pattern)
atom_regex = re.compile(atom_name_pattern, re.VERBOSE)
@ -476,24 +482,28 @@ class PackageAtomParser:
self.pkg_path = pkg_path
self.package_atom = ''
self._atom_dictionary = {}
def parse_package_parameter(self, package_atom):
def parse_package_parameter(self, package_atom: Union[str, dict]
) -> PackageAtomName:
'''Метод для разбора значения package, после разбора инициирует
проверку полученных значений. Возвращает объект PackageAtomName.'''
self.package_atom = package_atom
if isinstance(package_atom, str):
self._atom_dictionary = self.parse_atom_name(package_atom)
atom_dictionary = self.parse_atom_name(package_atom)
atom_dictionary['package_atom'] = package_atom
elif isinstance(package_atom, dict):
self._atom_dictionary = package_atom
atom_dictionary = package_atom
if 'package_atom' not in atom_dictionary:
atom_dictionary['package_atom'] = (
f'{atom_dictionary["category"]}{atom_dictionary["name"]}')
self._check_package_existance()
atom_dictionary = self._check_package_existance(atom_dictionary)
atom_name_object = PackageAtomName(atom_dictionary)
atom_name_object = PackageAtomName(self._atom_dictionary)
self._atom_dictionary.clear()
return atom_name_object
def is_package_exists(self, package_atom):
def is_package_exists(self, package_atom: Union[str, dict]) -> bool:
try:
self.parse_package_parameter(package_atom)
return True
@ -502,138 +512,187 @@ class PackageAtomParser:
return False
raise
def _check_package_existance(self, package_atom=''):
def _check_package_existance(self, atom_dictionary: dict) -> dict:
'''Метод для проверки существования пакета. Существование пакета
определяется наличием соответствующего CONTENTS файла.'''
if package_atom:
self.parse_package_parameter(package_atom)
return True
# Используем glob-паттерн для поиска.
if atom_dictionary['version'] is not None:
full_name = atom_dictionary['name'] + '-' +\
str(atom_dictionary['version'])
else:
# Используем glob-паттерн для поиска.
if self._atom_dictionary['version'] is not None:
full_name = self._atom_dictionary['name'] + '-' +\
self._atom_dictionary['version']._string
else:
full_name = self._atom_dictionary['name']
if self._atom_dictionary['version'] is None:
glob_result = glob.glob(
r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
self.pkg_path,
self._atom_dictionary['category'],
full_name))
else:
glob_result = glob.glob(
r'{0}/{1}/{2}*/CONTENTS'.format(
self.pkg_path,
self._atom_dictionary['category'],
full_name))
if not glob_result:
# Если ничего не нашлось.
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
full_name = atom_dictionary['name']
# if (atom_dictionary['version'] is None or
# (atom_dictionary['condition'] is not None and
# atom_dictionary['condition'] in {'~', '>', '<'})):
# glob_result = glob.glob(
# r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
# self.pkg_path,
# atom_dictionary['category'],
# full_name))
# else:
# glob_result = glob.glob(
# r'{0}/{1}/{2}*/CONTENTS'.format(
# self.pkg_path,
# atom_dictionary['category'],
# full_name))
glob_result = glob.glob(r'{0}/{1}/{2}*/CONTENTS'.format(
self.pkg_path,
atom_dictionary['category'],
atom_dictionary['name']))
if not glob_result:
# Если ничего не нашлось.
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(glob_result) == 1:
# Если нашелся один пакет.
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
atom_dictionary['version'] = pkg_version
atom_dictionary['pkg_path'] = pkg_path
else:
packages = dict()
# Если подходящих пакетов много -- проверяем по use-флагам,
# слотам и версии, если таковые заданы.
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
self._check_slot_value(pkg_path, atom_dictionary)
self._check_use_flags_value(pkg_path, atom_dictionary)
if len(glob_result) == 1:
# Если нашелся один пакет.
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path)
self._check_use_flags_value(pkg_path)
parsed_name = self._atom_dictionary['name']
full_name = os.path.basename(pkg_path)
self._atom_dictionary['version'] = Version(
full_name[len(parsed_name):])
self._atom_dictionary['pkg_path'] = pkg_path
atom_name = atom_dictionary['name']
real_name = os.path.basename(pkg_path)
pkg_version = Version(real_name[len(atom_name):])
if atom_dictionary['condition'] is not None:
self._check_version(atom_dictionary, pkg_version)
packages[pkg_path] = pkg_version
except PackageAtomError:
continue
if not packages:
# Если после проверки отсеялись все кандидаты.
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
if len(packages) == 1:
# Если был найден только один кандидат -- выдаем его.
pkg_path = next(iter(packages.keys()))
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
else:
packages = dict()
# Если подходящих пакетов много -- проверяем по use-флагам,
# слотам и версии, если таковые заданы.
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
self._check_slot_value(pkg_path)
self._check_use_flags_value(pkg_path)
parsed_name = self._atom_dictionary['name']
full_name = os.path.basename(pkg_path)
packages[pkg_path] = Version(
full_name[len(parsed_name):])
except PackageAtomError:
continue
# Если подходящих пакетов много -- берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
atom_dictionary['pkg_path'] = pkg_path
atom_dictionary['version'] = packages[pkg_path]
if not packages:
# Если после проверки отсеялись все кандидаты.
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
return atom_dictionary
if len(packages) == 1:
# Если был найден только один кандидат -- выдаем его.
pkg_path = next(iter(packages.keys()))
self._atom_dictionary['pkg_path'] = pkg_path
self._atom_dictionary['version'] = packages[pkg_path]
else:
# Если подходящих пакетов много -- берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
self._atom_dictionary['pkg_path'] = pkg_path
self._atom_dictionary['version'] = packages[pkg_path]
def _check_slot_value(self, pkg_path):
def _check_slot_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученного из параметра package значения slot.
'''
if self._atom_dictionary['slot']:
slot = self._get_slot_value(pkg_path)
if atom_dictionary['slot']:
slot = self._get_slot_value(pkg_path,
atom_dictionary['package_atom'])
if slot != self._atom_dictionary['slot']:
if slot != atom_dictionary['slot']:
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _check_use_flags_value(self, pkg_path):
def _check_use_flags_value(self, pkg_path: str,
atom_dictionary: dict) -> None:
'''Метод для проверки полученных из параметра package значений
use-флагов.'''
if self._atom_dictionary['use_flags']:
use_flags = self._get_use_flags_value(pkg_path)
for use_flag in self._atom_dictionary['use_flags']:
if use_flag not in use_flags:
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
if atom_dictionary['use_flags']:
use_flags = self._get_use_flags_value(
pkg_path,
atom_dictionary['package_atom'])
for use_flag in atom_dictionary['use_flags']:
if use_flag.startswith('-'):
if use_flag.strip()[1:] not in use_flags:
continue
elif use_flag in use_flags:
continue
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def _get_slot_value(self, pkg_path):
def _get_slot_value(self, pkg_path: str, package_atom: str) -> str:
'''Метод для получения значения slot из файла SLOT.'''
slot_path = os.path.join(pkg_path, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(self.package_atom))
" 'package': {}".format(package_atom))
def _get_use_flags_value(self, pkg_path):
def _get_use_flags_value(self, pkg_path: str, package_atom: str) -> list:
'''Метод для получения списка значений use-флагов из файла USE.'''
use_path = os.path.join(pkg_path, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags for 'package'"
" parameter: {}".format(self.package_atom))
" parameter: {}".format(package_atom))
def _get_category_packages(self, category):
def _get_category_packages(self, category: str) -> str:
'''Генератор имен категорий, имеющихся в /var/db/pkg'''
for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
category)):
yield path
def get_file_package(self, file_path):
def _check_version(self, atom_dictionary: dict, pkg_version: Version):
condition = atom_dictionary['condition']
if condition == '=':
condition_result = (atom_dictionary['version']
== pkg_version)
elif condition == '~':
condition_result = atom_dictionary['version'].__eq__(
pkg_version,
ignore_revision=True)
elif condition == '>':
condition_result = atom_dictionary['version'] < pkg_version
elif condition == '<':
condition_result = atom_dictionary['version'] > pkg_version
else:
condition_result = False
if not condition_result:
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
atom_dictionary['package_atom']),
errno=NOTEXIST)
def get_file_package(self, file_path: str) -> PackageAtomName:
'''Метод для определения пакета, которому принадлежит файл.'''
# Удаляем часть пути соответствующую chroot_path
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
@ -684,6 +743,15 @@ class PackageAtomParser:
else:
version = None
if parsing_result['condition'] is not None:
if parsing_result['version'] is None:
raise PackageAtomError("'package' parameter value"
f" '{atom_name}' is not correct."
" Version value is missed",
errno=NOTCORRECT)
elif parsing_result['version'] is not None:
parsing_result['condition'] = '='
if (parsing_result['slot'] is not None
and parsing_result['slot'] != ':'):
slot = parsing_result['slot'].strip(':')
@ -693,7 +761,7 @@ class PackageAtomParser:
if parsing_result['use_flags'] is not None:
use_flags = [use.strip() for use in
parsing_result['use_flags'].strip().
strip('[]').split(',')]
strip('[]').split(' ')]
else:
use_flags = None
@ -702,7 +770,8 @@ class PackageAtomParser:
'version': version,
'slot': slot,
'use_flags': use_flags,
'with_slot': slot is not None}
'with_slot': slot is not None,
'condition': parsing_result['condition']}
return atom_dict

@ -42,6 +42,16 @@ class TestContents:
version_2 = Version('3.5.6-r1')
assert version_1 > version_2
def test_if_two_Version_objects_compared_using_gt_operation_one_of_the_versions_have_revision_value_but_both_versions_have_the_same_version_number__the_result_of_the_comparing_would_be_True(self):
version_1 = Version('3.5.6')
version_2 = Version('3.5.6-r1')
assert version_1 < version_2
def test_if_two_Version_objects_compared_using_eq_method_with_ignore_revision_flag_one_of_the_versions_have_revision_value_but_both_versions_have_the_same_version_number__the_result_of_the_comparing_would_be_True(self):
version_1 = Version('3.5.6')
version_2 = Version('3.5.6-r1')
assert version_1.__eq__(version_2, ignore_revision=True)
def test_if_two_Version_objects_compared_using_gt_operation_both_of_them_have_p_suffix_and_the_right_version_value_is_less_than_left_version__the_result_of_the_comparing_would_be_True(self):
version_1 = Version('-2.5.6_p2-r1')
version_2 = Version('2.5.6_p1-r1')
@ -123,12 +133,12 @@ dir /etc/test_dir_1
OrderedDict({'type': 'sym',
'target': 'vim',
'mtime': '1573538053'})})
contents_object = Package('category-1/package-name-1.0',
contents_object = Package('test-category/test-package-1.0',
chroot_path=CHROOT_PATH)
assert contents_object.contents_dictionary == result
def test_if_PackageContents_object_contains_contents_dictionary__it_renders_CONTENTS_file_correctly(self):
contents_object = Package('category-1/package-name-1.0',
contents_object = Package('test-category/test-package-1.0',
chroot_path=CHROOT_PATH)
result = '''dir /usr
dir /usr/bin
@ -140,7 +150,7 @@ sym /usr/bin/vimdiff -> vim 1573538053
assert contents_object.render_contents_file() == result
def test_if_new_directory_is_added_in_contents_file_using_add_dir_method__the_PackageContents_object_renders_the_contents_file_with_new_dir(self):
contents_object = Package('category-1/package-name-1.0',
contents_object = Package('test-category/test-package-1.0',
chroot_path=CHROOT_PATH)
result = '''dir /usr
dir /usr/bin
@ -155,7 +165,7 @@ dir /etc/test_dir_1
assert contents_object.render_contents_file() == result
def test_if_new_object_is_added_in_contents_file_using_add_obj_method__the_PackageContents_object_renders_the_contents_file_with_new_obj(self):
contents_object = Package('category-1/package-name-1.0',
contents_object = Package('test-category/test-package-1.0',
chroot_path=CHROOT_PATH)
result = '''dir /usr
dir /usr/bin
@ -175,7 +185,7 @@ obj /etc/test_dir_2/file_2.cfg a371f4d456d471ac0ed0e8befff1cb6d {}
assert contents_object.render_contents_file() == result
def test_if_new_link_is_added_in_contents_file_using_add_sym_method__the_PackageContents_object_renders_the_contents_file_with_new_sym(self):
contents_object = Package('category-1/package-name-1.0',
contents_object = Package('test-category/test-package-1.0',
chroot_path=CHROOT_PATH)
result = '''dir /usr
dir /usr/bin
@ -207,7 +217,7 @@ sym /etc/test_dir_2/symlink -> file_2.cfg {}
def test_if_the_PackageAtom_object_parsed_a_correct_package_atom_name_with_a_slot_value__the_PackageAtom_returns_atom_name_of_package_with_this_slot(self):
atom_name = package_atom.parse_package_parameter(
'dev-lang/python-3.6:3.6/3.6m')
'dev-lang/python-3.6.9:3.6/3.6m')
assert (atom_name.category == 'dev-lang' and
atom_name.fullname == 'python-3.6.9' and
@ -219,7 +229,7 @@ sym /etc/test_dir_2/symlink -> file_2.cfg {}
def test_if_the_PackageAtom_object_parsed_a_correct_package_atom_name_with_an_empty_slot_value__the_PackageAtom_object_returns_atom_name_of_package(self):
atom_name = package_atom.parse_package_parameter(
'dev-lang/python-3.6:')
'dev-lang/python-3.6.9:')
assert (atom_name.category == 'dev-lang' and
atom_name.fullname == 'python-3.6.9' and
atom_name.version == Version('3.6.9') and
@ -229,7 +239,7 @@ sym /etc/test_dir_2/symlink -> file_2.cfg {}
def test_if_the_PackageAtom_object_parsed_a_correct_package_atom_name_with_a_uses_value__the_PackageAtom_object_returns_atom_name_of_package_with_this_use_flags(self):
atom_name = package_atom.parse_package_parameter(
'dev-lang/python-3.6[abi_x86_64,ssl]')
'dev-lang/python-3.6.9[abi_x86_64 ssl]')
assert (atom_name.category == 'dev-lang' and
atom_name.fullname == 'python-3.6.9' and
@ -254,7 +264,7 @@ sym /etc/test_dir_2/symlink -> file_2.cfg {}
def test_if_the_PackageAtom_object_parses_a_correct_package_atom_name_without_version_value_but_with_use_flags_value__the_PackageAtom_object_looks_for_package_with_assigned_use_flags(self):
atom_name = package_atom.parse_package_parameter(
'dev-lang/python[specific_flag]')
'dev-lang/python[specific_flag -strange_flag]')
assert (atom_name.category == 'dev-lang' and
atom_name.fullname == 'python-3.6.9' and
@ -296,3 +306,68 @@ sym /etc/test_dir_2/symlink -> file_2.cfg {}
with pytest.raises(PackageNotFound):
file_package = package_atom.get_file_package('/etc/shadow')
print('package = {}'.format(file_package))
def test_package_equal_versions(self):
package_atom.parse_package_parameter('=test-category/test-package-1.0')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'=test-category/test-package-1.2')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'=test-category/test-package-1.0-r1')
def test_package_check_versions_ignoring_revisions(self):
package_atom.parse_package_parameter('~test-category/test-package-1.0')
package_atom.parse_package_parameter(
'~test-category/test-package-1.0-r1')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'~test-category/test-package-1.2')
def test_package_check_versions_using_greater(self):
package_atom.parse_package_parameter('>test-category/test-package-0.9')
result = package_atom.parse_package_parameter(
'>test-category/test-package-1.1')
assert result.version == '1.1-r1'
package_atom.parse_package_parameter('>test-category/test-package-0.9')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'>test-category/test-package-1.2')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'>test-category/test-package-1.1-r2')
def test_package_check_versions_using_less(self):
package_atom.parse_package_parameter('<test-category/test-package-1.3')
result = package_atom.parse_package_parameter(
'<test-category/test-package-1.1-r2')
assert result.version == '1.1-r1'
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'<test-category/test-package-0.2-r2')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'<test-category/test-package-0.2-r2')
def test_package_check_slot_condition(self):
package_atom.parse_package_parameter('test-category/test-package:0')
package_atom.parse_package_parameter('test-category/test-package:1')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'test-category/test-package:2')
def test_package_check_use_flags_condition(self):
package_atom.parse_package_parameter(
'test-category/test-package-1.0[use_1 -use_3]')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'test-category/test-package-1.0[use_1 -use_2]')
with pytest.raises(PackageAtomError):
package_atom.parse_package_parameter(
'test-category/test-package-1.0[use_3]')

@ -0,0 +1,6 @@
dir /usr
dir /usr/bin
sym /usr/bin/rview -> vim 1573538053
sym /usr/bin/rvim -> vim 1573538053
obj /usr/bin/vim 30acc0f256e11c1ecdb1bd80b688d238 1573538056
sym /usr/bin/vimdiff -> vim 1573538053
Loading…
Cancel
Save