Вы не можете выбрать более 25 тем
Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
330 строки
11 KiB
330 строки
11 KiB
import os
|
|
import re
|
|
from calculate.utils.device import udev, get_uuid_dict
|
|
from . import files
|
|
import xattr
|
|
import errno
|
|
from contextlib import contextmanager
|
|
from .tools import SingletonParam, flat_iterable
|
|
import tempfile
|
|
|
|
|
|
def is_mount(device_name):
|
|
'''Возвращает путь к точке монтирования, если устройство примонтировано
|
|
или '''
|
|
def find_names(old_device_name):
|
|
device_name = os.path.abspath(old_device_name)
|
|
yield device_name
|
|
if device_name.startswith('/dev'):
|
|
info = udev.get_device_info(name=device_name)
|
|
if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
|
|
yield '/dev/mapper/{vg}-{lv}'.format(vg=info['DM_VG_NAME'],
|
|
lv=info['DM_LV_NAME'])
|
|
|
|
def get_overlay_mounts(line):
|
|
mounts = line.split(' ')
|
|
yield mounts[1]
|
|
for device_name in re.findall(
|
|
'(?:lowerdir=|upperdir=|workdir=)([^,]+)',
|
|
mounts[3]):
|
|
yield device_name
|
|
|
|
find_data = set(find_names(device_name))
|
|
output = ''
|
|
for mtab_line in files.read_file_lines('/etc/mtab'):
|
|
if 'overlay' not in mtab_line:
|
|
if ' ' in mtab_line:
|
|
mounts = set(mtab_line.split(' ')[:2])
|
|
if mounts & find_data:
|
|
output = (mounts - find_data).pop()
|
|
else:
|
|
mounts = set(get_overlay_mounts(mtab_line))
|
|
dest = mtab_line.split(' ')[1]
|
|
if mounts & find_data:
|
|
if dest in find_data:
|
|
output = 'overlay'
|
|
else:
|
|
return dest
|
|
return output
|
|
|
|
|
|
class MountHelperError(Exception):
|
|
pass
|
|
|
|
|
|
class MountHelperNotFound(Exception):
|
|
pass
|
|
|
|
|
|
class MountHelper:
|
|
'''Базовый класс для чтения файлов /etc/fstab и /etc/mtab.'''
|
|
DATA_FILE = '/etc/fstab'
|
|
NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(0, 6)
|
|
|
|
def __init__(self, data_file=None, devices=()):
|
|
if data_file:
|
|
self.DATA_FILE = data_file
|
|
self.cache = []
|
|
self.rotated_cache = []
|
|
self.uuid_dictionary = get_uuid_dict(devices=devices)
|
|
self.update_cache()
|
|
|
|
def _read_data(self):
|
|
with open(self.DATA_FILE, 'r') as data_file:
|
|
return data_file.read()
|
|
|
|
def update_cache(self):
|
|
def setlen(ar):
|
|
return ar[:6] + [''] * (6 - len(ar))
|
|
|
|
self.cache = []
|
|
for line in self._read_data().split('\n'):
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
separated_line = []
|
|
for part in line.split():
|
|
part = part.strip()
|
|
separated_line.append(part)
|
|
self.cache.append(separated_line)
|
|
|
|
for data in self.cache:
|
|
device_name = self.uuid_dictionary.get(data[0], data[0])
|
|
|
|
if device_name.startswith('/'):
|
|
device_name = os.path.realpath(device_name)
|
|
|
|
data[0] = udev.get_device_info(name=device_name).get(
|
|
'DEVNAME', data[0])
|
|
data[1] = data[1] if data[2] != 'swap' else 'swap'
|
|
self.rotated_cache = list(zip(*self.cache))
|
|
|
|
def get_from_fstab(self, what=DIR, where=NAME, is_in=None, is_equal=None,
|
|
contains=None, is_not_equal=None, all_entry=True):
|
|
if is_equal is not None:
|
|
def filter_function(tab_line):
|
|
return tab_line[where] == is_equal
|
|
|
|
elif is_in is not None:
|
|
def filter_function(tab_line):
|
|
return tab_line[where] in is_in
|
|
|
|
elif contains is not None:
|
|
def filter_function(tab_line):
|
|
return contains in tab_line[where]
|
|
|
|
else:
|
|
def filter_function(tab_line):
|
|
return tab_line[where] != is_not_equal
|
|
|
|
output = []
|
|
|
|
for line in filter(filter_function, self.cache):
|
|
output.append(line[what])
|
|
if all_entry:
|
|
return output
|
|
else:
|
|
return '' if not output else output[-1]
|
|
|
|
def get_fields(self, *fields):
|
|
fstab_fields = [self.rotated_cache[field] for field in fields]
|
|
return list(zip(*fstab_fields))
|
|
|
|
def is_read_only(self, what=DIR, is_equal=None):
|
|
tab_to_check = list(filter(lambda fstab_tab:
|
|
fstab_tab[what] == is_equal, self.cache))
|
|
if tab_to_check:
|
|
for data in tab_to_check:
|
|
options = data[self.OPTS].split(',')
|
|
if 'ro' in options:
|
|
return True
|
|
else:
|
|
return False
|
|
else:
|
|
raise MountHelperNotFound(is_equal)
|
|
|
|
def is_exist(self, what=DIR, is_equal=None, is_not_equal=None):
|
|
if is_equal is not None:
|
|
def filter_function(tab_line):
|
|
tab_line[what] == is_equal
|
|
else:
|
|
def filter_function(tab_line):
|
|
tab_line[what] != is_not_equal
|
|
return bool(filter(filter_function, self.cache))
|
|
|
|
@property
|
|
def writable(self, directory_path):
|
|
return not self.is_read_only(is_equal=directory_path)
|
|
|
|
@property
|
|
def read_only(self, directory_path):
|
|
return self.is_read_only(is_equal=directory_path)
|
|
|
|
@property
|
|
def exists(self, directory_path):
|
|
return self.is_exist(is_equal=directory_path)\
|
|
or self.is_exist(what=self.NAME,
|
|
is_equal=directory_path)
|
|
|
|
|
|
class FStab(MountHelper, metaclass=SingletonParam):
|
|
'''Класс для чтения содержимого /etc/fstab и его кеширования.'''
|
|
DATA_FILE = '/etc/fstab'
|
|
|
|
|
|
class Mounts(MountHelper):
|
|
'''Класс для чтения содержимого /etc/mtab и его кеширования.'''
|
|
DATA_FILE = '/etc/mtab'
|
|
|
|
|
|
class DiskSpaceError(Exception):
|
|
pass
|
|
|
|
|
|
class DiskSpace:
|
|
def __init__(self):
|
|
self.df_command = files.get_program_path('/bin/df')
|
|
|
|
def get_free(self, device=None, device_path=None):
|
|
if device:
|
|
mount_path = is_mount(device)
|
|
if not mount_path:
|
|
raise DiskSpaceError('Device {} must be mounted.'.
|
|
format(device))
|
|
device_path = device
|
|
df_process = files.Process(self.df_command, device_path, '-B1')
|
|
if df_process.success():
|
|
df_data = df_process.read().strip()
|
|
df_lines = df_data.split('\n')
|
|
if len(df_lines) >= 2:
|
|
columns = df_lines[1].split()
|
|
if len(columns) == 6:
|
|
return int(columns[3])
|
|
raise DiskSpaceError('Wrong df output:\n{}'.format(df_data))
|
|
else:
|
|
raise DiskSpaceError(str(df_process.read_error()))
|
|
|
|
|
|
def get_child_mounts(path_name):
|
|
'''Получить все точки монтирования, содержащиеся в пути.'''
|
|
mtab_file_path = '/etc/mtab'
|
|
if not os.access(mtab_file_path, os.R_OK):
|
|
return ''
|
|
|
|
with open(mtab_file_path) as mtab_file:
|
|
output = []
|
|
mtab = list(map(lambda line: line.split(' '), mtab_file))
|
|
mtab = [[line[0], line[1]] for line in mtab]
|
|
|
|
if path_name != 'none':
|
|
abs_path = os.path.abspath(path_name)
|
|
for tab_line in mtab:
|
|
if os.path.commonpath([abs_path, tab_line[1]]) == abs_path:
|
|
output.append(tab_line)
|
|
else:
|
|
abs_path = path_name
|
|
for tab_line in mtab:
|
|
if tab_line[0] == abs_path:
|
|
output.append(tab_line)
|
|
return output
|
|
|
|
|
|
class MountError(Exception):
|
|
pass
|
|
|
|
|
|
def mount(source, target, fstype=None, options=None):
|
|
parameters = [source, target]
|
|
if options is not None:
|
|
if isinstance(options, list) or isinstance(options, tuple):
|
|
options = ','.join(flat_iterable(options))
|
|
parameters.insert(0, '-o {}'.format(options))
|
|
if fstype is not None:
|
|
parameters.insert(0, '-t {}'.format(fstype))
|
|
mount_process = files.Process('/bin/mount', *parameters)
|
|
if mount_process.success():
|
|
return True
|
|
else:
|
|
raise MountError('Failed to mount {source} to {target}: {stderr}'
|
|
.format(source=source, target=target,
|
|
stderr=str(mount_process.read_error())))
|
|
|
|
|
|
def umount(target):
|
|
umount_process = files.Process('/bin/umount', target)
|
|
if umount_process.success():
|
|
return True
|
|
else:
|
|
raise MountError('Failed to umount {target}: {stderr}'
|
|
.format(target=target,
|
|
stderr=umount_process.read_error()))
|
|
|
|
|
|
class BtrfsError(Exception):
|
|
pass
|
|
|
|
|
|
class Btrfs:
|
|
check_path = None
|
|
|
|
def __init__(self, block_device):
|
|
self.block_device = block_device
|
|
if not os.path.exists(block_device):
|
|
raise BtrfsError('Device is not found.')
|
|
|
|
@contextmanager
|
|
def mount(self):
|
|
tempfile_path = None
|
|
try:
|
|
files.make_directory(self.check_path)
|
|
tempfile_path = tempfile.mkdtemp(prefix='btrfscheck-',
|
|
dir=self.check_path)
|
|
mount(self.block_device, tempfile_path, 'btrfs')
|
|
yield tempfile_path
|
|
except KeyboardInterrupt:
|
|
raise
|
|
except MountError as error:
|
|
if 'wrong fs type' in str(error):
|
|
raise BtrfsError('{} is not btrfs'.format(self.block))
|
|
else:
|
|
raise BtrfsError(str(error))
|
|
finally:
|
|
if tempfile_path:
|
|
if os.path.ismount(tempfile_path):
|
|
umount(tempfile_path)
|
|
os.rmdir(tempfile_path)
|
|
|
|
def get_compression(self, relative_path):
|
|
relative_path = relative_path.lstrip('/')
|
|
with self.mount() as device_path:
|
|
try:
|
|
absolute_path = os.path.join(device_path, relative_path)
|
|
if os.path.exists(absolute_path):
|
|
return xattr.get(absolute_path, 'btrfs.compression')
|
|
else:
|
|
return ''
|
|
except IOError as error:
|
|
if error.errno == errno.ENODATA:
|
|
return ''
|
|
raise BtrfsError('Failed to get btrfs compression.')
|
|
|
|
@property
|
|
def compression(self):
|
|
return self.get_compression('')
|
|
|
|
def set_compression(self, relative_path, value):
|
|
relative_path = relative_path.lstrip('/')
|
|
with self.mount() as device_path:
|
|
try:
|
|
absolute_path = os.path.join(device_path, relative_path)
|
|
if not os.path.exists(absolute_path):
|
|
files.make_directory(absolute_path)
|
|
return xattr.set(absolute_path, 'btrfs.compression', value)
|
|
except IOError as error:
|
|
if error.errno == errno.ENODATA:
|
|
return ''
|
|
raise BtrfsError('Failed to set btrfs compression.')
|
|
|
|
@compression.setter
|
|
def compression(self, value):
|
|
self.set_compression('', value)
|