|
|
# vim: fileencoding=utf-8
|
|
|
#
|
|
|
import os
|
|
|
import re
|
|
|
import glob
|
|
|
from collections import OrderedDict
|
|
|
from ..templates.format.contents_format import ContentsFormat
|
|
|
from .files import read_file, read_link, join_paths, FilesError
|
|
|
import hashlib
|
|
|
import operator
|
|
|
|
|
|
|
|
|
class PackageError(Exception):
|
|
|
'''Исключение выбрасываемое при ошибках в объектах Package, работающих
|
|
|
с CONTENTS-файлами пакетов.'''
|
|
|
pass
|
|
|
|
|
|
|
|
|
# Коды ошибок ATOM-парсера.
|
|
|
DEFAULT, NOTEXIST, NOTCORRECT = range(3)
|
|
|
|
|
|
|
|
|
class PackageAtomError(Exception):
|
|
|
'''Исключение выбрасываемое при ошибках разбора ATOM-названий.'''
|
|
|
def __init__(self, message='Package atom error', errno=DEFAULT):
|
|
|
self.message = message
|
|
|
self.errno = errno
|
|
|
|
|
|
|
|
|
class VersionError(Exception):
|
|
|
'''Исключение выбрасываемое объектами Version.'''
|
|
|
pass
|
|
|
|
|
|
|
|
|
class PackageNotFound(Exception):
|
|
|
'''Специальное исключение выбрасываемое, если не удалось найти пакет, к
|
|
|
которому принадлежит файл.'''
|
|
|
pass
|
|
|
|
|
|
|
|
|
class Version:
|
|
|
'''Класс для работы со значениями версий.'''
|
|
|
def __init__(self, version_value=None):
|
|
|
if version_value is None:
|
|
|
self._version_string = '-1'
|
|
|
self._version_value = [-1]
|
|
|
elif isinstance(version_value, Version):
|
|
|
self._version_string = version_value._version_string
|
|
|
self._version_value = version_value._version_value
|
|
|
else:
|
|
|
value = self._get_version_value(version_value)
|
|
|
if not value:
|
|
|
raise VersionError(
|
|
|
"Can't initialize Version object using '{0}'"
|
|
|
" value with type {1}".format(version_value,
|
|
|
type(version_value)))
|
|
|
if isinstance(version_value, str):
|
|
|
self._version_string = version_value.strip('-')
|
|
|
else:
|
|
|
self._version_string = str(version_value)
|
|
|
|
|
|
self._version_value = value
|
|
|
|
|
|
def _get_version_value(self, version):
|
|
|
'''Вспомогательный метод для получения значения версии, представленного
|
|
|
в виде списка.'''
|
|
|
if isinstance(version, Version):
|
|
|
return version._version_value
|
|
|
|
|
|
elif isinstance(version, int):
|
|
|
version_value = [str(version)]
|
|
|
|
|
|
elif isinstance(version, float):
|
|
|
version_value = []
|
|
|
version = str(version).split('.')
|
|
|
for version_part in version:
|
|
|
version_value.append(int(version_part.strip()))
|
|
|
|
|
|
elif isinstance(version, str):
|
|
|
version = version.strip('-')
|
|
|
version_value = []
|
|
|
|
|
|
if '-' in version:
|
|
|
version = version.split('-')[0]
|
|
|
if '_' in version:
|
|
|
version = version.split('_')[0]
|
|
|
|
|
|
for version_part in version.split('.'):
|
|
|
if version_part.isdigit():
|
|
|
version_part = int(version_part)
|
|
|
version_value.append(version_part)
|
|
|
else:
|
|
|
return False
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
return version_value
|
|
|
|
|
|
def _use_compare_operation(self, compare_operator, other_value):
|
|
|
'''Вспомогательный метод для реализации различных операций сравнения.
|
|
|
'''
|
|
|
version_value = self._version_value[:]
|
|
|
|
|
|
other_value_length = len(other_value)
|
|
|
version_value_length = len(version_value)
|
|
|
|
|
|
if other_value_length < version_value_length:
|
|
|
for counter in range(version_value_length - other_value_length):
|
|
|
other_value.append(0)
|
|
|
elif version_value_length < other_value_length:
|
|
|
for counter in range(other_value_length - version_value_length):
|
|
|
version_value.append(0)
|
|
|
|
|
|
for lvalue, rvalue in zip(version_value, other_value):
|
|
|
if lvalue == rvalue:
|
|
|
continue
|
|
|
|
|
|
if compare_operator(lvalue, rvalue):
|
|
|
return True
|
|
|
else:
|
|
|
return False
|
|
|
else:
|
|
|
if (compare_operator != operator.lt and
|
|
|
compare_operator != operator.gt and
|
|
|
compare_operator != operator.ne):
|
|
|
return True
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def __lt__(self, other):
|
|
|
'''Перегрузка x < y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.lt, other_value)
|
|
|
|
|
|
def __le__(self, other):
|
|
|
'''Перегрузка x <= y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.le, other_value)
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
'''Перегрузка x == y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.eq, other_value)
|
|
|
|
|
|
def __ne__(self, other):
|
|
|
'''Перегрузка x != y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.ne, other_value)
|
|
|
|
|
|
def __gt__(self, other):
|
|
|
'''Перегрузка x > y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.gt, other_value)
|
|
|
|
|
|
def __ge__(self, other):
|
|
|
'''Перегрузка x >= y.'''
|
|
|
other_value = self._get_version_value(other)
|
|
|
if not other_value:
|
|
|
raise VersionError(
|
|
|
"Unable to compare Version object with the '{0}'"
|
|
|
" value of '{1}' type".format(other, type(other)))
|
|
|
|
|
|
return self._use_compare_operation(operator.ge, other_value)
|
|
|
|
|
|
def __hash__(self):
|
|
|
return hash(self._version_string)
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Version: {}>'.format(self._version_string)
|
|
|
|
|
|
def __str__(self):
|
|
|
return self._version_string
|
|
|
|
|
|
def __bool__(self):
|
|
|
if self._version_value == [-1]:
|
|
|
return False
|
|
|
else:
|
|
|
return True
|
|
|
|
|
|
|
|
|
class PackageAtomName:
|
|
|
'''Класс для хранения результата определения пакета. Для определения пакета
|
|
|
использует путь к его pkg директории.'''
|
|
|
def __init__(self, atom_dictionary):
|
|
|
self._package_directory = atom_dictionary['pkg_path']
|
|
|
self._version = atom_dictionary['version']
|
|
|
|
|
|
@property
|
|
|
def name(self):
|
|
|
return os.path.basename(self._package_directory)
|
|
|
|
|
|
@property
|
|
|
def category(self):
|
|
|
return os.path.basename(os.path.dirname(self._package_directory))
|
|
|
|
|
|
@property
|
|
|
def atom(self):
|
|
|
return "{}/{}".format(self.category, self.name)
|
|
|
|
|
|
@property
|
|
|
def version(self):
|
|
|
return self._version
|
|
|
|
|
|
@property
|
|
|
def use_flags(self):
|
|
|
use_path = os.path.join(self._package_directory, 'USE')
|
|
|
try:
|
|
|
return read_file(use_path).strip('\n').split(' ')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read use flags for 'package'"
|
|
|
" parameter: {}".format(self.package_atom))
|
|
|
|
|
|
@property
|
|
|
def pkg_path(self):
|
|
|
return self._package_directory
|
|
|
|
|
|
@property
|
|
|
def slot(self):
|
|
|
slot_path = os.path.join(self._package_directory, 'SLOT')
|
|
|
try:
|
|
|
return read_file(slot_path).strip('\n')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read slot value for"
|
|
|
" 'package': {}".format(self.package_atom))
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
if isinstance(other, PackageAtomName):
|
|
|
return (self._package_directory ==
|
|
|
other._package_directory)
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def __ne__(self, other):
|
|
|
if isinstance(other, PackageAtomName):
|
|
|
return (self._package_directory !=
|
|
|
other._package_directory)
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def __bool__(self):
|
|
|
return bool(self._package_directory)
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<PackageAtomName: {}/{}>'.format(self.category,
|
|
|
self.name)
|
|
|
|
|
|
def __hash__(self):
|
|
|
return hash(self._package_directory)
|
|
|
|
|
|
|
|
|
class PackageAtomParser:
|
|
|
'''Класс для парсинга параметра package, его проверки, а также определения
|
|
|
принадлежности файла пакету.'''
|
|
|
package_name_pattern =\
|
|
|
r'(?P<name>\D[\w\d]*(\-\D[\w\d]*)*)(?P<version>-\d[^\s:]*)?'
|
|
|
|
|
|
atom_regex = re.compile(r'''(?P<category>[^\s/]*)/
|
|
|
{0}
|
|
|
(?P<slot>:[^\s\[]*)?\s*
|
|
|
(?P<use_flags>\[\S*(?:\s+\S*)*\])?'''.format(
|
|
|
package_name_pattern
|
|
|
),
|
|
|
re.VERBOSE)
|
|
|
package_name_regex = re.compile(package_name_pattern)
|
|
|
|
|
|
def __init__(self, pkg_path='/var/db/pkg',
|
|
|
chroot_path='/'):
|
|
|
self.chroot_path = chroot_path
|
|
|
|
|
|
if chroot_path != '/':
|
|
|
self.pkg_path = join_paths(chroot_path, pkg_path)
|
|
|
else:
|
|
|
self.pkg_path = pkg_path
|
|
|
|
|
|
self.package_atom = ''
|
|
|
self._atom_dictionary = {}
|
|
|
|
|
|
def parse_package_parameter(self, package_atom):
|
|
|
'''Метод для разбора значения package, после разбора инициирует
|
|
|
проверку полученных значений. Возвращает объект PackageAtomName.'''
|
|
|
self.package_atom = package_atom
|
|
|
self._atom_dictionary = {}
|
|
|
|
|
|
parsing_result = self.atom_regex.search(package_atom)
|
|
|
|
|
|
if (not parsing_result or parsing_result.string != package_atom or
|
|
|
not parsing_result.groupdict()['category'] or
|
|
|
not parsing_result.groupdict()['name']):
|
|
|
raise PackageAtomError("'package' parameter value '{}' is not"
|
|
|
" correct".format(package_atom),
|
|
|
errno=NOTCORRECT)
|
|
|
|
|
|
self._atom_dictionary['category'] = parsing_result.groupdict(
|
|
|
)['category']
|
|
|
|
|
|
self._atom_dictionary['name'] = parsing_result.groupdict()['name']
|
|
|
|
|
|
if parsing_result.groupdict()['version']:
|
|
|
version_value = parsing_result.groupdict()['version'].strip('-')
|
|
|
|
|
|
self._atom_dictionary['version'] = Version(version_value)
|
|
|
|
|
|
if (parsing_result.groupdict()['slot'] and
|
|
|
parsing_result.groupdict()['slot'] != ':'):
|
|
|
self._atom_dictionary['slot'] = parsing_result.groupdict(
|
|
|
)['slot'][1:]
|
|
|
|
|
|
if parsing_result.groupdict()['use_flags']:
|
|
|
self._atom_dictionary['use_flags'] = []
|
|
|
use_flags = parsing_result.groupdict()['use_flags'].strip().\
|
|
|
rstrip(']').lstrip('[')
|
|
|
|
|
|
for use_flag in use_flags.split():
|
|
|
self._atom_dictionary['use_flags'].append(use_flag.strip())
|
|
|
|
|
|
self._check_package_existance()
|
|
|
|
|
|
atom_name_object = PackageAtomName(self._atom_dictionary)
|
|
|
self._atom_dictionary.clear()
|
|
|
return atom_name_object
|
|
|
|
|
|
def _check_package_existance(self, package_atom=''):
|
|
|
'''Метод для проверки существования пакета. Существование пакета
|
|
|
определяется наличием соответствующего CONTENTS файла.'''
|
|
|
if package_atom:
|
|
|
self.parse_package_parameter(package_atom)
|
|
|
return True
|
|
|
else:
|
|
|
# Используем glob-паттерн для поиска.
|
|
|
if 'version' in self._atom_dictionary:
|
|
|
full_name = self._atom_dictionary['name'] + '-' +\
|
|
|
self._atom_dictionary['version']._version_string
|
|
|
else:
|
|
|
full_name = self._atom_dictionary['name']
|
|
|
|
|
|
if 'version' not in self._atom_dictionary:
|
|
|
glob_result = glob.glob(
|
|
|
r'{0}/{1}/{2}-[0-9]*/CONTENTS'.format(
|
|
|
self.pkg_path,
|
|
|
self._atom_dictionary['category'],
|
|
|
full_name))
|
|
|
else:
|
|
|
glob_result = glob.glob(
|
|
|
r'{0}/{1}/{2}*/CONTENTS'.format(
|
|
|
self.pkg_path,
|
|
|
self._atom_dictionary['category'],
|
|
|
full_name))
|
|
|
|
|
|
if not glob_result:
|
|
|
# Если ничего не нашлось.
|
|
|
raise PackageAtomError("Package from 'package' parameter value"
|
|
|
" '{}' does not exist".format(
|
|
|
self.package_atom),
|
|
|
errno=NOTEXIST)
|
|
|
|
|
|
if len(glob_result) == 1:
|
|
|
# Если нашелся один пакет.
|
|
|
pkg_path = os.path.dirname(next(iter(glob_result)))
|
|
|
self._check_slot_value(pkg_path)
|
|
|
self._check_use_flags_value(pkg_path)
|
|
|
|
|
|
parsed_name = self._atom_dictionary['name']
|
|
|
full_name = os.path.basename(pkg_path)
|
|
|
self._atom_dictionary['version'] = Version(
|
|
|
full_name[len(parsed_name):])
|
|
|
self._atom_dictionary['pkg_path'] = pkg_path
|
|
|
else:
|
|
|
packages = dict()
|
|
|
# Если подходящих пакетов много -- проверяем по use-флагам,
|
|
|
# слотам и версии, если таковые заданы.
|
|
|
for contents_path in glob_result:
|
|
|
pkg_path = os.path.dirname(contents_path)
|
|
|
try:
|
|
|
self._check_slot_value(pkg_path)
|
|
|
self._check_use_flags_value(pkg_path)
|
|
|
parsed_name = self._atom_dictionary['name']
|
|
|
full_name = os.path.basename(pkg_path)
|
|
|
packages[pkg_path] = Version(
|
|
|
full_name[len(parsed_name):])
|
|
|
except PackageAtomError:
|
|
|
continue
|
|
|
|
|
|
if not packages:
|
|
|
# Если после проверки отсеялись все кандидаты.
|
|
|
raise PackageAtomError(
|
|
|
"Package from 'package' parameter value"
|
|
|
" '{}' does not exist".format(
|
|
|
self.package_atom),
|
|
|
errno=NOTEXIST)
|
|
|
|
|
|
if len(packages) == 1:
|
|
|
# Если был найден только один кандидат -- выдаем его.
|
|
|
pkg_path = next(iter(packages.keys()))
|
|
|
self._atom_dictionary['pkg_path'] = pkg_path
|
|
|
self._atom_dictionary['version'] = packages[pkg_path]
|
|
|
else:
|
|
|
# Если подходящих пакетов много -- берем старшую версию.
|
|
|
pkg_path = sorted(packages.keys(),
|
|
|
key=lambda path: packages[path])[-1]
|
|
|
self._atom_dictionary['pkg_path'] = pkg_path
|
|
|
self._atom_dictionary['version'] = packages[pkg_path]
|
|
|
|
|
|
def _check_slot_value(self, pkg_path):
|
|
|
'''Метод для проверки полученного из параметра package значения slot.
|
|
|
'''
|
|
|
if 'slot' in self._atom_dictionary:
|
|
|
slot = self._get_slot_value(pkg_path)
|
|
|
|
|
|
if slot != self._atom_dictionary['slot']:
|
|
|
raise PackageAtomError("Package from 'package' parameter value"
|
|
|
" '{}' does not exist".format(
|
|
|
self.package_atom),
|
|
|
errno=NOTEXIST)
|
|
|
|
|
|
def _check_use_flags_value(self, pkg_path):
|
|
|
'''Метод для проверки полученных из параметра package значений
|
|
|
use-флагов.'''
|
|
|
if 'use_flags' in self._atom_dictionary:
|
|
|
use_flags = self._get_use_flags_value(pkg_path)
|
|
|
|
|
|
for use_flag in self._atom_dictionary['use_flags']:
|
|
|
if use_flag not in use_flags:
|
|
|
raise PackageAtomError(
|
|
|
"Package from 'package' parameter value"
|
|
|
" '{}' does not exist".format(
|
|
|
self.package_atom),
|
|
|
errno=NOTEXIST)
|
|
|
|
|
|
def _get_slot_value(self, pkg_path):
|
|
|
'''Метод для получения значения slot из файла SLOT.'''
|
|
|
slot_path = os.path.join(pkg_path, 'SLOT')
|
|
|
try:
|
|
|
return read_file(slot_path).strip('\n')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read slot value for"
|
|
|
" 'package': {}".format(self.package_atom))
|
|
|
|
|
|
def _get_use_flags_value(self, pkg_path):
|
|
|
'''Метод для получения списка значений use-флагов из файла USE.'''
|
|
|
use_path = os.path.join(pkg_path, 'USE')
|
|
|
try:
|
|
|
return read_file(use_path).strip('\n').split(' ')
|
|
|
except FilesError:
|
|
|
raise PackageAtomError("could not read use flags for 'package'"
|
|
|
" parameter: {}".format(self.package_atom))
|
|
|
|
|
|
def _get_category_packages(self, category):
|
|
|
'''Генератор имен категорий, имеющихся в /var/db/pkg'''
|
|
|
for path in glob.glob('{0}/{1}/*/CONTENTS'.format(self.pkg_path,
|
|
|
category)):
|
|
|
yield path
|
|
|
|
|
|
def get_file_package(self, file_path):
|
|
|
'''Метод для определения пакета, которому принадлежит файл.'''
|
|
|
# Удаляем часть пути соответствующую chroot_path
|
|
|
if self.chroot_path != '/' and file_path.startswith(self.chroot_path):
|
|
|
file_path = file_path[len(self.chroot_path):]
|
|
|
|
|
|
for category in os.listdir(self.pkg_path):
|
|
|
for contents_path in self._get_category_packages(category):
|
|
|
try:
|
|
|
with open(contents_path, 'r') as contents_file:
|
|
|
for file_line in contents_file.readlines():
|
|
|
contents_name = file_line.split(' ')[1].strip()
|
|
|
|
|
|
if contents_name == file_path:
|
|
|
package_path = os.path.dirname(contents_path)
|
|
|
|
|
|
package_name = os.path.basename(package_path)
|
|
|
parsing_result = self.package_name_regex.\
|
|
|
search(package_name)
|
|
|
version = parsing_result.groupdict()['version']
|
|
|
version = Version(version)
|
|
|
|
|
|
package_atom = PackageAtomName(
|
|
|
{'pkg_path': package_path,
|
|
|
'version': version})
|
|
|
return package_atom
|
|
|
except (OSError, IOError):
|
|
|
continue
|
|
|
else:
|
|
|
raise PackageNotFound("The file does not belong to any package")
|
|
|
|
|
|
@property
|
|
|
def atom_dictionary(self):
|
|
|
'''Метод для получения ATOM-словаря.'''
|
|
|
return self._atom_dictionary
|
|
|
|
|
|
@property
|
|
|
def atom_name(self):
|
|
|
'''Метод для получения из ATOM-словаря объекта PackageAtomName.'''
|
|
|
return PackageAtomName(self._atom_dictionary)
|
|
|
|
|
|
@atom_dictionary.setter
|
|
|
def set_atom_dictionary(self, atom_dictionary):
|
|
|
'''Метод для установки ATOM-словаря.'''
|
|
|
self._atom_dictionary = atom_dictionary
|
|
|
|
|
|
@property
|
|
|
def category(self):
|
|
|
'''Метод для получения категории пакета из ATOM-словаря.'''
|
|
|
if 'category' in self._atom_dictionary:
|
|
|
return self._atom_dictionary['category']
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
@property
|
|
|
def name(self):
|
|
|
'''Метод для получения имени пакета из ATOM-словаря.'''
|
|
|
if 'name' in self._atom_dictionary:
|
|
|
return self._atom_dictionary['name']
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
@property
|
|
|
def slot(self):
|
|
|
'''Метод для получения значения slot из ATOM-словаря.'''
|
|
|
if 'slot' in self._atom_dictionary:
|
|
|
return self._atom_dictionary['slot']
|
|
|
else:
|
|
|
if 'contents' in self._atom_dictionary:
|
|
|
return self._get_slot_value(self._atom_dictionary['contents'])
|
|
|
return False
|
|
|
|
|
|
@property
|
|
|
def use_flags(self):
|
|
|
'''Метод для получения use-флагов из ATOM-словаря.'''
|
|
|
if 'use_flags' in self._atom_dictionary:
|
|
|
return self._atom_dictionary['use_flags']
|
|
|
else:
|
|
|
if 'contents' in self._atom_dictionary:
|
|
|
return self._get_use_flags_value(
|
|
|
self._atom_dictionary['contents'])
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
|
|
|
class Package:
|
|
|
'''Класс для работы с принадлежностью файлов пакетам.'''
|
|
|
re_cfg = re.compile(r'/\._cfg\d{4}_')
|
|
|
|
|
|
def __init__(self, package_atom, pkg_path='/var/db/pkg', chroot_path='/'):
|
|
|
self.chroot_path = chroot_path
|
|
|
|
|
|
self.contents_file_path = self._get_contents_path(package_atom)
|
|
|
self.package_name = package_atom
|
|
|
if (chroot_path != '/' and
|
|
|
not self.contents_file_path.startswith(chroot_path)):
|
|
|
self.contents_file_path = join_paths(chroot_path,
|
|
|
self.contents_file_path)
|
|
|
|
|
|
if not os.path.exists(self.contents_file_path):
|
|
|
raise PackageError("Can not find CONTENTS file in path: {}".format(
|
|
|
self.contents_file_path
|
|
|
))
|
|
|
self.contents_dictionary = OrderedDict()
|
|
|
self.read_contents_file()
|
|
|
|
|
|
def _get_contents_path(self, package_atom):
|
|
|
'''Метод для получения из ATOM-названия или готового объекта
|
|
|
PackageAtomName пути к файлу CONTENTS.'''
|
|
|
if isinstance(package_atom, str):
|
|
|
package_atom_parser = PackageAtomParser(
|
|
|
chroot_path=self.chroot_path)
|
|
|
atom_name = package_atom_parser.parse_package_parameter(
|
|
|
package_atom)
|
|
|
return os.path.join(atom_name.pkg_path,
|
|
|
'CONTENTS')
|
|
|
elif isinstance(package_atom, PackageAtomName):
|
|
|
return os.path.join(package_atom.pkg_path,
|
|
|
'CONTENTS')
|
|
|
else:
|
|
|
raise PackageError(
|
|
|
"Incorrect 'package_atom' value: '{}', type: '{}''".
|
|
|
format(package_atom, type(package_atom)))
|
|
|
|
|
|
def remove_cfg_prefix(self, file_name):
|
|
|
'''Метод для удаления префикса ._cfg????_.'''
|
|
|
return self.re_cfg.sub('/', file_name)
|
|
|
|
|
|
def remove_chroot_path(self, file_name):
|
|
|
'''Метод для удаления из пути файла корневого пути, если он не
|
|
|
является /.'''
|
|
|
if self.chroot_path != '/' and file_name.startswith(self.chroot_path):
|
|
|
return file_name[len(self.chroot_path):]
|
|
|
else:
|
|
|
return file_name
|
|
|
|
|
|
def read_contents_file(self):
|
|
|
'''Метод для чтения файла CONTENTS.'''
|
|
|
try:
|
|
|
contents_text = read_file(self.contents_file_path)
|
|
|
except FilesError as error:
|
|
|
raise PackageError(str(error))
|
|
|
|
|
|
if contents_text:
|
|
|
contents_format = ContentsFormat(contents_text,
|
|
|
None,
|
|
|
template_parser=False)
|
|
|
self.contents_dictionary = contents_format._document_dictionary
|
|
|
return True
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def write_contents_file(self):
|
|
|
'''Метод для записи файла CONTENTS.'''
|
|
|
contents_file = open(self.contents_file_path, 'w')
|
|
|
contents_text = self.render_contents_file()
|
|
|
contents_file.write(contents_text)
|
|
|
contents_file.close()
|
|
|
|
|
|
def render_contents_file(self):
|
|
|
'''Метод для получения текста файла CONTENTS.'''
|
|
|
contents_format = ContentsFormat('', None, template_parser=False)
|
|
|
contents_format._document_dictionary = self.contents_dictionary
|
|
|
return contents_format.document_text
|
|
|
|
|
|
def add_dir(self, file_name):
|
|
|
'''Метод для добавления в CONTENTS директорий.'''
|
|
|
file_name = self.remove_chroot_path(file_name)
|
|
|
|
|
|
if (file_name != '/' and
|
|
|
(file_name not in self.contents_dictionary
|
|
|
or self.contents_dictionary[file_name]['type'] != 'dir')):
|
|
|
self.add_dir(os.path.dirname(file_name))
|
|
|
contents_item = OrderedDict({'type': 'dir'})
|
|
|
self.contents_dictionary[file_name] = contents_item
|
|
|
|
|
|
def add_sym(self, file_name, target_path=None):
|
|
|
'''Метод для добавления в CONTENTS символьных ссылок.'''
|
|
|
file_name = self.remove_cfg_prefix(file_name)
|
|
|
|
|
|
real_path = file_name
|
|
|
file_name = self.remove_chroot_path(file_name)
|
|
|
|
|
|
if real_path == file_name:
|
|
|
real_path = join_paths(self.chroot_path, file_name)
|
|
|
|
|
|
if target_path is None:
|
|
|
target = read_link(real_path)
|
|
|
else:
|
|
|
target = target_path
|
|
|
|
|
|
self.add_dir(os.path.dirname(file_name))
|
|
|
mtime = str(int(os.lstat(real_path).st_mtime))
|
|
|
try:
|
|
|
contents_item = OrderedDict({'type': 'sym',
|
|
|
'target': target,
|
|
|
'mtime': mtime})
|
|
|
except FilesError as error:
|
|
|
raise PackageError(str(error))
|
|
|
|
|
|
self.contents_dictionary[file_name] = contents_item
|
|
|
|
|
|
def add_obj(self, file_name, file_md5=None):
|
|
|
'''Метод для добавления в CONTENTS обычных файлов как obj.'''
|
|
|
real_path = file_name
|
|
|
file_name = self.remove_chroot_path(file_name)
|
|
|
file_name = self.remove_cfg_prefix(file_name)
|
|
|
|
|
|
if real_path == file_name:
|
|
|
real_path = join_paths(self.chroot_path, file_name)
|
|
|
self.add_dir(os.path.dirname(file_name))
|
|
|
if file_md5 is not None:
|
|
|
md5_value = file_md5
|
|
|
else:
|
|
|
try:
|
|
|
file_text = read_file(real_path).encode()
|
|
|
except FilesError as error:
|
|
|
raise PackageError(str(error))
|
|
|
md5_value = hashlib.md5(file_text).hexdigest()
|
|
|
|
|
|
contents_item = OrderedDict({'type': 'obj',
|
|
|
'md5': md5_value,
|
|
|
'mtime':
|
|
|
str(int(os.lstat(real_path).st_mtime))})
|
|
|
self.contents_dictionary[file_name] = contents_item
|
|
|
|
|
|
def add_file(self, file_name):
|
|
|
'''Метод для добавления в CONTENTS файла любого типа.'''
|
|
|
if file_name != '/':
|
|
|
real_path = file_name
|
|
|
file_name = self.remove_chroot_path(file_name)
|
|
|
|
|
|
if real_path != file_name:
|
|
|
real_path = join_paths(self.chroot_path, file_name)
|
|
|
if os.path.isdir(real_path):
|
|
|
self.add_dir(file_name)
|
|
|
elif os.path.islink(real_path):
|
|
|
self.add_link(file_name)
|
|
|
elif os.path.isfile(real_path):
|
|
|
self.add_obj(file_name)
|
|
|
|
|
|
def remove_obj(self, file_path):
|
|
|
'''Метод для удаления файлов и ссылок.'''
|
|
|
file_path = self.remove_chroot_path(file_path)
|
|
|
file_path = self.remove_cfg_prefix(file_path)
|
|
|
|
|
|
if file_path in self.contents_dictionary:
|
|
|
self.contents_dictionary.pop(file_path)
|
|
|
|
|
|
def remove_dir(self, file_path):
|
|
|
'''Метод для удаления из CONTENTS файлов и директорий находящихся
|
|
|
внутри удаляемой директории и самой директории.'''
|
|
|
directory_path = self.remove_chroot_path(file_path)
|
|
|
paths_to_remove = []
|
|
|
|
|
|
for file_path in self.contents_dictionary:
|
|
|
if file_path.startswith(directory_path):
|
|
|
paths_to_remove.append(file_path)
|
|
|
|
|
|
for file_path in paths_to_remove:
|
|
|
self.contents_dictionary.pop(file_path)
|
|
|
|
|
|
def clear_dir(self, file_path):
|
|
|
'''Метод для удаления из CONTENTS файлов и директорий находящихся
|
|
|
внутри очищаемой директории.'''
|
|
|
directory_path = self.remove_chroot_path(file_path)
|
|
|
paths_to_remove = []
|
|
|
|
|
|
for file_path in self.contents_dictionary:
|
|
|
if file_path == directory_path:
|
|
|
continue
|
|
|
if file_path.startswith(directory_path):
|
|
|
paths_to_remove.append(file_path)
|
|
|
|
|
|
for file_path in paths_to_remove:
|
|
|
self.contents_dictionary.pop(file_path)
|
|
|
|
|
|
def remove_empty_directories(self):
|
|
|
'''Метод для удаления из CONTENTS директорий, которые после удаления
|
|
|
тех или иных файлов больше не находятся на пути к тем файлам, которые
|
|
|
по-прежнему принадлежат пакету.'''
|
|
|
used_directories = set()
|
|
|
not_directory_list = [path for path, value in
|
|
|
self.contents_dictionary.items()
|
|
|
if value['type'] != 'dir']
|
|
|
for filepath in not_directory_list:
|
|
|
file_directory = os.path.dirname(filepath)
|
|
|
while file_directory != '/':
|
|
|
used_directories.add(file_directory)
|
|
|
file_directory = os.path.dirname(file_directory)
|
|
|
|
|
|
paths_to_delete = [file_path for file_path, value in
|
|
|
self.contents_dictionary.items()
|
|
|
if value['type'] == 'dir' and
|
|
|
file_path not in used_directories]
|
|
|
|
|
|
for file_path in paths_to_delete:
|
|
|
self.contents_dictionary.pop(file_path)
|
|
|
|
|
|
def get_md5(self, file_path):
|
|
|
'''Метод для получения md5 хэш-суммы указанного файла.'''
|
|
|
try:
|
|
|
file_text = read_file(file_path).encode()
|
|
|
except FilesError as error:
|
|
|
raise PackageError(str(error))
|
|
|
|
|
|
file_md5 = hashlib.md5(file_text).hexdigest()
|
|
|
return file_md5
|
|
|
|
|
|
def is_md5_equal(self, file_path, file_md5=None):
|
|
|
'''Метод для проверки соответствия md5 хэш суммы файла той, что указана
|
|
|
в файле CONTENTS.'''
|
|
|
if file_md5 is None:
|
|
|
file_md5 = self.get_md5(file_path)
|
|
|
|
|
|
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
|
|
|
file_path = file_path[len(self.chroot_path):]
|
|
|
file_path = self.remove_cfg_prefix(file_path)
|
|
|
|
|
|
contents_md5 = self.contents_dictionary[file_path]['md5']
|
|
|
return file_md5 == contents_md5
|
|
|
|
|
|
def __contains__(self, file_path):
|
|
|
if self.chroot_path != "/" and file_path.startswith(self.chroot_path):
|
|
|
file_path = file_path[len(self.chroot_path):]
|
|
|
file_path = self.remove_cfg_prefix(file_path)
|
|
|
|
|
|
return file_path in self.contents_dictionary
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Package: {}/{}>'.format(self.package_name.category,
|
|
|
self.package_name.name)
|