Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

619 řádky
23 KiB

Tento soubor obsahuje nejednoznačné znaky Unicode!

Tento soubor obsahuje nejednoznačné znaky Unicode, které mohou být zaměněny s ostatními v aktuálním prostředí. Pokud je váš případ úmyslný a legitimní, můžete toto varování bezpečně ignorovat. Použijte tlačítko Escape sekvence pro zvýraznění těchto znaků.

import os
import re
from . import files
from .tools import Cachable, GenericFS, unique
from time import sleep
def get_uuid_dict(reverse=False, devices=()):
'''Получить словарь со значениями UUID блочных устройств.'''
blkid_process = files.Process('/sbin/blkid', '-s', 'UUID',
'-c', '/dev/null', *devices)
re_split = re.compile('^([^:]+):.*UUID="([^"]+)"', re.S)
output_items = {}
search_results = []
lines = blkid_process.read_lines()
for line in lines:
search_result = re_split.search(line)
if search_result:
search_results.append(search_result.groups())
output_items = (("UUID={}".format(uuid), udev.get_devname(name=dev,
fallback=dev))
for dev, uuid in search_results)
if reverse:
return {v: k for k, v in output_items}
else:
return {k: v for k, v in output_items}
def find_device_by_partition(device):
'''Найти устройство, к которому относится данный раздел.'''
info = udev.get_device_info(name=device)
if info.get('DEVTYPE', '') != 'partition':
return ''
parent_path = os.path.dirname(info.get('DEVPATH', ''))
if parent_path:
device_info = udev.get_device_info(path=parent_path)
return device_info.get('DEVNAME', '')
return False
def count_partitions(device_name):
'''Посчитать количество разделов данного устройства.'''
syspath = udev.get_device_info(name=device_name).get('DEVPATH', '')
if not syspath:
return 0
device_name = os.path.basename(syspath)
return len([x for x in sysfs.listdir(syspath)
if x.startswith(device_name)])
def get_lspci_output(filter_name=None, short_info=False):
'''Функция для чтения вывода lspci. Возвращает словарь с идентификаторами
устройств и информацией о них.'''
re_data = re.compile(r'(\S+)\s"([^"]+)"\s+"([^"]+)"\s+"([^"]+)"', re.S)
lspci_column_names = ('type', 'vendor', 'name')
if filter_name:
if callable(filter_name):
filter_function = filter_name
else:
def filter_function(input):
return filter_name in input
else:
def filter_function(input):
return input
lspci_path = files.check_utils('/usr/sbin/lspci')
lspci_output = files.Process(lspci_path, '-m').read_lines()
output = {}
lspci_output = list(filter(filter_function, lspci_output))
for line in lspci_output:
search_result = re_data.search(line)
if search_result:
search_result = search_result.groups()
device_id = search_result[0]
output[device_id] = {key: value for key, value in zip(
lspci_column_names,
search_result[1:])}
return output
class LvmCommand:
'''Класс для работы с командой lvm и выполнения с ее помощью различных
действий.'''
@property
def lvm_command(self):
'''Возвращает кешированный путь к lvm.'''
return files.get_program_path('/sbin/lvm')
def get_physical_extent_size(self):
if not self.lvm_command:
return ''
pv_data = files.Process(self.lvm_command, 'lvmconfig', '--type',
'full', 'allocation/physical_extent_size')
if pv_data.success():
return pv_data.read()
return ''
def get_pvdisplay_output(self, option):
'''Получить вывод pv_display c указанной option.'''
if not self.lvm_command:
return ''
pv_data = files.Process(self.lvm_command, 'pvdisplay', '-C', '-o',
option, '--noh')
if pv_data.success():
return pv_data.read()
return False
def vg_scan(self):
'''Найти существующие в системе группы томов.'''
if self.lvm_command:
return files.Process(self.lvm_command, 'vgscan').success()
return False
def vg_change(self):
'''Изменить атрибуты группы томов.'''
if self.lvm_command:
failed = files.Process('vgchange', '-ay').failed()
failed |= files.Process('vgchange', '--refresh').failed()
return not failed
return False
def lv_change(self, groups):
'''Изменить атрибуты логического тома.'''
if self.lvm_command:
failed = False
for group in groups:
failed |= files.Process('lvchange', '-ay', group).failed()
failed |= files.Process('lvchange', '--refresh',
group).failed()
return failed
return False
def execute(self, *command):
'''Выполнить указанную LVM команду.'''
if self.lvm_command:
return files.Process(self.lvm_command, *command).success()
return False
def double_execute(self, *command):
'''Выполнить дважды указанную LVM команду.'''
if not self.execute(*command):
sleep(2)
return self.execute(*command)
return False
def remove_lv(self, vg, lv):
'''Удалить указанный логических томов из системы.'''
return self.double_execute('lvremove', '{0}/{1}'.format(vg, lv), '-f')
def remove_vg(self, vg):
'''Удалить указанную группу томов из системы.'''
return self.double_execute('vgremove', vg, '-f')
def remove_pv(self, pv):
'''Удалить LVM метку с физического тома.'''
return self.double_execute('pvremove', pv, '-ffy')
class Lvm(Cachable):
'''Класс для получения информации о lvm.'''
def __init__(self, lvm_command):
super().__init__()
self.lvm_command = lvm_command
def get_pvdisplay_full(self):
'''Получить полный вывод команды pvdisplay.'''
get_pvdisplay_output = self.get_pvdisplay_output(
'vg_name,lv_name,pv_name'
).strip()
if get_pvdisplay_output:
output = (line.split() for line in
get_pvdisplay_output.split('\n'))
return [tuple(y.strip() for y in x)
for x in output if x and len(x) == 3]
@property
def get_volume_groups(self):
'''Получить имеющиеся в системе группы томов.'''
return sorted({vg for vg, lv, pv in self.get_pvdisplay_full()})
@Cachable.method_cached()
def get_physical_extent_size(self):
'''Получить размер физического диапазона.'''
return self.lvm_command.get_physical_extent_size()
@Cachable.method_cached()
def get_pvdisplay_output(self, option):
'''Получить с помощью pvdisplay информацию .'''
return self.lvm_command.get_pvdisplay_output(option)
def get_used_partitions(self, vg_condition, lv_condition):
'''Получить испоьзуемые разделы.'''
return list(sorted({part for vg, lv, part in self.get_pvdisplay_full()
if vg == vg_condition and lv == lv_condition}))
def refresh(self):
if not os.environ.get('EBUILD_PHASE', False):
self.lvm_command.vg_scan()
self.lvm_command.vg_change()
self.lvm_command.lv_change(lvm)
def remove_lv(self, vg, lv):
return self.lvm_command.remove_lv(vg, lv)
def remove_vg(self, vg):
return self.lvm_command.remove_vg(vg)
def remove_pv(self, pv):
return self.lvm_command.remove_pv(pv)
class MdadmCommand:
'''Класс для работы с командой mdadm.'''
@property
def mdadm_command(self):
'''Возвращает кешированный путь к mdamd.'''
return files.get_program_path('/sbin/mdadm')
def stop_raid(self, raid_device):
'''Остановить устройство из RAID-массива.'''
if not self.mdadm_command:
return False
return files.Process(self.mdadm_command, '-S', raid_device).success()
def zero_superblock(self, device):
'''Затереть superblock для составляющей RAID-массива.'''
if not self.mdadm_command:
return False
return files.Process(self.mdadm_command, '--zero_superblock',
device).success()
class RAID:
'''Класс для работы с RAID-массивами.'''
def __init__(self, mdadm_command):
self.mdadm_command = mdadm_command
def get_devices_info(self, raid_device):
'''Получить информацию о RAID-массиве.'''
device = udev.get_device_info(path=raid_device)
if udev.is_raid(device):
for raid_block in sysfs.glob(raid_device, 'md/dev-*', 'block'):
yield udev.get_device_info(sysfs.join_path(raid_block))
def get_devices(self, raid_device, path_name='DEVPATH'):
'''Получить устройства /dev, из которых состоит RAID-массив.
Не возвращает список этих устройств для раздела сделанного для RAID
(/dev/md0p1).'''
for device_info in self.get_devices_info(raid_device):
device_name = device_info.get(path_name, '')
if device_name:
yield device_name
def get_devices_syspath(self, raid_device):
'''Получить sysfs пути устройств, из которых состоит RAID-массив.'''
for device in self.get_devices(raid_device, path_name='DEVPATH'):
yield device
def remove_raid(self, raid_name):
'''Удалить RAID-массив.'''
raid_parts = list(self.get_devices(udev.get_syspath(name=raid_name)))
failed = False
failed |= not (self.mdadm_command.stop_raid(raid_name) or
self.mdadm_command.stop_raid(raid_name))
for device in raid_parts:
failed |= not (self.mdadm_command.zero_superblock(device) or
self.mdadm_command.zero_superblock(device))
return not failed
class DeviceFS(GenericFS):
'''Базовый класс для классов предназначенных для работы с sysfs и /dev'''
def __init__(self, filesystem=None):
if isinstance(filesystem, GenericFS):
self.filesystem = filesystem
else:
self.filesystem = files.RealFS('/')
def join_path(self, *paths):
output = os.path.join('/', *(_path[1:] if _path.startswith('/')
else _path for _path in paths))
return output
def exists(self, *paths):
return self.filesystem.exists(self.join_path(*paths))
def read(self, *paths):
return self.filesystem.read(self.join_path(*paths))
def realpath(self, *paths):
return self.filesystem.realpath(self.join_path(*paths))
def write(self, *args):
data = args[-1]
paths = args[:-1]
self.filesystem.write(self.join_path(*paths), data)
def listdir(self, *paths, full_path=False):
return self.filesystem.listdir(self.join_path(*paths),
full_path=full_path)
def glob(self, *paths):
for file_path in self.filesystem.glob(self.join_path(*paths)):
yield file_path
class SysFS(DeviceFS):
'''Класс для работы с sysfs.'''
BASE_DIRECTORY = '/sys'
PATH = {'firmware': 'firmware',
'efi': 'firmware/efi',
'efivars': 'firmware/efi/efivars',
'classnet': 'class/net',
'input': 'class/input',
'block': 'block',
'dmi': 'class/dmi/id',
'module': 'module',
'block_scheduler': 'queue/scheduler'}
def join_path(self, *paths):
output = os.path.join('/', *(_path[1:] if _path.startswith('/')
else _path for _path in paths))
if output.startswith(self.BASE_DIRECTORY):
return output
else:
return os.path.join(self.BASE_DIRECTORY, output[1:])
def glob(self, *paths):
for _path in self.filesystem.glob(self.join_path(*paths)):
yield _path[len(self.BASE_DIRECTORY):]
class DevFS(DeviceFS):
'''Класс для работы с /dev.'''
BASE_DIRECTORY = '/dev'
def join_path(self, *paths):
output = os.path.join('/', *(_path[1:] if _path.startswith('/')
else _path for _path in paths))
if output.startswith(self.BASE_DIRECTORY):
return output
else:
return os.path.join(self.BASE_DIRECTORY, output[1:])
def glob(self, *paths):
for _path in self.filesystem.glob(self.join_path(*paths)):
yield _path[len(self.BASE_DIRECTORY):]
class UdevAdmNull:
def info_property(self, path=None, name=None):
return {}
def info_export(self):
return ''
def settle(self, timeout=15):
pass
def trigger(self, subsystem=None):
pass
@property
def broken(self):
return False
class UdevAdmCommand(UdevAdmNull):
def __init__(self):
self.first_run = True
@property
def udevadm_cmd(self):
return files.get_program_path('/sbin/udevadm')
@property
def broken(self):
return not bool(self.udevadm_cmd)
def info_property(self, path=None, name=None):
if self.first_run:
self.trigger("block")
self.settle()
self.first_run = False
if name is not None:
type_query = "--name"
value = name
else:
type_query = "--path"
value = path
udevadm_output = files.Process(self.udevadm_cmd, "info",
"--query", "property",
type_query, value).read_lines()
output_items = []
for line in udevadm_output:
if '=' in line:
output_items.append(line.partition('=')[0::2])
return dict(output_items)
def info_export(self):
return files.Process(self.udevadm_cmd, 'info', '-e').read().strip()
def trigger(self, subsystem=None):
if subsystem:
files.Process(self.udevadm_cmd, 'trigger', '--subsystem-match',
subsystem).success()
else:
files.Process(self.udevadm_cmd, 'trigger').success()
def settle(self, timeout=15):
files.Process(self.udevadm_cmd, 'settle', '--timeout={}'.
format(timeout)).success()
class Udev:
'''Класс возвращающий преобразованную или кэшированную информацию о системе
из udev.'''
def __init__(self, udevadm=None):
self.path_cache = {}
self.name_cache = {}
if isinstance(udevadm, UdevAdmCommand):
self.udevadm = udevadm
else:
self.udevadm = UdevAdmCommand()
if self.udevadm.broken:
self.udevadm = UdevAdmNull()
def clear_cache(self):
self.path_cache = {}
self.name_cache = {}
self.udevadm = UdevAdmCommand()
def get_device_info(self, path=None, name=None):
if name is not None:
cache = self.name_cache
value = devfs.realpath(name)
name = value
else:
cache = self.path_cache
value = sysfs.realpath(path)
path = value
if value not in cache:
data = self.udevadm.info_property(path, name)
devname = data.get('DEVNAME', '')
devpath = data.get('DEVPATH', '')
if devname:
self.name_cache[devname] = data
if devpath:
devpath = '/sys{}'.format(devpath)
self.path_cache[devname] = data
return data
else:
return cache[value]
def get_block_devices(self):
for block in sysfs.glob(sysfs.PATH['block'], '*'):
yield block
blockname = os.path.basename(block)
for part in sysfs.glob(block, '{}*'.format(blockname)):
yield part
def syspath_to_devname(self, devices, drop_empty=True):
for device_path in devices:
info = self.get_device_info(path=device_path)
if 'DEVNAME' in info:
yield info['DEVNAME']
elif not drop_empty:
yield ''
def devname_to_syspath(self, devices, drop_empty=True):
for device_name in devices:
info = self.get_device_info(name=device_name)
if 'DEVPATH' in info:
yield info['DEVPATH']
elif not drop_empty:
yield ''
def get_devname(self, path=None, name=None, fallback=None):
if fallback is None:
if name:
fallback = name
else:
fallback = ''
return self.get_device_info(path, name).get('DEVNAME', fallback)
def get_syspath(self, path=None, name=None, fallback=None):
if fallback is None:
if path:
fallback = path
else:
fallback = ''
return self.get_device_info(path, name).get('DEVPATH', fallback)
def refresh(self, trigger_only=False):
if not trigger_only:
self.clear_cache()
files.quite_unlink('/etc/blkid.tab')
if not os.environ.get('EBUILD_PHASE'):
self.udevadm.trigger(subsystem='block')
self.udevadm.settle(15)
def is_cdrom(self, udev_data):
return udev_data.get('ID_CDROM', '') == '1'
def is_device(self, udev_data):
if 'DEVPATH' not in udev_data:
return False
return (sysfs.exists(udev_data.get('DEVPATH', ''), 'device') and
udev_data.get('DEVTYPE', '') == 'disk')
def is_raid(self, udev_data):
return (udev_data.get('MD_LEVEL', '').startswith('raid') and
udev_data.get('DEVTYPE', '') == 'disk')
def is_raid_partition(self, udev_data):
return (udev_data.get('MD_LEVEL', '').startswith('raid') and
udev_data.get('DEVTYPE', '') == 'partition')
def is_lvm(self, udev_data):
return 'DM_LV_NAME' in udev_data and 'DM_VG_NAME' in udev_data
def is_partition(self, udev_data):
return udev_data.get('DEVTYPE', '') == 'partition'
def is_block_device(self, udev_data):
return udev_data.get('SUBSYSTEM', '') == 'block'
def get_device_type(self, path=None, name=None):
info = self.get_device_info(path, name)
syspath = info.get('DEVPATH', '')
if self.is_cdrom(info):
return 'cdrom'
if self.is_device(info):
return 'disk'
if self.is_partition(info):
return '{}-partition'.format(
self.get_device_type(os.path.dirname(syspath)))
if self.is_raid(info):
for raid_device in raid.get_devices_syspath(syspath):
return '{}-{}'.format(self.get_device_type(raid_device),
info['MD_LEVEL'])
else:
return 'loop'
if self.is_lvm(info):
for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'],
info['DM_LV_NAME']):
return '{}-lvm'.format(self.get_device_type(name=lv_device))
if self.is_block_device(info):
return 'loop'
return ''
def get_partition_type(self, path=None, name=None):
info = self.get_device_info(path, name)
if info.get('ID_PART_ENTRY_SCHEME') == 'dos':
part_id = info.get('ID_PART_ENTRY_TYPE', '')
part_number = info.get('ID_PART_ENTRY_NUMBER', '')
if part_id and part_number:
if part_id == '0x5':
return 'extended'
elif int(part_number) > 4:
return 'logical'
else:
return 'primary'
return info.get('ID_PART_TABLE_TYPE', '')
def _get_disk_devices(self, path=None, name=None):
info = udev.get_device_info(path=path, name=name)
syspath = info.get('DEVPATH', '')
if syspath:
# real device
if self.is_device(info):
yield True, info.get('DEVNAME', '')
# partition
elif self.is_partition(info):
for device in self._get_disk_devices(
path=os.path.dirname(syspath)):
yield device
# md raid
elif udev.is_raid(info):
yield False, info.get('DEVNAME', '')
for raid_device in sorted(raid.get_devices_syspath(syspath)):
for device in self._get_disk_devices(path=raid_device):
yield device
# lvm
elif udev.is_lvm(info):
yield False, info.get('DEVNAME', '')
for lv_device in lvm.get_used_partitions(info['DM_VG_NAME'],
info['DM_LV_NAME']):
for device in self._get_disk_devices(name=lv_device):
yield device
def get_disk_devices(self, path=None, name=None):
return sorted({
device for real_device, device in
self._get_disk_devices(path, name) if real_device
})
def get_all_base_devices(self, path=None, name=None):
try:
devices = (device for real_device, device in
self._get_disk_devices(path, name)
if not device.startswith('/dev/loop'))
if not self.is_partition(self.get_device_info(path, name)):
next(devices)
return list(unique(devices))
except StopIteration:
return []
sysfs = SysFS()
devfs = DevFS()
udev = Udev(UdevAdmCommand())
lvm = Lvm(LvmCommand())
raid = RAID(MdadmCommand())