5
0
Fork 0
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

330 Zeilen
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import re
from calculate.utils.device import udev, get_uuid_dict
from . import files
import xattr
import errno
from contextlib import contextmanager
from .tools import SingletonParam, flat_iterable
import tempfile
def is_mount(device_name):
'''Возвращает путь к точке монтирования, если устройство примонтировано
или '''
def find_names(old_device_name):
device_name = os.path.abspath(old_device_name)
yield device_name
if device_name.startswith('/dev'):
info = udev.get_device_info(name=device_name)
if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
yield '/dev/mapper/{vg}-{lv}'.format(vg=info['DM_VG_NAME'],
lv=info['DM_LV_NAME'])
def get_overlay_mounts(line):
mounts = line.split(' ')
yield mounts[1]
for device_name in re.findall(
'(?:lowerdir=|upperdir=|workdir=)([^,]+)',
mounts[3]):
yield device_name
find_data = set(find_names(device_name))
output = ''
for mtab_line in files.read_file_lines('/etc/mtab'):
if 'overlay' not in mtab_line:
if ' ' in mtab_line:
mounts = set(mtab_line.split(' ')[:2])
if mounts & find_data:
output = (mounts - find_data).pop()
else:
mounts = set(get_overlay_mounts(mtab_line))
dest = mtab_line.split(' ')[1]
if mounts & find_data:
if dest in find_data:
output = 'overlay'
else:
return dest
return output
class MountHelperError(Exception):
pass
class MountHelperNotFound(Exception):
pass
class MountHelper:
'''Базовый класс для чтения файлов /etc/fstab и /etc/mtab.'''
DATA_FILE = '/etc/fstab'
NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(0, 6)
def __init__(self, data_file=None, devices=()):
if data_file:
self.DATA_FILE = data_file
self.cache = []
self.rotated_cache = []
self.uuid_dictionary = get_uuid_dict(devices=devices)
self.update_cache()
def _read_data(self):
with open(self.DATA_FILE, 'r') as data_file:
return data_file.read()
def update_cache(self):
def setlen(ar):
return ar[:6] + [''] * (6 - len(ar))
self.cache = []
for line in self._read_data().split('\n'):
line = line.strip()
if line and not line.startswith('#'):
separated_line = []
for part in line.split():
part = part.strip()
separated_line.append(part)
self.cache.append(separated_line)
for data in self.cache:
device_name = self.uuid_dictionary.get(data[0], data[0])
if device_name.startswith('/'):
device_name = os.path.realpath(device_name)
data[0] = udev.get_device_info(name=device_name).get(
'DEVNAME', data[0])
data[1] = data[1] if data[2] != 'swap' else 'swap'
self.rotated_cache = list(zip(*self.cache))
def get_from_fstab(self, what=DIR, where=NAME, is_in=None, is_equal=None,
contains=None, is_not_equal=None, all_entry=True):
if is_equal is not None:
def filter_function(tab_line):
return tab_line[where] == is_equal
elif is_in is not None:
def filter_function(tab_line):
return tab_line[where] in is_in
elif contains is not None:
def filter_function(tab_line):
return contains in tab_line[where]
else:
def filter_function(tab_line):
return tab_line[where] != is_not_equal
output = []
for line in filter(filter_function, self.cache):
output.append(line[what])
if all_entry:
return output
else:
return '' if not output else output[-1]
def get_fields(self, *fields):
fstab_fields = [self.rotated_cache[field] for field in fields]
return list(zip(*fstab_fields))
def is_read_only(self, what=DIR, is_equal=None):
tab_to_check = list(filter(lambda fstab_tab:
fstab_tab[what] == is_equal, self.cache))
if tab_to_check:
for data in tab_to_check:
options = data[self.OPTS].split(',')
if 'ro' in options:
return True
else:
return False
else:
raise MountHelperNotFound(is_equal)
def is_exist(self, what=DIR, is_equal=None, is_not_equal=None):
if is_equal is not None:
def filter_function(tab_line):
tab_line[what] == is_equal
else:
def filter_function(tab_line):
tab_line[what] != is_not_equal
return bool(filter(filter_function, self.cache))
@property
def writable(self, directory_path):
return not self.is_read_only(is_equal=directory_path)
@property
def read_only(self, directory_path):
return self.is_read_only(is_equal=directory_path)
@property
def exists(self, directory_path):
return self.is_exist(is_equal=directory_path)\
or self.is_exist(what=self.NAME,
is_equal=directory_path)
class FStab(MountHelper, metaclass=SingletonParam):
'''Класс для чтения содержимого /etc/fstab и его кеширования.'''
DATA_FILE = '/etc/fstab'
class Mounts(MountHelper):
'''Класс для чтения содержимого /etc/mtab и его кеширования.'''
DATA_FILE = '/etc/mtab'
class DiskSpaceError(Exception):
pass
class DiskSpace:
def __init__(self):
self.df_command = files.get_program_path('/bin/df')
def get_free(self, device=None, device_path=None):
if device:
mount_path = is_mount(device)
if not mount_path:
raise DiskSpaceError('Device {} must be mounted.'.
format(device))
device_path = device
df_process = files.Process(self.df_command, device_path, '-B1')
if df_process.success():
df_data = df_process.read().strip()
df_lines = df_data.split('\n')
if len(df_lines) >= 2:
columns = df_lines[1].split()
if len(columns) == 6:
return int(columns[3])
raise DiskSpaceError('Wrong df output:\n{}'.format(df_data))
else:
raise DiskSpaceError(str(df_process.read_error()))
def get_child_mounts(path_name):
'''Получить все точки монтирования, содержащиеся в пути.'''
mtab_file_path = '/etc/mtab'
if not os.access(mtab_file_path, os.R_OK):
return ''
with open(mtab_file_path) as mtab_file:
output = []
mtab = list(map(lambda line: line.split(' '), mtab_file))
mtab = [[line[0], line[1]] for line in mtab]
if path_name != 'none':
abs_path = os.path.abspath(path_name)
for tab_line in mtab:
if os.path.commonpath([abs_path, tab_line[1]]) == abs_path:
output.append(tab_line)
else:
abs_path = path_name
for tab_line in mtab:
if tab_line[0] == abs_path:
output.append(tab_line)
return output
class MountError(Exception):
pass
def mount(source, target, fstype=None, options=None):
parameters = [source, target]
if options is not None:
if isinstance(options, list) or isinstance(options, tuple):
options = ','.join(flat_iterable(options))
parameters.insert(0, '-o {}'.format(options))
if fstype is not None:
parameters.insert(0, '-t {}'.format(fstype))
mount_process = files.Process('/bin/mount', *parameters)
if mount_process.success():
return True
else:
raise MountError('Failed to mount {source} to {target}: {stderr}'
.format(source=source, target=target,
stderr=str(mount_process.read_error())))
def umount(target):
umount_process = files.Process('/bin/umount', target)
if umount_process.success():
return True
else:
raise MountError('Failed to umount {target}: {stderr}'
.format(target=target,
stderr=umount_process.read_error()))
class BtrfsError(Exception):
pass
class Btrfs:
check_path = None
def __init__(self, block_device):
self.block_device = block_device
if not os.path.exists(block_device):
raise BtrfsError('Device is not found.')
@contextmanager
def mount(self):
tempfile_path = None
try:
files.make_directory(self.check_path)
tempfile_path = tempfile.mkdtemp(prefix='btrfscheck-',
dir=self.check_path)
mount(self.block_device, tempfile_path, 'btrfs')
yield tempfile_path
except KeyboardInterrupt:
raise
except MountError as error:
if 'wrong fs type' in str(error):
raise BtrfsError('{} is not btrfs'.format(self.block))
else:
raise BtrfsError(str(error))
finally:
if tempfile_path:
if os.path.ismount(tempfile_path):
umount(tempfile_path)
os.rmdir(tempfile_path)
def get_compression(self, relative_path):
relative_path = relative_path.lstrip('/')
with self.mount() as device_path:
try:
absolute_path = os.path.join(device_path, relative_path)
if os.path.exists(absolute_path):
return xattr.get(absolute_path, 'btrfs.compression')
else:
return ''
except IOError as error:
if error.errno == errno.ENODATA:
return ''
raise BtrfsError('Failed to get btrfs compression.')
@property
def compression(self):
return self.get_compression('')
def set_compression(self, relative_path, value):
relative_path = relative_path.lstrip('/')
with self.mount() as device_path:
try:
absolute_path = os.path.join(device_path, relative_path)
if not os.path.exists(absolute_path):
files.make_directory(absolute_path)
return xattr.set(absolute_path, 'btrfs.compression', value)
except IOError as error:
if error.errno == errno.ENODATA:
return ''
raise BtrfsError('Failed to set btrfs compression.')
@compression.setter
def compression(self, value):
self.set_compression('', value)