Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

619 rader
23 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import re
from . import files
from .tools import Cachable, GenericFS, unique
from time import sleep
def get_uuid_dict(reverse=False, devices=()):
    '''Build a dictionary relating "UUID=<uuid>" keys to block device names.

    Runs "/sbin/blkid -s UUID -c /dev/null", optionally restricted to
    ``devices``, and parses every output line that carries a UUID. Each
    device reported by blkid is passed through ``udev.get_devname`` with
    the blkid name as fallback.

    With ``reverse`` set the dictionary maps device name to "UUID=<uuid>"
    instead of "UUID=<uuid>" to device name.'''
    blkid = files.Process('/sbin/blkid', '-s', 'UUID',
                          '-c', '/dev/null', *devices)
    uuid_pattern = re.compile('^([^:]+):.*UUID="([^"]+)"', re.S)

    # Parse all blkid output first; device names are resolved afterwards.
    matches = [uuid_pattern.search(line) for line in blkid.read_lines()]
    found = [match.groups() for match in matches if match]

    mapping = {}
    for device, uuid in found:
        uuid_key = "UUID={}".format(uuid)
        device_name = udev.get_devname(name=device, fallback=device)
        if reverse:
            mapping[device_name] = uuid_key
        else:
            mapping[uuid_key] = device_name
    return mapping
def find_device_by_partition(device):
    '''Find the device that the given partition belongs to.

    Returns the DEVNAME of the parent sysfs entry, an empty string when
    udev does not report ``device`` as a partition (or the parent has no
    DEVNAME), and False when the partition has no parent sysfs path.'''
    partition_info = udev.get_device_info(name=device)
    if partition_info.get('DEVTYPE', '') == 'partition':
        parent_syspath = os.path.dirname(partition_info.get('DEVPATH', ''))
        if not parent_syspath:
            return False
        parent_info = udev.get_device_info(path=parent_syspath)
        return parent_info.get('DEVNAME', '')
    return ''
def count_partitions(device_name):
    '''Count the partitions of the given device.

    Counts the entries of the device's sysfs directory whose names start
    with the basename of that directory. Returns 0 when udev reports no
    DEVPATH for the device.'''
    device_info = udev.get_device_info(name=device_name)
    device_syspath = device_info.get('DEVPATH', '')
    if device_syspath:
        prefix = os.path.basename(device_syspath)
        matching = [entry for entry in sysfs.listdir(device_syspath)
                    if entry.startswith(prefix)]
        return len(matching)
    return 0
def get_lspci_output(filter_name=None, short_info=False):
    '''Read the output of "lspci -m" and return it as a dictionary.

    The result maps the device identifier (the first field of a line) to a
    dictionary with the keys 'type', 'vendor' and 'name'.

    ``filter_name`` selects the lines to parse: a callable is used as the
    predicate itself, any other truthy value is searched for as a substring
    of the line, and a falsy value keeps every non-empty line.

    ``short_info`` is accepted but not used by this function.'''
    record_pattern = re.compile(
        r'(\S+)\s"([^"]+)"\s+"([^"]+)"\s+"([^"]+)"', re.S)
    columns = ('type', 'vendor', 'name')

    if not filter_name:
        # Without a filter only the truthiness of the line matters.
        accept = bool
    elif callable(filter_name):
        accept = filter_name
    else:
        def accept(line):
            return filter_name in line

    lspci = files.check_utils('/usr/sbin/lspci')
    raw_lines = files.Process(lspci, '-m').read_lines()
    selected = [line for line in raw_lines if accept(line)]

    devices = {}
    for line in selected:
        match = record_pattern.search(line)
        if not match:
            continue
        fields = match.groups()
        # zip() stops at the shortest input, so only three fields are kept.
        devices[fields[0]] = dict(zip(columns, fields[1:]))
    return devices
class LvmCommand:
    '''Wrapper around the lvm executable used to perform LVM actions.'''

    @property
    def lvm_command(self):
        '''Path to the lvm binary as returned by files.get_program_path.'''
        return files.get_program_path('/sbin/lvm')

    def get_physical_extent_size(self):
        '''Return the output of
        "lvm lvmconfig --type full allocation/physical_extent_size".

        Returns an empty string when lvm is unavailable or the command
        fails.'''
        if not self.lvm_command:
            return ''
        pv_data = files.Process(self.lvm_command, 'lvmconfig', '--type',
                                'full', 'allocation/physical_extent_size')
        if pv_data.success():
            return pv_data.read()
        return ''

    def get_pvdisplay_output(self, option):
        '''Return the output of "lvm pvdisplay -C -o <option> --noh".

        Returns an empty string when lvm is unavailable or the command
        fails, so callers can always treat the result as text.'''
        if not self.lvm_command:
            return ''
        pv_data = files.Process(self.lvm_command, 'pvdisplay', '-C', '-o',
                                option, '--noh')
        if pv_data.success():
            return pv_data.read()
        # Was False: callers call .strip() on the result.
        return ''

    def vg_scan(self):
        '''Scan for the volume groups present in the system.

        Returns True when "lvm vgscan" succeeds, False otherwise.'''
        if self.lvm_command:
            return files.Process(self.lvm_command, 'vgscan').success()
        return False

    def vg_change(self):
        '''Activate and refresh the volume groups.

        Runs "vgchange -ay" and "vgchange --refresh"; both commands are
        always run. Returns True only when both succeed.'''
        if self.lvm_command:
            failed = files.Process('vgchange', '-ay').failed()
            failed |= files.Process('vgchange', '--refresh').failed()
            return not failed
        return False

    def lv_change(self, groups):
        '''Activate and refresh the logical volumes of the given groups.

        Runs "lvchange -ay" and "lvchange --refresh" for every item of
        ``groups``. Returns True only when every command succeeds, the same
        convention as vg_change.'''
        if self.lvm_command:
            failed = False
            for group in groups:
                failed |= files.Process('lvchange', '-ay', group).failed()
                failed |= files.Process('lvchange', '--refresh',
                                        group).failed()
            # Was "return failed", which reported success as False.
            return not failed
        return False

    def execute(self, *command):
        '''Run the given lvm subcommand and return True on success.'''
        if self.lvm_command:
            return files.Process(self.lvm_command, *command).success()
        return False

    def double_execute(self, *command):
        '''Run the given lvm subcommand, retrying once after two seconds.

        Returns True as soon as an attempt succeeds, otherwise the result
        of the second attempt.'''
        if self.execute(*command):
            # Was falling through to "return False" on first-try success.
            return True
        sleep(2)
        return self.execute(*command)

    def remove_lv(self, vg, lv):
        '''Remove the given logical volume from the system.'''
        return self.double_execute('lvremove', '{0}/{1}'.format(vg, lv), '-f')

    def remove_vg(self, vg):
        '''Remove the given volume group from the system.'''
        return self.double_execute('vgremove', vg, '-f')

    def remove_pv(self, pv):
        '''Remove the LVM label from the physical volume.'''
        return self.double_execute('pvremove', pv, '-ffy')
class Lvm(Cachable):
    '''Provides information about LVM, delegating commands to the
    lvm_command object given to the constructor.'''

    def __init__(self, lvm_command):
        super().__init__()
        self.lvm_command = lvm_command

    def get_pvdisplay_full(self):
        '''Return the pvdisplay report as a list of
        (vg_name, lv_name, pv_name) tuples.

        Lines that do not split into exactly three fields are skipped. An
        empty list is returned when there is no output, so the result can
        always be iterated.'''
        raw_output = self.get_pvdisplay_output('vg_name,lv_name,pv_name')
        # Tolerate a non-string falsy result from the command object.
        text = (raw_output or '').strip()
        rows = (line.split() for line in text.split('\n'))
        return [tuple(row) for row in rows if len(row) == 3]

    @property
    def get_volume_groups(self):
        '''Sorted names of the volume groups present in the system.'''
        return sorted({vg for vg, lv, pv in self.get_pvdisplay_full()})

    @Cachable.method_cached()
    def get_physical_extent_size(self):
        '''Return the physical extent size reported by the command object.'''
        return self.lvm_command.get_physical_extent_size()

    @Cachable.method_cached()
    def get_pvdisplay_output(self, option):
        '''Return the pvdisplay output for the given column list.'''
        return self.lvm_command.get_pvdisplay_output(option)

    def get_used_partitions(self, vg_condition, lv_condition):
        '''Return the sorted physical volumes used by the given logical
        volume of the given volume group.'''
        return sorted({part for vg, lv, part in self.get_pvdisplay_full()
                       if vg == vg_condition and lv == lv_condition})

    def refresh(self):
        '''Rescan and reactivate volume groups and logical volumes.

        Nothing is done when the EBUILD_PHASE environment variable is
        set.'''
        if not os.environ.get('EBUILD_PHASE', False):
            self.lvm_command.vg_scan()
            self.lvm_command.vg_change()
            # lv_change iterates over group names; the module-level lvm
            # object that was passed here before is not such a collection.
            # NOTE(review): get_pvdisplay_output is cached, so the group
            # list may predate the scan above - confirm the cache policy.
            self.lvm_command.lv_change(self.get_volume_groups)

    def remove_lv(self, vg, lv):
        '''Remove the given logical volume.'''
        return self.lvm_command.remove_lv(vg, lv)

    def remove_vg(self, vg):
        '''Remove the given volume group.'''
        return self.lvm_command.remove_vg(vg)

    def remove_pv(self, pv):
        '''Remove the LVM label from the given physical volume.'''
        return self.lvm_command.remove_pv(pv)
class MdadmCommand:
    '''Wrapper around the mdadm executable.'''

    @property
    def mdadm_command(self):
        '''Path to the mdadm binary as returned by files.get_program_path.'''
        return files.get_program_path('/sbin/mdadm')

    def stop_raid(self, raid_device):
        '''Stop the given RAID array ("mdadm -S <raid_device>").

        Returns True on success, False when mdadm is unavailable or the
        command fails.'''
        if not self.mdadm_command:
            return False
        return files.Process(self.mdadm_command, '-S', raid_device).success()

    def zero_superblock(self, device):
        '''Erase the md superblock of a RAID array member.

        Returns True on success, False when mdadm is unavailable or the
        command fails.'''
        if not self.mdadm_command:
            return False
        # The mdadm option is spelled with a hyphen; the underscore form
        # used previously is not a valid option.
        return files.Process(self.mdadm_command, '--zero-superblock',
                             device).success()
class RAID:
    '''Helper for working with RAID arrays.'''

    def __init__(self, mdadm_command):
        self.mdadm_command = mdadm_command

    def get_devices_info(self, raid_device):
        '''Yield the udev information of every member of a RAID array.

        ``raid_device`` is the sysfs path of the array. Nothing is yielded
        when udev does not report that path as a RAID disk.'''
        device = udev.get_device_info(path=raid_device)
        if udev.is_raid(device):
            for raid_block in sysfs.glob(raid_device, 'md/dev-*', 'block'):
                yield udev.get_device_info(sysfs.join_path(raid_block))

    def get_devices(self, raid_device, path_name='DEVPATH'):
        '''Yield one udev property of every member of a RAID array.

        ``path_name`` selects the property: 'DEVNAME' gives the /dev
        device names, the default 'DEVPATH' gives the sysfs paths. Members
        without that property are skipped. Nothing is yielded for a
        partition created on a RAID (/dev/md0p1).'''
        for device_info in self.get_devices_info(raid_device):
            device_name = device_info.get(path_name, '')
            if device_name:
                yield device_name

    def get_devices_syspath(self, raid_device):
        '''Yield the sysfs paths of the devices that make up the array.'''
        for device in self.get_devices(raid_device, path_name='DEVPATH'):
            yield device

    def remove_raid(self, raid_name):
        '''Remove a RAID array.

        Stops the array and erases the superblock of every member; each
        mdadm call is retried once. Returns True only when every step
        succeeded.'''
        # The member list is built before the array is stopped. DEVNAME is
        # requested explicitly because zero_superblock passes the value to
        # mdadm as the device argument; the default would give sysfs paths.
        raid_parts = list(self.get_devices(udev.get_syspath(name=raid_name),
                                           path_name='DEVNAME'))
        failed = False
        failed |= not (self.mdadm_command.stop_raid(raid_name) or
                       self.mdadm_command.stop_raid(raid_name))
        for device in raid_parts:
            failed |= not (self.mdadm_command.zero_superblock(device) or
                           self.mdadm_command.zero_superblock(device))
        return not failed
class DeviceFS(GenericFS):
    '''Base class for the classes that work with sysfs and /dev.'''

    def __init__(self, filesystem=None):
        # Any GenericFS may be supplied; otherwise the real root is used.
        self.filesystem = (filesystem if isinstance(filesystem, GenericFS)
                           else files.RealFS('/'))

    def join_path(self, *paths):
        '''Join the parts into one absolute path, dropping a single
        leading slash from each part first.'''
        relative_parts = [part[1:] if part.startswith('/') else part
                          for part in paths]
        return os.path.join('/', *relative_parts)

    def exists(self, *paths):
        '''Check existence of the joined path on the wrapped filesystem.'''
        target = self.join_path(*paths)
        return self.filesystem.exists(target)

    def read(self, *paths):
        '''Read the joined path from the wrapped filesystem.'''
        target = self.join_path(*paths)
        return self.filesystem.read(target)

    def realpath(self, *paths):
        '''Resolve the joined path on the wrapped filesystem.'''
        target = self.join_path(*paths)
        return self.filesystem.realpath(target)

    def write(self, *args):
        '''Write data to a path: the last argument is the data, the
        preceding ones are the path parts.'''
        paths, data = args[:-1], args[-1]
        self.filesystem.write(self.join_path(*paths), data)

    def listdir(self, *paths, full_path=False):
        '''List the joined path on the wrapped filesystem.'''
        target = self.join_path(*paths)
        return self.filesystem.listdir(target, full_path=full_path)

    def glob(self, *paths):
        '''Yield the wrapped filesystem's glob results for the joined
        path.'''
        yield from self.filesystem.glob(self.join_path(*paths))
class SysFS(DeviceFS):
    '''Class for working with sysfs.'''

    BASE_DIRECTORY = '/sys'

    # Well-known locations, relative to BASE_DIRECTORY.
    PATH = {'firmware': 'firmware',
            'efi': 'firmware/efi',
            'efivars': 'firmware/efi/efivars',
            'classnet': 'class/net',
            'input': 'class/input',
            'block': 'block',
            'dmi': 'class/dmi/id',
            'module': 'module',
            'block_scheduler': 'queue/scheduler'}

    def join_path(self, *paths):
        '''Join the parts into an absolute path below BASE_DIRECTORY.

        A joined path that already lies inside BASE_DIRECTORY is returned
        unchanged; anything else is placed under it.'''
        output = os.path.join('/', *(_path[1:] if _path.startswith('/')
                                     else _path for _path in paths))
        base = self.BASE_DIRECTORY
        # Compare whole path components: a plain startswith(base) would
        # also accept siblings such as '/sysfoo'.
        if output == base or output.startswith(base + '/'):
            return output
        return os.path.join(base, output[1:])

    def glob(self, *paths):
        '''Yield glob results with the BASE_DIRECTORY prefix removed.'''
        for _path in self.filesystem.glob(self.join_path(*paths)):
            yield _path[len(self.BASE_DIRECTORY):]
class DevFS(DeviceFS):
    '''Class for working with /dev.'''

    BASE_DIRECTORY = '/dev'

    def join_path(self, *paths):
        '''Join the parts into an absolute path below BASE_DIRECTORY.

        A joined path that already lies inside BASE_DIRECTORY is returned
        unchanged; anything else is placed under it.'''
        output = os.path.join('/', *(_path[1:] if _path.startswith('/')
                                     else _path for _path in paths))
        base = self.BASE_DIRECTORY
        # Compare whole path components: a plain startswith(base) would
        # also accept siblings such as '/devices'.
        if output == base or output.startswith(base + '/'):
            return output
        return os.path.join(base, output[1:])

    def glob(self, *paths):
        '''Yield glob results with the BASE_DIRECTORY prefix removed.'''
        for _path in self.filesystem.glob(self.join_path(*paths)):
            yield _path[len(self.BASE_DIRECTORY):]
class UdevAdmNull:
    '''Do-nothing udevadm interface: queries return empty results and
    actions have no effect.'''

    @property
    def broken(self):
        '''Always False for this implementation.'''
        return False

    def info_property(self, path=None, name=None):
        '''Return an empty property dictionary for any device.'''
        return dict()

    def info_export(self):
        '''Return an empty export text.'''
        return str()

    def trigger(self, subsystem=None):
        '''Do nothing.'''
        return None

    def settle(self, timeout=15):
        '''Do nothing.'''
        return None
class UdevAdmCommand(UdevAdmNull):
    '''udevadm interface that runs the real udevadm executable.'''

    def __init__(self):
        # Set until the first property query has triggered udev.
        self.first_run = True

    @property
    def udevadm_cmd(self):
        '''Path to udevadm as returned by files.get_program_path.'''
        return files.get_program_path('/sbin/udevadm')

    @property
    def broken(self):
        '''True when no udevadm executable path is available.'''
        return not bool(self.udevadm_cmd)

    def info_property(self, path=None, name=None):
        '''Return the udev properties of a device as a dictionary.

        The device is looked up by ``name`` when it is given, otherwise by
        ``path``. The first call triggers the "block" subsystem and waits
        for udev to settle before querying.'''
        if self.first_run:
            self.trigger("block")
            self.settle()
            self.first_run = False

        if name is None:
            query = ("--path", path)
        else:
            query = ("--name", name)
        udevadm = files.Process(self.udevadm_cmd, "info",
                                "--query", "property", *query)

        properties = {}
        for line in udevadm.read_lines():
            key, separator, value = line.partition('=')
            # Lines without '=' carry no property.
            if separator:
                properties[key] = value
        return properties

    def info_export(self):
        '''Return the stripped output of "udevadm info -e".'''
        export = files.Process(self.udevadm_cmd, 'info', '-e')
        return export.read().strip()

    def trigger(self, subsystem=None):
        '''Run "udevadm trigger", limited to ``subsystem`` when given.'''
        arguments = ['trigger']
        if subsystem:
            arguments.extend(['--subsystem-match', subsystem])
        files.Process(self.udevadm_cmd, *arguments).success()

    def settle(self, timeout=15):
        '''Run "udevadm settle" with the given timeout.'''
        timeout_option = '--timeout={}'.format(timeout)
        files.Process(self.udevadm_cmd, 'settle', timeout_option).success()
class Udev:
    '''Returns transformed or cached information about the system taken
    from udev.'''

    def __init__(self, udevadm=None):
        # Device information keyed by resolved sysfs path / device name.
        self.path_cache = {}
        self.name_cache = {}
        if isinstance(udevadm, UdevAdmCommand):
            self.udevadm = udevadm
        else:
            self.udevadm = UdevAdmCommand()
        if self.udevadm.broken:
            self.udevadm = UdevAdmNull()

    def clear_cache(self):
        '''Drop all cached device information and recreate the udevadm
        interface.'''
        self.path_cache = {}
        self.name_cache = {}
        self.udevadm = UdevAdmCommand()
        # Keep the same fallback as the constructor; without it a missing
        # udevadm would be executed after the first cache reset.
        if self.udevadm.broken:
            self.udevadm = UdevAdmNull()

    def get_device_info(self, path=None, name=None):
        '''Return the udev properties of a device as a dictionary.

        The device is selected by ``name`` (a /dev name) when given,
        otherwise by ``path`` (a sysfs path). The result is cached under
        the DEVNAME and the "/sys"-prefixed DEVPATH that udev reports.'''
        if name is not None:
            cache = self.name_cache
            value = devfs.realpath(name)
            name = value
        else:
            cache = self.path_cache
            value = sysfs.realpath(path)
            path = value
        if value in cache:
            return cache[value]

        data = self.udevadm.info_property(path, name)
        devname = data.get('DEVNAME', '')
        devpath = data.get('DEVPATH', '')
        if devname:
            self.name_cache[devname] = data
        if devpath:
            # Was stored under devname, so path lookups never hit the
            # cache.
            self.path_cache['/sys{}'.format(devpath)] = data
        return data

    def get_block_devices(self):
        '''Yield the sysfs paths of the block devices, each followed by
        the paths of its partitions.'''
        for block in sysfs.glob(sysfs.PATH['block'], '*'):
            yield block
            blockname = os.path.basename(block)
            for part in sysfs.glob(block, '{}*'.format(blockname)):
                yield part

    def syspath_to_devname(self, devices, drop_empty=True):
        '''Yield the DEVNAME for each sysfs path in ``devices``.

        Paths without a DEVNAME are skipped, or yielded as an empty string
        when ``drop_empty`` is false.'''
        for device_path in devices:
            info = self.get_device_info(path=device_path)
            if 'DEVNAME' in info:
                yield info['DEVNAME']
            elif not drop_empty:
                yield ''

    def devname_to_syspath(self, devices, drop_empty=True):
        '''Yield the DEVPATH for each device name in ``devices``.

        Names without a DEVPATH are skipped, or yielded as an empty string
        when ``drop_empty`` is false.'''
        for device_name in devices:
            info = self.get_device_info(name=device_name)
            if 'DEVPATH' in info:
                yield info['DEVPATH']
            elif not drop_empty:
                yield ''

    def get_devname(self, path=None, name=None, fallback=None):
        '''Return the DEVNAME of a device.

        When udev reports none, ``fallback`` is returned; it defaults to
        ``name`` if given, otherwise to an empty string.'''
        if fallback is None:
            fallback = name if name else ''
        return self.get_device_info(path, name).get('DEVNAME', fallback)

    def get_syspath(self, path=None, name=None, fallback=None):
        '''Return the DEVPATH of a device.

        When udev reports none, ``fallback`` is returned; it defaults to
        ``path`` if given, otherwise to an empty string.'''
        if fallback is None:
            fallback = path if path else ''
        return self.get_device_info(path, name).get('DEVPATH', fallback)

    def refresh(self, trigger_only=False):
        '''Refresh the device information held by udev.

        Unless ``trigger_only`` is set, the local cache is cleared first.
        The "block" subsystem is triggered only when the EBUILD_PHASE
        environment variable is not set.'''
        if not trigger_only:
            self.clear_cache()
        files.quite_unlink('/etc/blkid.tab')
        if not os.environ.get('EBUILD_PHASE'):
            self.udevadm.trigger(subsystem='block')
        self.udevadm.settle(15)

    def is_cdrom(self, udev_data):
        '''True when the properties have ID_CDROM equal to "1".'''
        return udev_data.get('ID_CDROM', '') == '1'

    def is_device(self, udev_data):
        '''True when the properties describe a disk whose sysfs directory
        contains a "device" entry.'''
        if 'DEVPATH' not in udev_data:
            return False
        return (sysfs.exists(udev_data.get('DEVPATH', ''), 'device') and
                udev_data.get('DEVTYPE', '') == 'disk')

    def is_raid(self, udev_data):
        '''True for a disk whose MD_LEVEL starts with "raid".'''
        return (udev_data.get('MD_LEVEL', '').startswith('raid') and
                udev_data.get('DEVTYPE', '') == 'disk')

    def is_raid_partition(self, udev_data):
        '''True for a partition whose MD_LEVEL starts with "raid".'''
        return (udev_data.get('MD_LEVEL', '').startswith('raid') and
                udev_data.get('DEVTYPE', '') == 'partition')

    def is_lvm(self, udev_data):
        '''True when both DM_LV_NAME and DM_VG_NAME are present.'''
        return 'DM_LV_NAME' in udev_data and 'DM_VG_NAME' in udev_data

    def is_partition(self, udev_data):
        '''True when DEVTYPE is "partition".'''
        return udev_data.get('DEVTYPE', '') == 'partition'

    def is_block_device(self, udev_data):
        '''True when SUBSYSTEM is "block".'''
        return udev_data.get('SUBSYSTEM', '') == 'block'

    def get_device_type(self, path=None, name=None):
        '''Return a string describing the type of a device.

        Possible results are 'cdrom', 'disk', '<parent type>-partition',
        '<member type>-<MD_LEVEL>', '<volume type>-lvm', 'loop' and an
        empty string when nothing matches. For RAID and LVM only the first
        underlying device decides the prefix.'''
        info = self.get_device_info(path, name)
        syspath = info.get('DEVPATH', '')
        if self.is_cdrom(info):
            return 'cdrom'
        if self.is_device(info):
            return 'disk'
        if self.is_partition(info):
            return '{}-partition'.format(
                self.get_device_type(os.path.dirname(syspath)))
        if self.is_raid(info):
            for raid_device in raid.get_devices_syspath(syspath):
                return '{}-{}'.format(self.get_device_type(raid_device),
                                      info['MD_LEVEL'])
            else:
                # A RAID without members.
                return 'loop'
        if self.is_lvm(info):
            for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'],
                                                     info['DM_LV_NAME']):
                return '{}-lvm'.format(self.get_device_type(name=lv_device))
        if self.is_block_device(info):
            return 'loop'
        return ''

    def get_partition_type(self, path=None, name=None):
        '''Return the partition type of a device.

        For entries of a "dos" partition scheme this is 'extended' (type
        0x5), 'logical' (partition number above 4) or 'primary'. In every
        other case the ID_PART_TABLE_TYPE property is returned, or an empty
        string when it is missing.'''
        info = self.get_device_info(path, name)
        if info.get('ID_PART_ENTRY_SCHEME') == 'dos':
            part_id = info.get('ID_PART_ENTRY_TYPE', '')
            part_number = info.get('ID_PART_ENTRY_NUMBER', '')
            if part_id and part_number:
                if part_id == '0x5':
                    return 'extended'
                elif int(part_number) > 4:
                    return 'logical'
                else:
                    return 'primary'
        return info.get('ID_PART_TABLE_TYPE', '')

    def _get_disk_devices(self, path=None, name=None):
        '''Yield (is_real_device, DEVNAME) pairs for a device and the
        devices it is built on.

        Partitions resolve to their parent; RAID arrays and LVM volumes
        yield themselves with a False flag, followed by the devices below
        them.'''
        # Use this instance, not the module-level udev object, so that the
        # caches and udevadm of the instance being queried are used.
        info = self.get_device_info(path=path, name=name)
        syspath = info.get('DEVPATH', '')
        if syspath:
            # real device
            if self.is_device(info):
                yield True, info.get('DEVNAME', '')
            # partition
            elif self.is_partition(info):
                for device in self._get_disk_devices(
                        path=os.path.dirname(syspath)):
                    yield device
            # md raid
            elif self.is_raid(info):
                yield False, info.get('DEVNAME', '')
                for raid_device in sorted(raid.get_devices_syspath(syspath)):
                    for device in self._get_disk_devices(path=raid_device):
                        yield device
            # lvm
            elif self.is_lvm(info):
                yield False, info.get('DEVNAME', '')
                for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'],
                                                         info['DM_LV_NAME']):
                    for device in self._get_disk_devices(name=lv_device):
                        yield device

    def get_disk_devices(self, path=None, name=None):
        '''Return the sorted names of the real disks behind a device.'''
        return sorted({
            device for real_device, device in
            self._get_disk_devices(path, name) if real_device
        })

    def get_all_base_devices(self, path=None, name=None):
        '''Return the unique names of the devices a device is built on,
        leaving out names that start with "/dev/loop".

        For anything that is not a partition the first remaining entry is
        dropped. An empty list is returned when there is nothing to
        drop.'''
        try:
            devices = (device for _, device in
                       self._get_disk_devices(path, name)
                       if not device.startswith('/dev/loop'))
            if not self.is_partition(self.get_device_info(path, name)):
                next(devices)
            return list(unique(devices))
        except StopIteration:
            return []
# Module-level shared instances; the functions and classes above refer to
# them by these names.
sysfs = SysFS()
devfs = DevFS()
# Udev replaces the given udevadm interface with UdevAdmNull if it reports
# itself as broken.
udev = Udev(UdevAdmCommand())
lvm = Lvm(LvmCommand())
raid = RAID(MdadmCommand())