import os import re from . import files from .tools import Cachable, GenericFS, unique from time import sleep def get_uuid_dict(reverse=False, devices=()): '''Получить словарь со значениями UUID блочных устройств.''' blkid_process = files.Process('/sbin/blkid', '-s', 'UUID', '-c', '/dev/null', *devices) re_split = re.compile('^([^:]+):.*UUID="([^"]+)"', re.S) output_items = {} search_results = [] lines = blkid_process.read_lines() for line in lines: search_result = re_split.search(line) if search_result: search_results.append(search_result.groups()) output_items = (("UUID={}".format(uuid), udev.get_devname(name=dev, fallback=dev)) for dev, uuid in search_results) if reverse: return {v: k for k, v in output_items} else: return {k: v for k, v in output_items} def find_device_by_partition(device): '''Найти устройство, к которому относится данный раздел.''' info = udev.get_device_info(name=device) if info.get('DEVTYPE', '') != 'partition': return '' parent_path = os.path.dirname(info.get('DEVPATH', '')) if parent_path: device_info = udev.get_device_info(path=parent_path) return device_info.get('DEVNAME', '') return False def count_partitions(device_name): '''Посчитать количество разделов данного устройства.''' syspath = udev.get_device_info(name=device_name).get('DEVPATH', '') if not syspath: return 0 device_name = os.path.basename(syspath) return len([x for x in sysfs.listdir(syspath) if x.startswith(device_name)]) def get_lspci_output(filter_name=None, short_info=False): '''Функция для чтения вывода lspci. Возвращает словарь с идентификаторами устройств и информацией о них.''' re_data = re.compile(r'(\S+)\s"([^"]+)"\s+"([^"]+)"\s+"([^"]+)"', re.S) lspci_column_names = ('type', 'vendor', 'name') if filter_name: if callable(filter_name): filter_function = filter_name else: def filter_function(input): return filter_name in input else: def filter_function(input): return input lspci_path = files.check_utils('/usr/sbin/lspci') lspci_output = files.Process(lspci_path, '-m').read_lines() output = {} lspci_output = list(filter(filter_function, lspci_output)) for line in lspci_output: search_result = re_data.search(line) if search_result: search_result = search_result.groups() device_id = search_result[0] output[device_id] = {key: value for key, value in zip( lspci_column_names, search_result[1:])} return output class LvmCommand: '''Класс для работы с командой lvm и выполнения с ее помощью различных действий.''' @property def lvm_command(self): '''Возвращает кешированный путь к lvm.''' return files.get_program_path('/sbin/lvm') def get_physical_extent_size(self): if not self.lvm_command: return '' pv_data = files.Process(self.lvm_command, 'lvmconfig', '--type', 'full', 'allocation/physical_extent_size') if pv_data.success(): return pv_data.read() return '' def get_pvdisplay_output(self, option): '''Получить вывод pv_display c указанной option.''' if not self.lvm_command: return '' pv_data = files.Process(self.lvm_command, 'pvdisplay', '-C', '-o', option, '--noh') if pv_data.success(): return pv_data.read() return False def vg_scan(self): '''Найти существующие в системе группы томов.''' if self.lvm_command: return files.Process(self.lvm_command, 'vgscan').success() return False def vg_change(self): '''Изменить атрибуты группы томов.''' if self.lvm_command: failed = files.Process('vgchange', '-ay').failed() failed |= files.Process('vgchange', '--refresh').failed() return not failed return False def lv_change(self, groups): '''Изменить атрибуты логического тома.''' if self.lvm_command: failed = False for group in groups: failed |= files.Process('lvchange', '-ay', group).failed() failed |= files.Process('lvchange', '--refresh', group).failed() return failed return False def execute(self, *command): '''Выполнить указанную LVM команду.''' if self.lvm_command: return files.Process(self.lvm_command, *command).success() return False def double_execute(self, *command): '''Выполнить дважды указанную LVM команду.''' if not self.execute(*command): sleep(2) return self.execute(*command) return False def remove_lv(self, vg, lv): '''Удалить указанный логических томов из системы.''' return self.double_execute('lvremove', '{0}/{1}'.format(vg, lv), '-f') def remove_vg(self, vg): '''Удалить указанную группу томов из системы.''' return self.double_execute('vgremove', vg, '-f') def remove_pv(self, pv): '''Удалить LVM метку с физического тома.''' return self.double_execute('pvremove', pv, '-ffy') class Lvm(Cachable): '''Класс для получения информации о lvm.''' def __init__(self, lvm_command): super().__init__() self.lvm_command = lvm_command def get_pvdisplay_full(self): '''Получить полный вывод команды pvdisplay.''' get_pvdisplay_output = self.get_pvdisplay_output( 'vg_name,lv_name,pv_name' ).strip() if get_pvdisplay_output: output = (line.split() for line in get_pvdisplay_output.split('\n')) return [tuple(y.strip() for y in x) for x in output if x and len(x) == 3] @property def get_volume_groups(self): '''Получить имеющиеся в системе группы томов.''' return sorted({vg for vg, lv, pv in self.get_pvdisplay_full()}) @Cachable.method_cached() def get_physical_extent_size(self): '''Получить размер физического диапазона.''' return self.lvm_command.get_physical_extent_size() @Cachable.method_cached() def get_pvdisplay_output(self, option): '''Получить с помощью pvdisplay информацию .''' return self.lvm_command.get_pvdisplay_output(option) def get_used_partitions(self, vg_condition, lv_condition): '''Получить испоьзуемые разделы.''' return list(sorted({part for vg, lv, part in self.get_pvdisplay_full() if vg == vg_condition and lv == lv_condition})) def refresh(self): if not os.environ.get('EBUILD_PHASE', False): self.lvm_command.vg_scan() self.lvm_command.vg_change() self.lvm_command.lv_change(lvm) def remove_lv(self, vg, lv): return self.lvm_command.remove_lv(vg, lv) def remove_vg(self, vg): return self.lvm_command.remove_vg(vg) def remove_pv(self, pv): return self.lvm_command.remove_pv(pv) class MdadmCommand: '''Класс для работы с командой mdadm.''' @property def mdadm_command(self): '''Возвращает кешированный путь к mdamd.''' return files.get_program_path('/sbin/mdadm') def stop_raid(self, raid_device): '''Остановить устройство из RAID-массива.''' if not self.mdadm_command: return False return files.Process(self.mdadm_command, '-S', raid_device).success() def zero_superblock(self, device): '''Затереть superblock для составляющей RAID-массива.''' if not self.mdadm_command: return False return files.Process(self.mdadm_command, '--zero_superblock', device).success() class RAID: '''Класс для работы с RAID-массивами.''' def __init__(self, mdadm_command): self.mdadm_command = mdadm_command def get_devices_info(self, raid_device): '''Получить информацию о RAID-массиве.''' device = udev.get_device_info(path=raid_device) if udev.is_raid(device): for raid_block in sysfs.glob(raid_device, 'md/dev-*', 'block'): yield udev.get_device_info(sysfs.join_path(raid_block)) def get_devices(self, raid_device, path_name='DEVPATH'): '''Получить устройства /dev, из которых состоит RAID-массив. Не возвращает список этих устройств для раздела сделанного для RAID (/dev/md0p1).''' for device_info in self.get_devices_info(raid_device): device_name = device_info.get(path_name, '') if device_name: yield device_name def get_devices_syspath(self, raid_device): '''Получить sysfs пути устройств, из которых состоит RAID-массив.''' for device in self.get_devices(raid_device, path_name='DEVPATH'): yield device def remove_raid(self, raid_name): '''Удалить RAID-массив.''' raid_parts = list(self.get_devices(udev.get_syspath(name=raid_name))) failed = False failed |= not (self.mdadm_command.stop_raid(raid_name) or self.mdadm_command.stop_raid(raid_name)) for device in raid_parts: failed |= not (self.mdadm_command.zero_superblock(device) or self.mdadm_command.zero_superblock(device)) return not failed class DeviceFS(GenericFS): '''Базовый класс для классов предназначенных для работы с sysfs и /dev''' def __init__(self, filesystem=None): if isinstance(filesystem, GenericFS): self.filesystem = filesystem else: self.filesystem = files.RealFS('/') def join_path(self, *paths): output = os.path.join('/', *(_path[1:] if _path.startswith('/') else _path for _path in paths)) return output def exists(self, *paths): return self.filesystem.exists(self.join_path(*paths)) def read(self, *paths): return self.filesystem.read(self.join_path(*paths)) def realpath(self, *paths): return self.filesystem.realpath(self.join_path(*paths)) def write(self, *args): data = args[-1] paths = args[:-1] self.filesystem.write(self.join_path(*paths), data) def listdir(self, *paths, full_path=False): return self.filesystem.listdir(self.join_path(*paths), full_path=full_path) def glob(self, *paths): for file_path in self.filesystem.glob(self.join_path(*paths)): yield file_path class SysFS(DeviceFS): '''Класс для работы с sysfs.''' BASE_DIRECTORY = '/sys' PATH = {'firmware': 'firmware', 'efi': 'firmware/efi', 'efivars': 'firmware/efi/efivars', 'classnet': 'class/net', 'input': 'class/input', 'block': 'block', 'dmi': 'class/dmi/id', 'module': 'module', 'block_scheduler': 'queue/scheduler'} def join_path(self, *paths): output = os.path.join('/', *(_path[1:] if _path.startswith('/') else _path for _path in paths)) if output.startswith(self.BASE_DIRECTORY): return output else: return os.path.join(self.BASE_DIRECTORY, output[1:]) def glob(self, *paths): for _path in self.filesystem.glob(self.join_path(*paths)): yield _path[len(self.BASE_DIRECTORY):] class DevFS(DeviceFS): '''Класс для работы с /dev.''' BASE_DIRECTORY = '/dev' def join_path(self, *paths): output = os.path.join('/', *(_path[1:] if _path.startswith('/') else _path for _path in paths)) if output.startswith(self.BASE_DIRECTORY): return output else: return os.path.join(self.BASE_DIRECTORY, output[1:]) def glob(self, *paths): for _path in self.filesystem.glob(self.join_path(*paths)): yield _path[len(self.BASE_DIRECTORY):] class UdevAdmNull: def info_property(self, path=None, name=None): return {} def info_export(self): return '' def settle(self, timeout=15): pass def trigger(self, subsystem=None): pass @property def broken(self): return False class UdevAdmCommand(UdevAdmNull): def __init__(self): self.first_run = True @property def udevadm_cmd(self): return files.get_program_path('/sbin/udevadm') @property def broken(self): return not bool(self.udevadm_cmd) def info_property(self, path=None, name=None): if self.first_run: self.trigger("block") self.settle() self.first_run = False if name is not None: type_query = "--name" value = name else: type_query = "--path" value = path udevadm_output = files.Process(self.udevadm_cmd, "info", "--query", "property", type_query, value).read_lines() output_items = [] for line in udevadm_output: if '=' in line: output_items.append(line.partition('=')[0::2]) return dict(output_items) def info_export(self): return files.Process(self.udevadm_cmd, 'info', '-e').read().strip() def trigger(self, subsystem=None): if subsystem: files.Process(self.udevadm_cmd, 'trigger', '--subsystem-match', subsystem).success() else: files.Process(self.udevadm_cmd, 'trigger').success() def settle(self, timeout=15): files.Process(self.udevadm_cmd, 'settle', '--timeout={}'. format(timeout)).success() class Udev: '''Класс возвращающий преобразованную или кэшированную информацию о системе из udev.''' def __init__(self, udevadm=None): self.path_cache = {} self.name_cache = {} if isinstance(udevadm, UdevAdmCommand): self.udevadm = udevadm else: self.udevadm = UdevAdmCommand() if self.udevadm.broken: self.udevadm = UdevAdmNull() def clear_cache(self): self.path_cache = {} self.name_cache = {} self.udevadm = UdevAdmCommand() def get_device_info(self, path=None, name=None): if name is not None: cache = self.name_cache value = devfs.realpath(name) name = value else: cache = self.path_cache value = sysfs.realpath(path) path = value if value not in cache: data = self.udevadm.info_property(path, name) devname = data.get('DEVNAME', '') devpath = data.get('DEVPATH', '') if devname: self.name_cache[devname] = data if devpath: devpath = '/sys{}'.format(devpath) self.path_cache[devname] = data return data else: return cache[value] def get_block_devices(self): for block in sysfs.glob(sysfs.PATH['block'], '*'): yield block blockname = os.path.basename(block) for part in sysfs.glob(block, '{}*'.format(blockname)): yield part def syspath_to_devname(self, devices, drop_empty=True): for device_path in devices: info = self.get_device_info(path=device_path) if 'DEVNAME' in info: yield info['DEVNAME'] elif not drop_empty: yield '' def devname_to_syspath(self, devices, drop_empty=True): for device_name in devices: info = self.get_device_info(name=device_name) if 'DEVPATH' in info: yield info['DEVPATH'] elif not drop_empty: yield '' def get_devname(self, path=None, name=None, fallback=None): if fallback is None: if name: fallback = name else: fallback = '' return self.get_device_info(path, name).get('DEVNAME', fallback) def get_syspath(self, path=None, name=None, fallback=None): if fallback is None: if path: fallback = path else: fallback = '' return self.get_device_info(path, name).get('DEVPATH', fallback) def refresh(self, trigger_only=False): if not trigger_only: self.clear_cache() files.quite_unlink('/etc/blkid.tab') if not os.environ.get('EBUILD_PHASE'): self.udevadm.trigger(subsystem='block') self.udevadm.settle(15) def is_cdrom(self, udev_data): return udev_data.get('ID_CDROM', '') == '1' def is_device(self, udev_data): if 'DEVPATH' not in udev_data: return False return (sysfs.exists(udev_data.get('DEVPATH', ''), 'device') and udev_data.get('DEVTYPE', '') == 'disk') def is_raid(self, udev_data): return (udev_data.get('MD_LEVEL', '').startswith('raid') and udev_data.get('DEVTYPE', '') == 'disk') def is_raid_partition(self, udev_data): return (udev_data.get('MD_LEVEL', '').startswith('raid') and udev_data.get('DEVTYPE', '') == 'partition') def is_lvm(self, udev_data): return 'DM_LV_NAME' in udev_data and 'DM_VG_NAME' in udev_data def is_partition(self, udev_data): return udev_data.get('DEVTYPE', '') == 'partition' def is_block_device(self, udev_data): return udev_data.get('SUBSYSTEM', '') == 'block' def get_device_type(self, path=None, name=None): info = self.get_device_info(path, name) syspath = info.get('DEVPATH', '') if self.is_cdrom(info): return 'cdrom' if self.is_device(info): return 'disk' if self.is_partition(info): return '{}-partition'.format( self.get_device_type(os.path.dirname(syspath))) if self.is_raid(info): for raid_device in raid.get_devices_syspath(syspath): return '{}-{}'.format(self.get_device_type(raid_device), info['MD_LEVEL']) else: return 'loop' if self.is_lvm(info): for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'], info['DM_LV_NAME']): return '{}-lvm'.format(self.get_device_type(name=lv_device)) if self.is_block_device(info): return 'loop' return '' def get_partition_type(self, path=None, name=None): info = self.get_device_info(path, name) if info.get('ID_PART_ENTRY_SCHEME') == 'dos': part_id = info.get('ID_PART_ENTRY_TYPE', '') part_number = info.get('ID_PART_ENTRY_NUMBER', '') if part_id and part_number: if part_id == '0x5': return 'extended' elif int(part_number) > 4: return 'logical' else: return 'primary' return info.get('ID_PART_TABLE_TYPE', '') def _get_disk_devices(self, path=None, name=None): info = udev.get_device_info(path=path, name=name) syspath = info.get('DEVPATH', '') if syspath: # real device if self.is_device(info): yield True, info.get('DEVNAME', '') # partition elif self.is_partition(info): for device in self._get_disk_devices( path=os.path.dirname(syspath)): yield device # md raid elif udev.is_raid(info): yield False, info.get('DEVNAME', '') for raid_device in sorted(raid.get_devices_syspath(syspath)): for device in self._get_disk_devices(path=raid_device): yield device # lvm elif udev.is_lvm(info): yield False, info.get('DEVNAME', '') for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'], info['DM_LV_NAME']): for device in self._get_disk_devices(name=lv_device): yield device def get_disk_devices(self, path=None, name=None): return sorted({ device for real_device, device in self._get_disk_devices(path, name) if real_device }) def get_all_base_devices(self, path=None, name=None): try: devices = (device for real_device, device in self._get_disk_devices(path, name) if not device.startswith('/dev/loop')) if not self.is_partition(self.get_device_info(path, name)): next(devices) return list(unique(devices)) except StopIteration: return [] sysfs = SysFS() devfs = DevFS() udev = Udev(UdevAdmCommand()) lvm = Lvm(LvmCommand()) raid = RAID(MdadmCommand())