import errno
import os
import re
import tempfile
from contextlib import contextmanager

import xattr
from calculate.utils.device import udev, get_uuid_dict

from . import files
from .tools import SingletonParam, flat_iterable


def is_mount(device_name):
    '''Look a device or directory up in /etc/mtab.

    For an ordinary mtab line whose source or target matches the given
    path, the *other* of the two fields is remembered (the mount point for
    a device, the source for a mount point); the last matching line wins.
    For lines containing "overlay", the target and the lowerdir/upperdir/
    workdir option values are compared instead: if the path is the overlay
    target itself, 'overlay' is remembered, otherwise the overlay target
    is returned immediately.

    Returns an empty string if nothing in mtab matches.
    '''
    def find_names(old_device_name):
        # Yield every name under which the device may be listed in mtab:
        # the absolute path and, for LVM volumes, the /dev/mapper alias.
        device_name = os.path.abspath(old_device_name)
        yield device_name
        if device_name.startswith('/dev'):
            info = udev.get_device_info(name=device_name)
            if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
                yield '/dev/mapper/{vg}-{lv}'.format(vg=info['DM_VG_NAME'],
                                                     lv=info['DM_LV_NAME'])

    def get_overlay_mounts(line):
        # Yield the overlay target followed by the directories named in
        # its lowerdir/upperdir/workdir mount options.
        mounts = line.split(' ')
        yield mounts[1]
        for device_name in re.findall(
                '(?:lowerdir=|upperdir=|workdir=)([^,]+)', mounts[3]):
            yield device_name

    find_data = set(find_names(device_name))
    output = ''
    for mtab_line in files.read_file_lines('/etc/mtab'):
        fields = mtab_line.split(' ')
        if 'overlay' not in mtab_line:
            if len(fields) < 2:
                continue
            mounts = set(fields[:2])
            if mounts & find_data:
                remaining = mounts - find_data
                # Both fields may match (source equals target); fall back
                # to the target instead of popping from an empty set.
                output = remaining.pop() if remaining else fields[1]
        else:
            # The options column is required to inspect overlay layers.
            if len(fields) < 4:
                continue
            mounts = set(get_overlay_mounts(mtab_line))
            dest = fields[1]
            if mounts & find_data:
                if dest in find_data:
                    output = 'overlay'
                else:
                    return dest
    return output


class MountHelperError(Exception):
    '''Generic MountHelper failure.'''


class MountHelperNotFound(Exception):
    '''Raised when no table entry matches the requested value.'''


class MountHelper:
    '''Base class for reading the /etc/fstab and /etc/mtab files.'''
    DATA_FILE = '/etc/fstab'
    # Column indexes of a parsed table row.
    NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(0, 6)

    def __init__(self, data_file=None, devices=()):
        '''Parse the table file and fill the cache.

        data_file -- optional path overriding the class-level DATA_FILE.
        devices -- forwarded to get_uuid_dict() to build the mapping used
                   to translate first-column values into device names.
        '''
        if data_file:
            self.DATA_FILE = data_file
        self.cache = []
        self.rotated_cache = []
        self.uuid_dictionary = get_uuid_dict(devices=devices)
        self.update_cache()

    def _read_data(self):
        '''Return the whole content of DATA_FILE as a string.'''
        with open(self.DATA_FILE, 'r') as data_file:
            return data_file.read()

    def update_cache(self):
        '''Re-read DATA_FILE and rebuild cache and rotated_cache.

        cache is a list of six-field rows; rotated_cache is the same data
        transposed into six columns.
        '''
        def setlen(ar):
            # Force exactly six fields so that optional trailing columns
            # (freq, passno) never make a row shorter than the others.
            return ar[:6] + [''] * (6 - len(ar))

        self.cache = []
        for line in self._read_data().split('\n'):
            line = line.strip()
            if line and not line.startswith('#'):
                # str.split() without arguments already strips each part.
                self.cache.append(setlen(line.split()))

        for data in self.cache:
            device_name = self.uuid_dictionary.get(data[0], data[0])
            if device_name.startswith('/'):
                device_name = os.path.realpath(device_name)
            data[0] = udev.get_device_info(name=device_name).get(
                'DEVNAME', data[0])
            # Swap entries get the literal 'swap' as their directory.
            data[1] = data[1] if data[2] != 'swap' else 'swap'
        self.rotated_cache = list(zip(*self.cache))

    def get_from_fstab(self, what=DIR, where=NAME, is_in=None, is_equal=None,
                       contains=None, is_not_equal=None, all_entry=True):
        '''Select column `what` from the rows whose column `where` matches.

        Exactly one condition is applied, chosen in this order: is_equal,
        is_in, contains, is_not_equal. With all_entry=True a list of all
        selected values is returned; otherwise the last selected value, or
        an empty string when nothing matched.
        '''
        if is_equal is not None:
            def filter_function(tab_line):
                return tab_line[where] == is_equal
        elif is_in is not None:
            def filter_function(tab_line):
                return tab_line[where] in is_in
        elif contains is not None:
            def filter_function(tab_line):
                return contains in tab_line[where]
        else:
            def filter_function(tab_line):
                return tab_line[where] != is_not_equal

        output = [line[what] for line in self.cache if filter_function(line)]
        if all_entry:
            return output
        return output[-1] if output else ''

    def get_fields(self, *fields):
        '''Return a list of tuples holding the requested columns per row.'''
        fstab_fields = [self.rotated_cache[field] for field in fields]
        return list(zip(*fstab_fields))

    def is_read_only(self, what=DIR, is_equal=None):
        '''Tell whether an entry whose column `what` equals is_equal has
        the 'ro' option.

        All matching entries are inspected. Raises MountHelperNotFound
        when no entry matches.
        '''
        tab_to_check = [tab_line for tab_line in self.cache
                        if tab_line[what] == is_equal]
        if not tab_to_check:
            raise MountHelperNotFound(is_equal)
        return any('ro' in data[self.OPTS].split(',')
                   for data in tab_to_check)

    def is_exist(self, what=DIR, is_equal=None, is_not_equal=None):
        '''Tell whether at least one row satisfies the condition on `what`.

        is_equal takes precedence; otherwise rows whose column differs
        from is_not_equal are counted.
        '''
        if is_equal is not None:
            def filter_function(tab_line):
                return tab_line[what] == is_equal
        else:
            def filter_function(tab_line):
                return tab_line[what] != is_not_equal
        # A filter object is always truthy, so the match must be evaluated.
        return any(filter_function(tab_line) for tab_line in self.cache)

    # NOTE: the three helpers below take an argument and therefore cannot
    # be properties; they are regular methods.
    def writable(self, directory_path):
        '''Return True if directory_path is not listed as read-only.'''
        return not self.is_read_only(is_equal=directory_path)

    def read_only(self, directory_path):
        '''Return True if directory_path is listed with the 'ro' option.'''
        return self.is_read_only(is_equal=directory_path)

    def exists(self, directory_path):
        '''Return True if the value occurs in the DIR or NAME column.'''
        return self.is_exist(is_equal=directory_path)\
            or self.is_exist(what=self.NAME, is_equal=directory_path)


class FStab(MountHelper, metaclass=SingletonParam):
    '''Class for reading and caching the content of /etc/fstab.'''
    DATA_FILE = '/etc/fstab'


class Mounts(MountHelper):
    '''Class for reading and caching the content of /etc/mtab.'''
    DATA_FILE = '/etc/mtab'


class DiskSpaceError(Exception):
    '''Raised when free disk space cannot be determined.'''


class DiskSpace:
    '''Query free space by running the df program.'''

    def __init__(self):
        self.df_command = files.get_program_path('/bin/df')

    def get_free(self, device=None, device_path=None):
        '''Return the fourth column of the first data line printed by
        `df <path> -B1`, converted to int.

        device -- a device that must be mounted (checked with is_mount);
                  when given, it is used as the path passed to df.
        device_path -- path passed to df when device is not given.

        Raises DiskSpaceError if the device is not mounted, df fails or
        its output does not have the expected six columns.
        '''
        if device:
            mount_path = is_mount(device)
            if not mount_path:
                raise DiskSpaceError('Device {} must be mounted.'.
                                     format(device))
            device_path = device

        df_process = files.Process(self.df_command, device_path, '-B1')
        if df_process.success():
            df_data = df_process.read().strip()
            df_lines = df_data.split('\n')
            # Line 0 is the df header; line 1 describes the filesystem.
            if len(df_lines) >= 2:
                columns = df_lines[1].split()
                if len(columns) == 6:
                    return int(columns[3])
            raise DiskSpaceError('Wrong df output:\n{}'.format(df_data))
        else:
            raise DiskSpaceError(str(df_process.read_error()))


def get_child_mounts(path_name):
    '''Get all mtab entries whose mount point lies inside the given path.

    Returns a list of [source, mount_point] pairs. If path_name is the
    literal 'none', entries whose source equals 'none' are returned
    instead. Returns an empty string when /etc/mtab is not readable.
    '''
    mtab_file_path = '/etc/mtab'
    if not os.access(mtab_file_path, os.R_OK):
        return ''

    with open(mtab_file_path) as mtab_file:
        # Keep only source and target; skip lines lacking either field.
        mtab = [[fields[0], fields[1]]
                for fields in (line.split(' ') for line in mtab_file)
                if len(fields) >= 2]

    output = []
    if path_name != 'none':
        abs_path = os.path.abspath(path_name)
        for tab_line in mtab:
            try:
                common = os.path.commonpath([abs_path, tab_line[1]])
            except ValueError:
                # commonpath rejects a mix of absolute and relative paths;
                # such an entry cannot be located under abs_path.
                continue
            if common == abs_path:
                output.append(tab_line)
    else:
        for tab_line in mtab:
            if tab_line[0] == path_name:
                output.append(tab_line)
    return output


class MountError(Exception):
    '''Raised when the mount or umount command fails.'''


def mount(source, target, fstype=None, options=None):
    '''Mount source on target by running /bin/mount.

    fstype -- optional filesystem type (-t).
    options -- optional mount options (-o): a string, or a list/tuple
               that is flattened with flat_iterable and comma-joined.

    Returns True on success, raises MountError otherwise.
    '''
    parameters = [source, target]
    if options is not None:
        if isinstance(options, (list, tuple)):
            options = ','.join(flat_iterable(options))
        # Flag and value go in as separate arguments so that the value
        # never carries a leading space.
        parameters[:0] = ['-o', str(options)]
    if fstype is not None:
        parameters[:0] = ['-t', str(fstype)]

    mount_process = files.Process('/bin/mount', *parameters)
    if mount_process.success():
        return True
    raise MountError('Failed to mount {source} to {target}: {stderr}'
                     .format(source=source, target=target,
                             stderr=str(mount_process.read_error())))


def umount(target):
    '''Unmount target by running /bin/umount.

    Returns True on success, raises MountError otherwise.
    '''
    umount_process = files.Process('/bin/umount', target)
    if umount_process.success():
        return True
    raise MountError('Failed to umount {target}: {stderr}'
                     .format(target=target,
                             stderr=umount_process.read_error()))


class BtrfsError(Exception):
    '''Raised when a btrfs operation fails.'''


class Btrfs:
    '''Read and write the btrfs.compression attribute of a block device
    by mounting it temporarily.'''

    # Directory under which temporary mount points are created.
    # NOTE(review): None here; presumably set by a subclass or the caller
    # before mount() is used — confirm.
    check_path = None

    def __init__(self, block_device):
        '''Remember the device; raise BtrfsError if the path is missing.'''
        self.block_device = block_device
        if not os.path.exists(block_device):
            raise BtrfsError('Device is not found.')

    @contextmanager
    def mount(self):
        '''Mount the device as btrfs on a fresh temporary directory.

        Yields the mount point path. On exit the directory is unmounted
        (if it is a mount point) and removed. A MountError is converted
        into BtrfsError.
        '''
        tempfile_path = None
        try:
            files.make_directory(self.check_path)
            tempfile_path = tempfile.mkdtemp(prefix='btrfscheck-',
                                             dir=self.check_path)
            mount(self.block_device, tempfile_path, 'btrfs')
            yield tempfile_path
        except KeyboardInterrupt:
            raise
        except MountError as error:
            if 'wrong fs type' in str(error):
                raise BtrfsError('{} is not btrfs'.format(
                    self.block_device)) from error
            else:
                raise BtrfsError(str(error)) from error
        finally:
            if tempfile_path:
                if os.path.ismount(tempfile_path):
                    umount(tempfile_path)
                os.rmdir(tempfile_path)

    def get_compression(self, relative_path):
        '''Return the btrfs.compression attribute of a path on the device.

        relative_path is taken relative to the filesystem root (leading
        slashes are dropped). Returns an empty string if the path does not
        exist or the attribute is not set (ENODATA); any other IOError is
        reported as BtrfsError.
        '''
        relative_path = relative_path.lstrip('/')
        with self.mount() as device_path:
            try:
                absolute_path = os.path.join(device_path, relative_path)
                if os.path.exists(absolute_path):
                    return xattr.get(absolute_path, 'btrfs.compression')
                else:
                    return ''
            except IOError as error:
                if error.errno == errno.ENODATA:
                    return ''
                raise BtrfsError('Failed to get btrfs compression.')

    @property
    def compression(self):
        '''The btrfs.compression attribute of the filesystem root.'''
        return self.get_compression('')

    def set_compression(self, relative_path, value):
        '''Set the btrfs.compression attribute of a path on the device.

        relative_path is taken relative to the filesystem root (leading
        slashes are dropped); a missing path is created as a directory
        first. Returns the result of xattr.set, or an empty string on
        ENODATA; any other IOError is reported as BtrfsError.
        '''
        relative_path = relative_path.lstrip('/')
        with self.mount() as device_path:
            try:
                absolute_path = os.path.join(device_path, relative_path)
                if not os.path.exists(absolute_path):
                    files.make_directory(absolute_path)
                return xattr.set(absolute_path, 'btrfs.compression', value)
            except IOError as error:
                if error.errno == errno.ENODATA:
                    return ''
                raise BtrfsError('Failed to set btrfs compression.')

    @compression.setter
    def compression(self, value):
        self.set_compression('', value)