You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/debug.py

200 lines
5.2 KiB

# -*- coding: utf-8 -*-
# Copyright 2017 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from calculate.lib.utils.device import (Sysfs, UdevAdm, sysfs, Devfs, Udev, Lvm,
UdevAdmNull, LvmCommand, MdadmCommand,
Raid)
from calculate.lib.utils.files import writeFile
from calculate.lib.utils.vfs import (VFSKeeper, VFSSysfsImporter,
SafeVFS,
VFSDevfsImporter)
import os
import re
class VirtualUdevAdm(UdevAdmNull):
"""
Виртуальный udevadm
"""
subsystems = ("block",)
def __init__(self, data, parent):
super(VirtualUdevAdm, self).__init__()
self.export_data = data
self.path_map = {}
self.name_map = {}
self.prepare_cache()
self.parent = parent
def _filter_udev_data(self, info):
return info.get("SUBSYSTEM") in self.subsystems
def prepare_cache(self):
re_envblock = re.compile("^E: ([^=]+)=(.*)$", re.M)
for block in (x for x in self.export_data.split("\n\n") if x.strip()):
d = {x.group(1): x.group(2) for x in re_envblock.finditer(block)}
if not self._filter_udev_data(d):
continue
if "DEVNAME" in d:
self.name_map[d["DEVNAME"]] = d
if "DEVPATH" in d:
self.path_map[d["DEVPATH"]] = d
def info_property(self, path=None, name=None):
if path is not None:
path = self.parent.sysfs.realpath(path)
cache_path = os.path.join("/", os.path.relpath(path, sysfs.base_dn))
return self.path_map.get(cache_path, {})
else:
name = self.parent.devfs.realpath(name)
return self.name_map.get(name, {})
def info_export(self):
return self.export_data
def trigger(self, subsystem=None):
pass
def settle(self, timeout=15):
pass
class VirtualLvmCommand(object):
def __init__(self, output):
self.output = output
def get_physical_extent_size(self):
return "4096"
def get_pvdisplay_output(self, output):
return self.output
def vgscan(self):
return True
def vgchange(self):
return True
def lvchange(self, groups):
return True
def execute(self, *command):
return True
def double_execute(self, *command):
return True
def remove_lv(self, vg, lv):
return True
def remove_vg(self, vg):
return True
def remove_pv(self, pv):
return True
class VirtualMdadmCommand(object):
def zero_superblock(self, devices):
pass
def stop_raid(self, devraid):
pass
class VirtualDeviceModule(object):
def __init__(self):
self.sysfs = Sysfs()
self.devfs = Devfs()
self.udev = Udev()
self.lvm = Lvm(LvmCommand())
self.raid = Raid(MdadmCommand())
def set_sysfs(self, f):
self.sysfs = Sysfs(SafeVFS(VFSKeeper.load(f)))
def set_devfs(self, f):
self.devfs = Devfs(SafeVFS(VFSKeeper.load(f)))
def set_udev(self, f):
self.udev.udevadm = VirtualUdevAdm(f.read(), self)
def set_raid(self):
self.raid = Raid(VirtualMdadmCommand())
def set_lvm(self, f):
if f is None:
self.lvm = Lvm(VirtualLvmCommand(""))
else:
self.lvm = Lvm(VirtualLvmCommand(f.read()))
class DumpError(Exception):
pass
def dump_real_sysfs(fn):
"""
Сохранить снимок sysfs в файл
:param fn:
:return:
"""
try:
with writeFile(fn) as f:
VFSKeeper.save(f, VFSSysfsImporter().importfs())
except IOError as e:
raise DumpError(str(e))
def dump_real_udev(fn):
"""
Сохранить снимок udev в файл
:param fn:
:return:
"""
try:
with writeFile(fn) as f:
udevadm = UdevAdm()
udevadm.trigger("block")
f.write(udevadm.info_export())
except IOError as e:
raise DumpError(str(e))
def dump_real_devfs(fn):
"""
Сохранить снимок /dev в файл
:param fn:
:return:
"""
try:
with writeFile(fn) as f:
VFSKeeper.save(f, VFSDevfsImporter().importfs())
except IOError as e:
raise DumpError(str(e))
def dump_real_lvm(fn):
"""
Сохранить снимок /dev в файл
:param fn:
:return:
"""
try:
with writeFile(fn) as f:
lvm = Lvm(LvmCommand())
lvm.refresh()
f.write(lvm.pvdisplay_output("vg_name,lv_name,pv_name"))
except IOError as e:
raise DumpError(str(e))