You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/device.py

764 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import re
import os
from os import path
from calculate.lib.utils.tools import (Cachable, GenericFs, unique)
import files
from time import sleep
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3', sys.modules[__name__])
def getUUIDDict(revers=False, devs=()):
"""Get dict UUID -> dev"""
blkidProcess = files.process("/sbin/blkid", "-s", "UUID", "-c", "/dev/null",
*list(devs))
reSplit = re.compile('^([^:]+):.*UUID="([^"]+)"', re.S)
searched = (x.groups() for x in map(reSplit.search, blkidProcess) if x)
mapping = (("UUID=%s" % uuid, udev.get_devname(name=dev, fallback=dev))
for dev, uuid in searched)
if revers:
return {v: k for k, v in mapping}
else:
return {k: v for k, v in mapping}
def detectDeviceForPartition(dev):
"""Detect parent device for partition by udev and return property"""
prop = udev.get_device_info(name=dev)
if prop.get('DEVTYPE', '') != 'partition':
return ''
parentpath = path.dirname(prop.get('DEVPATH', ''))
if parentpath:
devProp = udev.get_device_info(path=parentpath)
return devProp.get('DEVNAME', '')
return None
def countPartitions(devname):
"""Count partition for specified device"""
syspath = udev.get_device_info(name=devname).get('DEVPATH', '')
if not syspath:
return 0
device_name = path.basename(syspath)
return len([x for x in sysfs.listdir(syspath) if x.startswith(device_name)])
class LvmCommand(object):
@property
def lvm_cmd(self):
return files.getProgPath('/sbin/lvm')
def get_physical_extent_size(self):
if not self.lvm_cmd:
return ""
pvdata = files.process(self.lvm_cmd, "lvmconfig", "--type", "full",
"allocation/physical_extent_size")
if pvdata.success():
return pvdata.read().strip().rpartition("=")[-1]
return ""
def get_pvdisplay_output(self, output):
if not self.lvm_cmd:
return ""
pvdata = files.process(self.lvm_cmd, "pvdisplay", "-C", "-o",
output, "--noh")
if pvdata.success():
return pvdata.read()
return ""
def vgscan(self):
if self.lvm_cmd:
return files.process(self.lvm_cmd, "vgscan").success()
return False
def vgchange(self):
failed = True
if self.lvm_cmd:
failed = files.process("vgchange", '-ay').failed()
failed |= files.process("vgchange", '--refresh').failed()
return not failed
def lvchange(self, groups):
failed = True
if self.lvm_cmd:
failed = False
for group in groups:
failed |= files.process("lvchange", '-ay', group).failed()
failed |= files.process("lvchange", '--refresh', group).failed()
return not failed
def execute(self, *command):
if self.lvm_cmd:
return files.process(self.lvm_cmd, *command).success()
return False
def double_execute(self, *command):
if not self.execute(*command):
sleep(2)
return self.execute(*command)
return False
def remove_lv(self, vg, lv):
return self.double_execute("lvremove", "%s/%s" % (vg, lv), "-f")
def remove_vg(self, vg):
return self.double_execute("vgremove", vg, "-f")
def remove_pv(self, pv):
return self.double_execute("pvremove", pv, "-ffy")
class Lvm(Cachable):
"""
LVM информация
"""
def __init__(self, commander):
super(Lvm, self).__init__()
self.commander = commander
@property
def groups(self):
return sorted({vg for vg, lv, pv in self.pvdisplay_full()})
@Cachable.methodcached()
def get_pesize(self):
return self.commander.get_physical_extent_size()
@Cachable.methodcached()
def pvdisplay_output(self, output):
return self.commander.get_pvdisplay_output(output)
def pvdisplay_full(self):
data = self.pvdisplay_output("vg_name,lv_name,pv_name").strip()
if data:
out = (x.split() for x in data.split("\n"))
return [tuple(y.strip() for y in x)
for x in out if x and len(x) == 3]
return []
def used_partitions(self, vg_eq, lv_eq):
return list(sorted({part for vg, lv, part in self.pvdisplay_full()
if vg == vg_eq and lv_eq == lv}))
def refresh(self):
"""Run command which refresh information about LVM"""
if not os.environ.get('EBUILD_PHASE'):
self.commander.vgscan()
self.commander.vgchange()
self.commander.lvchange(lvm.groups)
def remove_lv(self, vg, lv):
return self.commander.remove_lv(vg, lv)
def remove_vg(self, vg):
return self.commander.remove_vg(vg)
def remove_pv(self, pv):
return self.commander.remove_pv(pv)
def lspci(filtername=None, shortInfo=False):
"""Get hash of lspci, filtred by filtername. If shortInfo, then
type,vendor and name get only first word
pcidata(domain,bus,slot,func)
'type'
'vendor'
'name'"""
reData = re.compile(r'(\S+)\s"([^"]+)"\s+"([^"]+)"\s+"([^"]+)"', re.S)
if filtername:
if hasattr(filtername, '__call__'):
filterfunc = filtername
else:
filterfunc = lambda x: filtername in x
else:
filterfunc = lambda x: x
if shortInfo:
sfunc = lambda x: x.partition(" ")[0]
else:
sfunc = lambda x: x
lspciProg = files.checkUtils('/usr/sbin/lspci')
processLsPci = files.process(lspciProg, "-m")
retData = {}
for device in map(lambda x: x.groups(),
filter(lambda x: x,
map(reData.search,
filter(filterfunc,
processLsPci)))):
retData[device[0]] = {'type': sfunc(device[1]),
'vendor': sfunc(device[2]),
'name': sfunc(device[3])}
return retData
class UdevAdmNull(object):
def info_property(self, path=None, name=None):
return {}
def info_export(self):
return ""
def settle(self, timeout=15):
pass
def trigger(self, subsystem=None):
pass
@property
def broken(self):
return False
class UdevAdm(UdevAdmNull):
"""
Объект взаимодействия с udevadm
"""
def __init__(self):
self.first_run = True
@property
def udevadm_cmd(self):
return files.getProgPath('/sbin/udevadm')
@property
def broken(self):
return bool(not self.udevadm_cmd)
def info_property(self, path=None, name=None):
if self.first_run:
self.trigger("block")
self.settle()
self.first_run = False
if name is not None:
type_query = "--name"
value = name
else:
type_query = "--path"
value = path
udev_output = files.process(self.udevadm_cmd, "info", "--query",
"property",
type_query, value).read().split("\n")
return dict(x.partition("=")[0::2] for x in udev_output if "=" in x)
def info_export(self):
return files.process(self.udevadm_cmd, "info", "-e").read().strip()
def trigger(self, subsystem=None):
if subsystem:
files.process(self.udevadm_cmd,
"trigger", "--subsystem-match", subsystem).success()
else:
files.process(self.udevadm_cmd, "trigger").success()
def settle(self, timeout=15):
files.process(self.udevadm_cmd, "settle",
"--timeout=%d" % timeout).success()
class Udev(object):
"""
Объект возвращающий преобразованную или кэшированную информацию о системе
из udev
"""
def clear_cache(self):
self.path_cache = {}
self.name_cache = {}
self.udevadm = UdevAdm()
def __init__(self, udevadm=None):
self.path_cache = {}
self.name_cache = {}
if isinstance(udevadm, UdevAdm):
self.udevadm = udevadm
else:
self.udevadm = UdevAdm()
if self.udevadm.broken:
self.udevadm = UdevAdmNull()
def get_device_info(self, path=None, name=None):
"""Get device info by syspath of name"""
if name is not None:
cache = self.name_cache
value = devfs.realpath(name)
name = value
else:
cache = self.path_cache
value = sysfs.realpath(path)
path = value
if value not in cache:
data = self.udevadm.info_property(path, name)
devname = data.get('DEVNAME', '')
devpath = data.get('DEVPATH', '')
if devpath:
devpath = "/sys%s" % devpath
if devname:
self.name_cache[devname] = data
if devpath:
self.path_cache[devname] = data
return data
else:
# print "fromCache",keyCache
return cache[value]
def get_block_devices(self):
"""
Получить все устройства SUBSYSTEM=block устройства
"""
for block in sysfs.glob(sysfs.Path.Block, "*"):
yield block
blockname = path.basename(block)
for part in sysfs.glob(block, "%s*" % blockname):
yield part
#block_devices = {sysfs.realpath(x)[len(sysfs.base_dn):]: x
# for x in sysfs.listdir("/block/*", fullpath=True)}
#block_entries = (x for x in self.udevadm.info_export().split("\n\n")
# if "SUBSYSTEM=block" in x)
#for block in block_entries:
# for entry in (x[3:] for x in block.split("\n")
# if x.startswith("P:")):
# if "/block/" in entry:
# yield entry[entry.rindex("/block/"):]
# break
def syspath_to_devname(self, it, dropempty=True):
"""
Преобразовать sysfs имя устройства в /dev
"""
for x in it:
info = self.get_device_info(path=x)
if "DEVNAME" in info:
yield info['DEVNAME']
elif not dropempty:
yield ""
def devname_to_syspath(self, it, dropempty=True):
"""
Преобразовать /dev имя устройства в sysfs
"""
for x in it:
info = self.get_device_info(name=x)
if "DEVPATH" in info:
yield info['DEVPATH']
elif not dropempty:
yield ""
def get_devname(self, path=None, name=None, fallback=None):
if fallback is None:
if name:
fallback = name
else:
fallback = ""
return self.get_device_info(path, name).get("DEVNAME", fallback)
def get_syspath(self, path=None, name=None, fallback=None):
if fallback is None:
if path:
fallback = path
else:
fallback = ""
return self.get_device_info(path, name).get("DEVPATH", fallback)
def refresh(self, trigger_only=False):
"""
Run command which refresh information about device in udev
"""
if not trigger_only:
self.clear_cache()
files.quite_unlink("/etc/blkid.tab")
if not os.environ.get('EBUILD_PHASE'):
self.udevadm.trigger(subsystem="block")
self.udevadm.settle(15)
def is_cdrom(self, prop):
return prop.get("ID_CDROM", '') == "1"
def is_device(self, prop):
if "DEVPATH" not in prop:
return False
return (sysfs.exists(prop.get("DEVPATH", ''), "device") and
prop.get('DEVTYPE', "") == "disk")
def is_raid(self, prop):
return (prop.get("MD_LEVEL", "").startswith("raid") and
prop.get("DEVTYPE", "") == "disk")
def is_raid_partition(self, prop):
return (prop.get("MD_LEVEL", "").startswith("raid") and
prop.get("DEVTYPE", "") == "partition")
def is_lvm(self, prop):
return "DM_LV_NAME" in prop and "DM_VG_NAME" in prop
def is_partition(self, prop):
return prop.get("DEVTYPE") == "partition"
def is_block_device(self, prop):
return prop.get("SUBSYSTEM") == "block"
def get_device_type(self, path=None, name=None):
info = self.get_device_info(path, name)
syspath = info.get("DEVPATH", '')
if self.is_cdrom(info):
return "cdrom"
if self.is_device(info):
return "disk"
if self.is_partition(info):
return "%s-partition" % self.get_device_type(
os.path.dirname(syspath))
if self.is_raid(info):
for rd in raid.devices_syspath(syspath):
return "%s-%s" % (self.get_device_type(rd), info["MD_LEVEL"])
else:
return "loop"
if self.is_lvm(info):
for lvdev in lvm.used_partitions(info["DM_VG_NAME"],
info["DM_LV_NAME"]):
return "%s-lvm" % self.get_device_type(name=lvdev)
if self.is_block_device(info):
return "loop"
return ""
def get_partition_type(self, path=None, name=None):
info = self.get_device_info(path, name)
"""Get type of dos part table (primary,extended or logical)"""
if info.get('ID_PART_ENTRY_SCHEME') == 'dos':
partId = info.get('ID_PART_ENTRY_TYPE', '')
partNumber = info.get('ID_PART_ENTRY_NUMBER', '')
if partId and partNumber:
if partId == "0x5":
return "extended"
elif int(partNumber) > 4:
return "logical"
else:
return "primary"
return info.get('ID_PART_TABLE_TYPE', '')
def _get_disk_devices(self, path=None, name=None):
"""
Возвращает список базовых устройств (признак базовое ли устройство,
название устройства)
:param path:
:param name:
:return:
"""
info = udev.get_device_info(path=path, name=name)
syspath = info.get("DEVPATH", '')
if syspath:
# real device
if self.is_device(info):
yield True, info.get("DEVNAME", "")
# partition
elif self.is_partition(info):
for x in self._get_disk_devices(path=os.path.dirname(syspath)):
yield x
# md raid
elif udev.is_raid(info):
yield False, info.get("DEVNAME", "")
for rd in sorted(raid.devices_syspath(syspath)):
for x in self._get_disk_devices(path=rd):
yield x
# lvm
elif udev.is_lvm(info):
yield False, info.get("DEVNAME", "")
for lvdev in lvm.used_partitions(info["DM_VG_NAME"],
info["DM_LV_NAME"]):
for x in self._get_disk_devices(name=lvdev):
yield x
def get_disk_devices(self, path=None, name=None):
"""Get real parent device by partition,lvm,mdraid"""
return sorted({
dev
for realdevice, dev in self._get_disk_devices(path, name)
if realdevice
})
def get_all_base_devices(self, path=None, name=None):
"""
Получить все устройства (включая RAID и LVM) из которого состоит
указанное устройство, исключая loop устройства
"""
try:
devs = (dev
for realdevice, dev in self._get_disk_devices(path, name)
if not dev.startswith("/dev/loop"))
if not self.is_partition(self.get_device_info(path, name)):
next(devs)
return list(unique(devs))
except StopIteration:
return []
def humanreadableSize(size, compsize=0):
"""
Human readable size (1024 -> 1K and etc)
"""
try:
size = int(size)
except ValueError:
return ""
suffix = (((1024 ** 0), "", False),
((1024 ** 1), "K", False),
((1024 ** 2), "M", False),
((1024 ** 3), "G", True),
((1024 ** 4), "T", True),
((1024 ** 5), "P", True))
suffix = filter(lambda x: abs(size-compsize) > x[0], suffix)
if suffix:
suffix = suffix[-1]
printSize = int(size / (float(suffix[0]) / 10))
printSizeTail = printSize % 10
printSize /= 10
if suffix[2] and printSizeTail:
return "%d.%d%s" % (printSize, printSizeTail, suffix[1])
else:
return "%d%s" % (printSize, suffix[1])
return str(size)
def getPartitionSize(syspath=None, name=None, inBytes=False):
"""
Получить размер раздела
"""
dev = udev.get_syspath(path=syspath, name=name)
SECTORSIZE = 512
if dev and sysfs.exists(dev, "size"):
try:
size = int(sysfs.read(dev, "size").strip()) * SECTORSIZE
except ValueError:
return ""
if inBytes:
return str(size)
else:
return humanreadableSize(size)
return ""
class MdadmCommand(object):
@property
def mdadm_cmd(self):
return files.getProgPath('/sbin/mdadm')
def stop_raid(self, devraid):
if not self.mdadm_cmd:
return ""
return files.process(self.mdadm_cmd, "-S", devraid).success()
def zero_superblock(self, dev):
if not self.mdadm_cmd:
return ""
return files.process(self.mdadm_cmd, "--zero-superblock", dev).success()
class Raid(object):
def __init__(self, commander):
self.commander = commander
def devices_info(self, raid):
prop = udev.get_device_info(path=raid)
if udev.is_raid(prop):
for rdblock in sysfs.glob(raid, "md/dev-*", "block"):
yield udev.get_device_info(sysfs.syspath(rdblock))
def devices_syspath(self, raid):
"""
Получить (syspath) устройств из которых состоит raid
:param raid:
:return:
"""
for x in self.devices(raid, pathname="DEVPATH"):
yield x
def devices(self, raid, pathname="DEVNAME"):
"""
Получить устройства /dev из которых состоит RAID, не возвращает
список этих устройств для раздела сделанного для RAID (/dev/md0p1)
:param raid:
:param pathname:
:return:
"""
for device_info in self.devices_info(raid):
devname = device_info.get(pathname, "")
if devname:
yield devname
def remove_raid(self, raidname):
# итератор не подходит, так как после остановки RAID
# не получится узнать его диски
raidparts = list(self.devices(udev.get_syspath(name=raidname)))
failed = False
failed |= not (self.commander.stop_raid(raidname) or
self.commander.stop_raid(raidname))
for dev in raidparts:
failed |= not (self.commander.zero_superblock(dev) or
self.commander.zero_superblock(dev))
return not failed
def loadEfiVars():
"""
Load module with efivars
"""
import mount
modprobe = files.getProgPath('/sbin/modprobe')
if not sysfs.exists(sysfs.Path.Efi) and modprobe:
files.process(modprobe, 'efivars').success()
mount_cmd = files.getProgPath('/bin/mount')
mtab = mount.Mounts()
efivars_path = sysfs.syspath(sysfs.Path.Efivars)
if mtab.exists(efivars_path) and mtab.readonly(efivars_path):
files.process(mount_cmd, "-o", "rw,remount", efivars_path).success()
mtab.rebuildCache()
class DeviceFs(object):
"""
Файловая система, используемая для описания /sys, /dev
"""
def __init__(self, fs=None):
if isinstance(fs, GenericFs):
self.fs = fs
else:
self.fs = files.RealFs("/")
def pathjoin(self, *dns):
res = path.join("/", *(dn[1:] if dn.startswith("/") else dn
for dn in dns))
return res
def exists(self, *dn):
return self.fs.exists(self.pathjoin(*dn))
def read(self, *fn):
return self.fs.read(self.pathjoin(*fn))
def realpath(self, *fn):
return self.fs.realpath(self.pathjoin(*fn))
def write(self, *args):
fn = args[:-1]
data = args[-1]
self.fs.write(self.pathjoin(*fn), data)
def listdir(self, *dn, **kw):
fullpath = kw.get("fullpath", False)
return self.fs.listdir(self.pathjoin(*dn), fullpath=fullpath)
def glob(self, *dn):
for fn in self.fs.glob(self.pathjoin(*dn)):
yield fn
class Sysfs(DeviceFs):
"""
Объект взаимодействующий с sysfs
"""
base_dn = "/sys"
class Path(object):
Firmware = "firmware"
Efi = path.join(Firmware, "efi")
Efivars = path.join(Efi, "efivars")
ClassNet = "class/net"
Input = "class/input"
Block = "block"
Dmi = "class/dmi/id"
Drm = "class/drm"
Module = "module"
BlockScheduler = "queue/scheduler"
def syspath(self, *dns):
"""
Получить путь до указанного устройства.
:param dns:
:return:
"""
res = path.join("/", *(dn[1:] if dn.startswith("/") else dn
for dn in dns))
if res.startswith(self.base_dn):
return res
else:
return path.join(self.base_dn, res[1:])
def glob(self, *fn):
for fn in self.fs.glob(self.pathjoin(*fn)):
yield fn[len(self.base_dn):]
pathjoin = syspath
class Devfs(DeviceFs):
"""
Объект взаимодействующий с devfs
"""
base_dn = "/dev"
sysfs = Sysfs()
devfs = Devfs()
udev = Udev()
lvm = Lvm(LvmCommand())
raid = Raid(MdadmCommand())
class HwinfoError(Exception):
pass
class HwinfoTimeout(HwinfoError):
pass
class HwinfoNotExists(HwinfoError):
pass
class Hwinfo(object):
"""
Объект получения информации из hwinfo
"""
command_timeout = 20
@property
def hwinfo_cmd(self):
return files.getProgPath("/usr/sbin/hwinfo")
def framebuffer(self):
"""
Получить вывод hwinfo --framebuffer
"""
return self.run("--framebuffer")
def run(self, *command):
if not self.hwinfo_cmd:
raise HwinfoNotExists()
try:
return files.process(*[self.hwinfo_cmd] + list(command),
**{'timeout':self.command_timeout}).read()
except files.ProcessTimeout:
raise HwinfoTimeout()
def resolutions(self):
"""
Получить допустимые разрешения фреймбуффера
"""
re_modes = re.compile("^\s+Mode 0x[0-9a-f]+:\s+(\d+x\d+)\s",re.M)
return tuple(sorted(unique(re_modes.findall(self.framebuffer()))))