You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

640 lines
24 KiB

# vim: fileencoding=utf-8
#
import os
import re
import glob
from collections import OrderedDict
from ..templates.format.contents_format import ContentsFormat
from .files import read_file, read_link, join_paths, FilesError
import hashlib
import operator
class PackageError(Exception):
pass
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
class PackageAtomError(Exception):
def __init__(self, message='Package atom error', errno=DEFAULT):
self.message = message
self.errno = errno
class VersionError(Exception):
pass
class Version:
'''Временный класс для работы со значениями версий.'''
def __init__(self, version_value=None):
if version_value is None:
self._version_string = '-1'
self._version_value = [-1]
elif isinstance(version_value, Version):
self._version_string = version_value._version_string
self._version_value = version_value._version_value
else:
value = self._get_version_value(version_value)
if not value:
raise VersionError(
"Can't initialize Version object using '{0}'"
" value with type {1}".format(version_value,
type(version_value)))
if isinstance(version_value, str):
self._version_string = version_value.strip('-')
else:
self._version_string = str(version_value)
self._version_value = value
def _get_version_value(self, version):
if isinstance(version, Version):
return version._version_value
elif isinstance(version, int):
version_value = [str(version)]
elif isinstance(version, float):
version_value = []
version = str(version).split('.')
for version_part in version:
version_value.append(int(version_part.strip()))
elif isinstance(version, str):
version = version.strip('-')
version_value = []
if '-' in version:
version = version.split('-')[0]
if '_' in version:
version = version.split('_')[0]
for version_part in version.split('.'):
if version_part.isdigit():
version_part = int(version_part)
version_value.append(version_part)
else:
return False
else:
return False
return version_value
def _use_compare_operation(self, compare_operator, other_value):
version_value = self._version_value[:]
other_value_length = len(other_value)
version_value_length = len(version_value)
if other_value_length < version_value_length:
for counter in range(version_value_length - other_value_length):
other_value.append(0)
elif version_value_length < other_value_length:
for counter in range(other_value_length - version_value_length):
version_value.append(0)
for lvalue, rvalue in zip(version_value, other_value):
if lvalue == rvalue:
continue
if compare_operator(lvalue, rvalue):
return True
else:
return False
else:
if (compare_operator != operator.lt and
compare_operator != operator.gt and
compare_operator != operator.ne):
return True
else:
return False
def __lt__(self, other):
'''Перегрузка x < y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.lt, other_value)
def __le__(self, other):
'''Перегрузка x <= y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.le, other_value)
def __eq__(self, other):
'''Перегрузка x == y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.eq, other_value)
def __ne__(self, other):
'''Перегрузка x != y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.ne, other_value)
def __gt__(self, other):
'''Перегрузка x > y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.gt, other_value)
def __ge__(self, other):
'''Перегрузка x >= y.'''
other_value = self._get_version_value(other)
if not other_value:
raise VersionError(
"Unable to compare Version object with the '{0}'"
" value of '{1}' type".format(other, type(other)))
return self._use_compare_operation(operator.ge, other_value)
def __hash__(self):
return hash(self._version_string)
def __repr__(self):
return '<Version: {}>'.format(self._version_string)
def __str__(self):
return self._version_string
def __bool__(self):
if self._version_value == [-1]:
return False
else:
return True
class PackageAtomName:
def __init__(self, atom_dictionary):
self._package_directory = atom_dictionary['pkg_path']
self._version = atom_dictionary['version']
@property
def name(self):
return os.path.basename(self._package_directory)
@property
def category(self):
return os.path.basename(os.path.dirname(self._package_directory))
@property
def version(self):
return self._version
@property
def use_flags(self):
use_path = os.path.join(self._package_directory, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags for 'package'"
" parameter: {}".format(self.package_atom))
@property
def pkg_path(self):
return self._package_directory
@property
def slot(self):
slot_path = os.path.join(self._package_directory, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(self.package_atom))
def __eq__(self, other):
if isinstance(other, PackageAtomName):
return (self._package_directory ==
other._package_directory)
else:
return False
def __ne__(self, other):
if isinstance(other, PackageAtomName):
return (self._package_directory !=
other._package_directory)
else:
return False
def __bool__(self):
return bool(self._package_directory)
def __repr__(self):
return '<PackageAtomName: {}/{}>'.format(self.category,
self.name)
def __hash__(self):
return hash(self._package_directory)
class PackageAtomParser:
atom_regex = re.compile(r'''(?P<category>[^\s/]*)/
(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)
(?P<version>-\d[^\s:]*)?
(?P<slot>:[^\s\[]*)?\s*
(?P<use_flags>\[\S*(?:\s+\S*)*\])?''',
re.VERBOSE)
def __init__(self, pkg_path='/var/db/pkg',
chroot_path='/'):
self.chroot_path = chroot_path
if chroot_path != '/':
self.pkg_path = join_paths(chroot_path, pkg_path)
else:
self.pkg_path = pkg_path
self.package_atom = ''
self._atom_dictionary = {}
def parse_package_parameter(self, package_atom):
self.package_atom = package_atom
self._atom_dictionary = {}
parsing_result = self.atom_regex.search(package_atom)
if (not parsing_result or parsing_result.string != package_atom or
not parsing_result.groupdict()['category'] or
not parsing_result.groupdict()['name']):
raise PackageAtomError("'package' parameter value '{}' is not"
" correct".format(package_atom),
errno=NOTCORRECT)
self._atom_dictionary['category'] = parsing_result.groupdict(
)['category']
self._atom_dictionary['name'] = parsing_result.groupdict()['name']
if parsing_result.groupdict()['version']:
version_value = parsing_result.groupdict()['version'].strip('-')
self._atom_dictionary['version'] = Version(version_value)
if (parsing_result.groupdict()['slot'] and
parsing_result.groupdict()['slot'] != ':'):
self._atom_dictionary['slot'] = parsing_result.groupdict(
)['slot'][1:]
if parsing_result.groupdict()['use_flags']:
self._atom_dictionary['use_flags'] = []
use_flags = parsing_result.groupdict()['use_flags'].strip().\
rstrip(']').lstrip('[')
for use_flag in use_flags.split():
self._atom_dictionary['use_flags'].append(use_flag.strip())
self._check_package_existance()
atom_name_object = PackageAtomName(self._atom_dictionary)
self._atom_dictionary.clear()
return atom_name_object
def _check_package_existance(self, package_atom=''):
if package_atom:
self.parse_package_parameter(package_atom)
return True
else:
if 'version' in self._atom_dictionary:
full_name = self._atom_dictionary['name'] + '-' +\
self._atom_dictionary['version']._version_string
else:
full_name = self._atom_dictionary['name']
if 'version' not in self._atom_dictionary:
glob_result = glob.glob(
r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
self.pkg_path,
self._atom_dictionary['category'],
full_name))
else:
glob_result = glob.glob(
r'{0}/{1}/{2}*/CONTENTS'.format(
self.pkg_path,
self._atom_dictionary['category'],
full_name))
if not glob_result:
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
if len(glob_result) == 1:
pkg_path = os.path.dirname(next(iter(glob_result)))
self._check_slot_value(pkg_path)
self._check_use_flags_value(pkg_path)
parsed_name = self._atom_dictionary['name']
full_name = os.path.basename(pkg_path)
self._atom_dictionary['version'] = Version(
full_name[len(parsed_name):])
self._atom_dictionary['pkg_path'] = pkg_path
else:
packages = dict()
for contents_path in glob_result:
pkg_path = os.path.dirname(contents_path)
try:
self._check_slot_value(pkg_path)
self._check_use_flags_value(pkg_path)
parsed_name = self._atom_dictionary['name']
full_name = os.path.basename(pkg_path)
packages[pkg_path] = Version(
full_name[len(parsed_name):])
except PackageAtomError:
continue
if not packages:
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
if len(packages) == 1:
pkg_path = next(iter(packages.keys()))
self._atom_dictionary['pkg_path'] = pkg_path
self._atom_dictionary['version'] = packages[pkg_path]
else:
# Берем старшую версию.
pkg_path = sorted(packages.keys(),
key=lambda path: packages[path])[-1]
self._atom_dictionary['pkg_path'] = pkg_path
self._atom_dictionary['version'] = packages[pkg_path]
def _check_slot_value(self, pkg_path):
if 'slot' in self._atom_dictionary:
slot = self._get_slot_value(pkg_path)
if slot != self._atom_dictionary['slot']:
raise PackageAtomError("Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
def _check_use_flags_value(self, pkg_path):
if 'use_flags' in self._atom_dictionary:
use_flags = self._get_use_flags_value(pkg_path)
for use_flag in self._atom_dictionary['use_flags']:
if use_flag not in use_flags:
raise PackageAtomError(
"Package from 'package' parameter value"
" '{}' does not exist".format(
self.package_atom),
errno=NOTEXIST)
def _get_slot_value(self, pkg_path):
slot_path = os.path.join(pkg_path, 'SLOT')
try:
return read_file(slot_path).strip('\n')
except FilesError:
raise PackageAtomError("could not read slot value for"
" 'package': {}".format(self.package_atom))
def _get_use_flags_value(self, pkg_path):
use_path = os.path.join(pkg_path, 'USE')
try:
return read_file(use_path).strip('\n').split(' ')
except FilesError:
raise PackageAtomError("could not read use flags for 'package'"
" parameter: {}".format(self.package_atom))
def _get_category_packages(self, category):
for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
category)):
yield path
def get_file_package(self, file_path, with_slot=False, with_uses=False):
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
file_path = file_path[len(self.chroot_path):]
for category in os.listdir(self.pkg_path):
for contents_path in self._get_category_packages(category):
try:
with open(contents_path, 'r') as contents_file:
for file_line in contents_file.readlines():
contents_name = file_line.split(' ')[1].strip()
if contents_name == file_path:
package = '/'.join(os.path.dirname(
contents_path).split('/')[-2:])
return package
except (OSError, IOError):
continue
else:
raise PackageAtomError("The file does not belong to any package")
@property
def atom_dictionary(self):
return self._atom_dictionary
@property
def atom_name(self):
return PackageAtomName(self._atom_dictionary)
@atom_dictionary.setter
def set_atom_dictionary(self, atom_dictionary):
self._atom_dictionary = atom_dictionary
@property
def category(self):
if 'category' in self._atom_dictionary:
return self._atom_dictionary['category']
else:
return False
@property
def name(self):
if 'name' in self._atom_dictionary:
return self._atom_dictionary['name']
else:
return False
@property
def slot(self):
if 'slot' in self._atom_dictionary:
return self._atom_dictionary['slot']
else:
if 'contents' in self._atom_dictionary:
return self._get_slot_value(self._atom_dictionary['contents'])
return False
@property
def use_flags(self):
if 'use_flags' in self._atom_dictionary:
return self._atom_dictionary['use_flags']
else:
if 'contents' in self._atom_dictionary:
return self._get_use_flags_value(
self._atom_dictionary['contents'])
else:
return False
class Package:
re_cfg = re.compile(r'/\._cfg\d{4}_')
def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
self.chroot_path = chroot_path
self.contents_file_path = self._get_contents_path(package_atom)
if (chroot_path != '/' and
not self.contents_file_path.startswith(chroot_path)):
self.contents_file_path = join_paths(chroot_path,
self.contents_file_path)
if not os.path.exists(self.contents_file_path):
raise PackageError("Can not find CONTENTS file in path: {}".format(
self.contents_file_path
))
self.contents_dictionary = OrderedDict()
self.read_contents_file()
def _get_contents_path(self, package_atom):
if isinstance(package_atom, str):
package_atom_parser = PackageAtomParser(
chroot_path=self.chroot_path)
atom_name = package_atom_parser.parse_package_parameter(
package_atom)
return os.path.join(atom_name.pkg_path,
'CONTENTS')
if isinstance(package_atom, PackageAtomName):
return os.path.join(package_atom.pkg_path,
'CONTENTS')
else:
raise PackageError(
"Incorrect 'pakage_atom' value: '{}', type: '{}''".
format(package_atom, type(package_atom)))
def remove_cfg_prefix(self, file_name):
return self.re_cfg.sub('/', file_name)
def remove_chroot_path(self, file_name):
if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
return file_name[len(self.chroot_path):]
else:
return file_name
def read_contents_file(self):
try:
contents_text = read_file(self.contents_file_path)
except FilesError as error:
raise PackageError(str(error))
if contents_text:
contents_format = ContentsFormat(contents_text,
template_parser=False)
self.contents_dictionary = contents_format._document_dictionary
return True
else:
return False
def write_contents_file(self):
pass
def render_contents_file(self):
contents_format = ContentsFormat('', template_parser=False)
contents_format._document_dictionary = self.contents_dictionary
return contents_format.get_document_text()
def add_dir(self, file_name):
file_name = self.remove_cfg_prefix(file_name)
file_name = self.remove_chroot_path(file_name)
if (file_name != '/' and
(file_name not in self.contents_dictionary
or self.contents_dictionary[file_name]['type'] != 'dir')):
self.add_dir(os.path.dirname(file_name))
contents_item = OrderedDict({'type': 'dir'})
self.contents_dictionary[file_name] = contents_item
def add_sym(self, file_name):
file_name = self.remove_cfg_prefix(file_name)
real_path = file_name
file_name = self.remove_chroot_path(file_name)
if real_path == file_name:
real_path = join_paths(self.chroot_path, file_name)
self.add_dir(os.path.dirname(file_name))
mtime = str(int(os.lstat(real_path).st_mtime))
try:
contents_item = OrderedDict({'type': 'sym',
'target': read_link(real_path),
'mtime': mtime})
except FilesError as error:
raise PackageError(str(error))
self.contents_dictionary[file_name] = contents_item
def add_obj(self, file_name):
file_name = self.remove_cfg_prefix(file_name)
real_path = file_name
file_name = self.remove_chroot_path(file_name)
if real_path == file_name:
real_path = join_paths(self.chroot_path, file_name)
self.add_dir(os.path.dirname(file_name))
try:
file_text = read_file(real_path).encode()
except FilesError as error:
raise PackageError(str(error))
contents_item = OrderedDict({'type': 'obj',
'md5': hashlib.md5(file_text).hexdigest(),
'mtime':
str(int(os.lstat(real_path).st_mtime))})
self.contents_dictionary[file_name] = contents_item
def add_file(self, file_name):
if file_name != '/':
real_path = file_name
file_name = self.remove_chroot_path(file_name)
if real_path != file_name:
real_path = join_paths(self.chroot_path, file_name)
if os.path.isdir(real_path):
self.add_dir(file_name)
elif os.path.islink(real_path):
self.add_link(file_name)
elif os.path.isfile(real_path):
self.add_obj(file_name)
def modify_contents_item(self, file_name, item_value):
pass
def remove_file(self, file_name):
pass
def remove_empty_directories(self):
pass
def check_file_md5(self, file_path):
pass