You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/disk.py

2639 lines
94 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
import pty
import fcntl
from subprocess import Popen
from os import path
from itertools import *
from calculate.install.distr import (FlashDistributive, DistributiveError,
IsoDistributive)
from calculate.lib.datavars import (TableVariable, Variable, VariableError,
ReadonlyVariable, ReadonlyTableVariable,
SourceReadonlyVariable, VariableInterface,
HumanReadable)
import calculate.lib.utils.device as device
from calculate.lib.utils.device import (getPartitionSize,
humanreadableSize,
getUUIDDict)
from calculate.install.variables.autopartition import Sizes
from calculate.lib.utils.files import getProgPath
from calculate.lib.utils.mount import isMount, FStab, DiskSpace, Btrfs, \
BtrfsError, try_umount
from calculate.install.fs_manager import FileSystemManager
from calculate.lib.cl_lang import setLocalTranslate, _
from calculate.lib.variables.system import RootType
setLocalTranslate('cl_install3', sys.modules[__name__])
class DeviceHelper(VariableInterface):
rePassDevice = re.compile(r"^/block/(?!%s)" % r"|".join([r'sr', r'fd',
r'ram', r'loop']))
def getBlockDevices(self):
"""Get interest devices from sys block path"""
return list(filter(self.rePassDevice.search,
device.udev.get_block_devices()))
def separateDevice(self, dev):
"""
Separate device word and number on tuple
Using for sort. (Example: sda2 ("sda",2), md5p1 ("md",5,"p",1)
"""
return list(map(lambda x: int(x) if x.isdigit() else x,
re.findall(r'\d+|\D+', dev)))
def mapUdevProperty(self, var, prop, default):
"""Get each element from var through udev [prop]"""
return [device.udev.get_device_info(name=x).get(prop, default)
for x in self.Get(var)]
def getPerfectName(self, dev, defaultValue=None):
"""
Get dev name or human-readable lvm name
"""
info = device.udev.get_device_info(name=dev)
if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
lvmDeviceName = '/dev/{vg}/{lv}'.format(vg=info['DM_VG_NAME'],
lv=info['DM_LV_NAME'])
if path.exists(lvmDeviceName):
return lvmDeviceName
if defaultValue is None:
return info.get('DEVNAME', '')
else:
return defaultValue
def getLvmName(self, dev):
"""
Get lvm name
"""
return self.getPerfectName(dev, defaultValue="")
#######################################################
# Devices variables
#######################################################
class VariableOsDeviceData(ReadonlyTableVariable):
"""
Information about disk devices
"""
type = 'table'
source = ['os_device_dev',
'os_device_table',
'os_device_type',
'os_device_parent',
'os_device_ssd_set',
'os_device_virtual_set',
'os_device_map',
'os_device_syspath',
'os_device_name',
'os_device_size',
'os_device_mbr',
'os_device_efi',
'os_device_fulltype']
class VariableOsDeviceInvalidator(ReadonlyVariable):
"""
Переменная используемая для автоматического сброса значений переменных
если во время работы программы произошли изменения среди блочных устройств
"""
master = None
def get(self):
"""Get device /dev name"""
if self.master is None and not self.Get('cl_ebuild_phase'):
try:
self.master, slave = pty.openpty()
except OSError:
raise VariableError('Failed to create PTY')
udevAdm = getProgPath('/sbin/udevadm')
self.monitor = Popen([udevAdm, "monitor", "--kernel",
"--subsystem-match=block"], stdout=slave,
close_fds=True)
os.close(slave)
fl = fcntl.fcntl(self.master, fcntl.F_GETFL)
fcntl.fcntl(self.master, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return "Device invalidator"
def close(self):
try:
if self.monitor:
self.monitor.kill()
self.monitor.wait()
except Exception:
pass
def refresh(self):
try:
if self.monitor:
res = os.read(self.master, 65535)
if res:
while len(res) == 65535:
res = os.read(self.master, 65535)
self.parent.Invalidate(self.name)
self.parent.Invalidate('os_install_disk_uuid')
self.parent.Invalidate('os_install_disk_partuuid')
self.parent.Invalidate('os_disk_dev')
device.udev.clear_cache()
except OSError as e:
pass
class VariableOsDeviceDev(DeviceHelper, ReadonlyVariable):
"""
Disk devices
"""
type = "list"
re_disk_raid = re.compile(r"^disk-.*-raid\d+$", re.I)
def init(self):
pass
def get(self):
"""Get device /dev name"""
self.Get('os_device_invalidator')
# get devices from block sys directories(discard mem,sr,loop and other)
devices = (x for x in self.getBlockDevices() if x.count('/') == 2)
devnames = device.udev.syspath_to_devname(
x for x in devices
if device.udev.is_device(device.udev.get_device_info(x)) or
self.re_disk_raid.match(device.udev.get_device_type(path=x)))
return list(sorted((x for x in devnames),
key=self.separateDevice))
class VariableOsDeviceFulltype(ReadonlyVariable):
"""
Полный тип
"""
type = "list"
def get(self):
"""Get device /dev name"""
return [
device.udev.get_device_type(x)
for x in self.Get('os_device_syspath')
]
class VariableOsDeviceType(ReadonlyVariable):
"""
Device type (hdd,cdrom,usb-flash)
"""
type = "list"
def getType(self, dev):
info = device.udev.get_device_info(name=dev)
if device.udev.is_raid(info):
return info["MD_LEVEL"]
device_name = path.basename(dev)
if device_name in self.usbdevices:
if device.sysfs.read(device.sysfs.Path.Block, device_name,
"removable").strip() == "1":
return "flash"
else:
return "usb-hdd"
else:
return "hdd"
def get(self):
# get usb device by '/dev/disk/by-id'(usb devices contain 'usb' in name)
diskIdPath = '/dev/disk/by-id'
if device.devfs.exists(diskIdPath):
self.usbdevices = \
map(lambda x: \
device.devfs.realpath(diskIdPath, x).rpartition('/')[2],
filter(lambda x: x.startswith('usb-'),
device.devfs.listdir(diskIdPath, fullpath=False)))
else:
self.usbdevices = []
return list(map(self.getType,
self.Get('os_device_dev')))
class VariableOsDeviceParent(ReadonlyVariable):
"""
Базовые устройства RAID массива
"""
type = "list"
def get(self):
"""Get disk parent"""
return [",".join(device.udev.get_all_base_devices(name=dev))
if "raid" in _type
else ""
for dev, _type in self.ZipVars(
'os_device_dev', 'os_device_type')]
class MbrEfiHelper(VariableInterface):
boottype = ""
def get_boot_partition(self, sysdevice):
basename = path.basename(sysdevice)
for devname in device.udev.syspath_to_devname(
device.sysfs.glob(sysdevice, "%s*" % basename),
dropempty=True):
partid = self.select('os_disk_id', os_disk_dev=devname,
limit=1)
if partid.upper() == self.boottype:
return devname
return ""
class VariableOsDeviceMbr(MbrEfiHelper, ReadonlyVariable):
"""
Разделы на устройстве, которые могут быть bios_boot
"""
type = "list"
boottype = "EF02"
def get(self):
def generator():
for dev, sysdevice, _type, table in self.ZipVars(
'os_device_dev', 'os_device_syspath', 'os_device_type',
'os_device_table'):
if "raid" in _type:
yield ""
elif table == "dos":
yield dev
else:
if self.get_boot_partition(sysdevice):
yield dev
else:
yield ""
return list(generator())
class VariableOsDeviceEfi(MbrEfiHelper, ReadonlyVariable):
"""
Разделы на устройстве, которые могут быть EFI
"""
boottype = "EF00"
type = "list"
def get(self):
def generator():
for dev, sysdevice, _type, table in self.ZipVars(
'os_device_dev', 'os_device_syspath', 'os_device_type',
'os_device_table'):
if "raid" in _type or table == "dos":
yield ""
else:
yield self.get_boot_partition(sysdevice) or ""
return list(generator())
class VariableOsDeviceMap(ReadonlyVariable):
"""
Map number for grub
Using for legecy grub (DEPRECATATED)
"""
type = "list"
def get(self):
return list(map(lambda x: str(x[0]),
enumerate(self.Get('os_device_dev'))))
class VariableOsDeviceArraySet(ReadonlyVariable):
"""
Диски массивы (при создании разделов на таких дисках перед
номером раздела добавляется "p": nvme0n1p1 вместо nvme0n11
"""
type = "list"
devnames = ("nvme", "mmcblk")
def get(self):
"""Get device partition table"""
def isArray(device, name):
if any(x in device for x in self.devnames):
return "on"
else:
return "off"
return list(map(lambda x: isArray(*x),
zip(self.Get('os_device_dev'),
self.Get('os_device_name'))))
class VariableOsDeviceSsdSet(ReadonlyVariable):
"""
Ssd property
"""
type = "list"
ssd_names = ("SSD", "OCZ", "PLEXTOR")
udev_property = 'ID_ATA_ROTATION_RATE_RPM'
# считаем, что nvme диски - SSD
devnames = ("nvme",)
def get(self):
"""Get device partition table"""
def isSsd(dev, name):
prop = device.udev.get_device_info(name=dev)
rpm = prop.get(self.udev_property, None)
if (any(x in dev for x in self.devnames)
or rpm == "0" or any(x in name for x in self.ssd_names)):
return "on"
else:
return "off"
return list(map(lambda x: isSsd(*x),
zip(self.Get('os_device_dev'),
self.Get('os_device_name'))))
class VariableOsDeviceSyspath(ReadonlyVariable):
"""
Table on device
"""
type = "list"
udev_property = 'DEVPATH'
def get(self):
"""Get device partition table"""
def getSysPath(dev):
prop = device.udev.get_device_info(name=dev)
syspath = prop.get(self.udev_property, "")
return syspath
return [getSysPath(x) for x in self.Get('os_device_dev')]
class VariableOsDeviceVirtualSet(ReadonlyVariable):
"""
Table on device
"""
type = "list"
virtual_names = ("VBOX", "VMWare", "QEMU")
virtual_syspath = ("virtio",)
def get(self):
"""Get device partition table"""
def isVirtual(device, name, syspath):
if any(x in name for x in self.virtual_names):
return "on"
elif any(x in syspath for x in self.virtual_syspath):
return "on"
else:
return "off"
return list(map(lambda x: isVirtual(*x),
zip(self.Get('os_device_dev'),
self.Get('os_device_name'),
self.Get('os_device_syspath'))))
class VariableOsDeviceTable(ReadonlyVariable):
"""
Table on device
"""
type = "list"
def getTableByChild(self, dev):
"""Get table by child partitions"""
syspath = device.udev.get_syspath(name=dev)
shortname = path.basename(dev)
for child in device.sysfs.glob(syspath, "%s*" % shortname):
udevinfo = device.udev.get_device_info(path=child)
map_names = {'mbr': 'dos',
'msdos': 'dos'}
table = (udevinfo.get('ID_PART_ENTRY_SCHEME', '') or
udevinfo.get('UDISKS_PARTITION_SCHEME', ''))
return map_names.get(table, table)
return ""
def get(self):
"""Get device partition table"""
autopartition = self.Get('cl_autopartition_set') == 'on'
autoDevice = self.Get('cl_autopartition_device')
def getTable(dev):
prop = device.udev.get_device_info(name=dev)
return prop.get('ID_PART_TABLE_TYPE',
self.getTableByChild(dev))
def getByAutopartition(dev):
if autopartition and autoDevice == dev:
return self.Get('cl_autopartition_table')
else:
return getTable(dev)
return list(map(getByAutopartition,
self.Get('os_device_dev')))
class VariableOsDeviceName(ReadonlyVariable):
"""
Name of device
"""
type = "list"
nameless_devices = {
'nvme': 'NVME',
'mmcblk': 'Multimedia Card'
}
def getName(self, dev):
devicepath = device.udev.get_syspath(name=dev)
if devicepath:
vendor = device.sysfs.read(devicepath, "device/vendor").strip()
model = device.sysfs.read(devicepath, "device/model").strip()
if vendor or model:
return ("%s %s" %
(vendor, model)).strip()
else:
for k, v in self.nameless_devices.items():
if k in devicepath:
return v
return ""
else:
return ""
def get(self):
return list(map(self.getName,
self.Get('os_device_dev')))
class VariableOsDeviceSize(ReadonlyVariable):
"""
Name of device
"""
type = "list"
def get(self):
"""Get device size"""
return list(map(lambda x: getPartitionSize(name=x, inBytes=True),
self.Get('os_device_dev')))
def humanReadable(self):
return list(map(humanreadableSize,
self.Get()))
#############################################
# Disk variables
#############################################
class VariableOsDiskData(ReadonlyTableVariable):
"""
Information about current system partition and mounts
"""
source = ['os_disk_dev',
'os_disk_uuid',
'os_disk_partuuid',
'os_disk_name',
'os_disk_size',
'os_disk_part',
'os_disk_format',
'os_disk_type',
'os_disk_raid',
'os_disk_lvm',
'os_disk_parent',
'os_disk_id',
'os_disk_grub']
class VariableOsDiskDev(DeviceHelper, ReadonlyVariable):
"""
List of available partition devices
"""
type = "list"
def get(self):
# получить блочные утсройства, в списке устройства с таблицей раздела
# разделены '/'
re_parent = re.compile(r"^/block/[^/]+")
disks = self.getBlockDevices()
parents = {re_parent.search(x).group()
for x in disks if x.count("/") > 2}
dev_names = device.udev.syspath_to_devname(
(x for x in disks if x not in parents),
dropempty=False)
return list(sorted((x for x in dev_names), key=self.separateDevice))
def humanReadable(self):
return list(map(self.getPerfectName,
self.Get()))
class VariableOsDiskMount(DeviceHelper, ReadonlyVariable):
"""
List mounted points for current operation system
"""
type = "list"
def get(self):
disk_hash = self.Get('os_disk_dev')
fstab = FStab('/etc/fstab', devs=disk_hash)
rootdev = self.Get('os_root_dev')
return list(map(lambda x: '/' if x == rootdev else fstab.getBy(eq=x) or "",
self.Get('os_disk_dev')))
class VariableOsDiskContent(ReadonlyVariable):
"""
Partition content
"""
type = "list"
def get(self):
"""
TODO: need to write
"""
return list(map(lambda x: "",
self.Get('os_disk_dev')))
class VariableOsDiskFormat(ReadonlyVariable):
"""
Filesystem on device partitions
"""
type = "list"
def get(self):
"""Get current disk filesystem"""
fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
def getFormat(dev):
prop = device.udev.get_device_info(name=dev)
fs = prop.get('FSTAB_TYPE') or \
fstab.getBy(what=fstab.TYPE, eq=dev) or \
prop.get('ID_FS_TYPE', '')
if fs == "btrfs":
if "compress" in fstab.getBy(what=fstab.OPTS,
eq=dev):
return "btrfs-compress"
try:
if Btrfs(dev).compression != "":
return "btrfs-compress"
except BtrfsError:
pass
return fs
return list(map(getFormat,
self.Get('os_disk_dev')))
class VariableOsDiskType(ReadonlyVariable):
"""
List type (lvm,raid,partition,disk)
"""
type = "list"
re_raid = re.compile(r"-raid\d+$")
re_raid_partition = re.compile(r"-raid\d+-partition$")
def get(self):
"""Get partition scheme"""
types = list(map(lambda x: (x, device.udev.get_device_type(name=x)),
self.Get('os_disk_dev')))
lvmUsedDisks = {}
raidUsedDisks = {}
def forMember(typeInfo):
diskName, diskType = typeInfo
if diskName in raidUsedDisks:
diskType = "%s-raidmember(%s)" % (diskType,
raidUsedDisks[diskName])
if diskName in lvmUsedDisks:
diskType = "%s-lvmmember(%s)" % (diskType,
",".join(
lvmUsedDisks[diskName]))
return diskName, diskType
for dev, diskType in types:
prop = device.udev.get_device_info(name=dev)
if self.re_raid.search(diskType):
raiddevice = prop.get('DEVPATH', '')
elif self.re_raid_partition.search(diskType):
raiddevice = path.dirname(prop.get('DEVPATH', ''))
else:
raiddevice = None
if raiddevice:
raiddev = device.udev.get_devname(raiddevice)
for x in device.raid.devices(raiddevice):
raidUsedDisks[x] = raiddev
if diskType.endswith("lvm"):
for x in device.lvm.used_partitions(prop.get('DM_VG_NAME', ''),
prop.get('DM_LV_NAME', '')):
if x in lvmUsedDisks:
lvmUsedDisks[x].append(dev)
else:
lvmUsedDisks[x] = [dev]
return list(map(lambda x: x[1],
map(forMember,
types)))
class VariableOsDiskRaid(ReadonlyVariable):
"""
Raids which this partition constructed
"""
type = "list"
def generateRaid(self):
for disktype in self.Get('os_disk_type'):
if "raidmember" in disktype:
yield disktype.rpartition('(')[2][:-1]
else:
yield ""
def get(self):
return list(self.generateRaid())
class VariableOsDiskLvm(DeviceHelper, ReadonlyVariable):
"""
LVM vgname and lvname
"""
type = "list"
def get(self):
"""Get each element from var through udev [prop]"""
return list(map(self.getLvmName,
self.Get('os_disk_dev')))
class VariableOsDiskUuid(DeviceHelper, ReadonlyVariable):
"""
List uudi for partition devices
"""
type = "list"
def get(self):
return self.mapUdevProperty('os_disk_dev', 'ID_FS_UUID', '')
class VariableOsDiskPartuuid(DeviceHelper, ReadonlyVariable):
"""
List uudi for partition devices
"""
type = "list"
def get(self):
return self.mapUdevProperty('os_disk_dev', 'ID_PART_ENTRY_UUID', '')
class VariableOsDiskParent(ReadonlyVariable):
"""
List parent deivces for partition
"""
type = "list"
def get(self):
"""Get disk parent"""
return [",".join(device.udev.get_disk_devices(name=x))
for x in self.Get('os_disk_dev')]
class VariableOsDiskId(DeviceHelper, ReadonlyVariable):
"""
Partition's system id
"""
type = "list"
def get(self):
"""Get disk id"""
mapTypeUUID = {'ebd0a0a2-b9e5-4433-87c0-68b6b72699c7': '0700',
'0657fd6d-a4ab-43c4-84e5-0933c84b4f4f': '8200',
'a19d880f-05fc-4d3b-a006-743f0f84911e': 'FD00',
'21686148-6449-6e6f-744e-656564454649': 'EF02',
'c12a7328-f81f-11d2-ba4b-00a0c93ec93b': 'EF00',
'0fc63daf-8483-4772-8e79-3d69d8477de4': '8300'}
return list(map(lambda x: mapTypeUUID.get(x, x),
map(lambda x: x.rpartition("x")[2],
self.mapUdevProperty('os_disk_dev', 'ID_PART_ENTRY_TYPE',
''))))
class VariableOsDiskGrub(ReadonlyVariable):
"""
List grub id for partition devices
"""
type = "list"
def get(self):
"""Get disk grub map"""
devicesMap = dict(zip(self.Get('os_device_dev'),
self.Get('os_device_map')))
def getGrubMap(devParent):
dev, disktype, parent = devParent
# grub id вычисляем только для разделов расположенных на диске
# (исключаются lvm, raid и прочие абстракции)
if disktype != "disk-partition":
return ""
prop = device.udev.get_device_info(name=dev)
partnum = int(prop.get('ID_PART_ENTRY_NUMBER', 0))
if parent in devicesMap.keys() and partnum:
return "%s,%d" % (devicesMap[parent], partnum - 1)
else:
return ""
return list(map(getGrubMap,
zip(self.Get('os_disk_dev'),
self.Get('os_disk_type'),
self.Get('os_disk_parent'))))
class VariableOsDiskPart(ReadonlyVariable):
"""
Type of partition devices
If msdos then(primary, extended or logical)
If gpt then gpt
"""
type = "list"
def get(self):
def generator():
for disk_dev, disk_type in self.ZipVars(
'os_disk_dev', 'os_disk_type'):
if disk_type.endswith("-partition"):
yield device.udev.get_partition_type(name=disk_dev)
else:
yield ""
return list(generator())
class VariableOsDiskSize(ReadonlyVariable):
"""
Partition size
"""
type = "list"
def get(self):
"""Get disk size"""
return list(map(lambda x: getPartitionSize(name=x, inBytes=True),
self.Get('os_disk_dev')))
def humanReadable(self):
return list(map(humanreadableSize,
self.Get()))
class VariableOsDiskName(DeviceHelper, ReadonlyVariable):
"""
Label of partitions
"""
type = "list"
def get(self):
"""Get disk label"""
return self.mapUdevProperty('os_disk_dev', 'ID_FS_LABEL', '')
class VariableOsDiskOptions(ReadonlyVariable):
"""
List mount options
"""
type = "list"
def get(self):
fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
def getFormat(dev):
return fstab.getBy(what=fstab.OPTS, eq=dev)
return list(map(getFormat,
self.Get('os_disk_dev')))
################################################
# Bind mount points
################################################
class VariableOsBindData(ReadonlyTableVariable):
"""
Table of bind mount points
"""
source = ['os_bind_path',
'os_bind_mountpoint']
class VariableOsBindPath(ReadonlyVariable):
"""
List source bind path
"""
type = "list"
def get(self):
fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
return fstab.getBy(what=fstab.NAME, where=fstab.OPTS,
contains="bind", allentry=True)
class VariableOsBindMountpoint(ReadonlyVariable):
"""
Mountpoints for directories bind
"""
type = "list"
def get(self):
fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
return fstab.getBy(what=fstab.DIR, where=fstab.OPTS,
contains="bind", allentry=True)
######################################################################
# Userselect partion parameters
######################################################################
class LocationHelper(VariableInterface):
"""
Location variable
"""
def uncompatible(self):
"""
Network setting up unavailable for flash installation
"""
if self.Get('cl_autopartition_set') == "on":
return \
_("The layout is not available with autopartitioning")
return ""
class VariableOsLocationBriefData(LocationHelper, TableVariable):
source = ["os_location_source",
"os_location_dest",
"os_location_format",
"os_location_perform_format",
"os_location_size"]
orig_source = [('os_install_disk_dev',
'os_install_disk_mount',
'os_install_disk_format',
'os_install_disk_perform_format',
'os_install_disk_size'),
('os_install_bind_path',
'os_install_bind_mountpoint', '', '', '')]
def init(self):
self.label = _("Mount points")
def get_autopartition(self, hr=HumanReadable.No):
# при авторазметке получаем только информацию о
# /boot/efi разделах
if self.GetBool('cl_autopartition_uefi_set'):
boot = [[dev, mp, fs, _format, size]
for dev, mp, fs, _format, size in self.ZipVars(
'os_install_disk_dev',
'os_install_disk_mount',
'os_install_disk_format',
'os_install_disk_perform_format',
'os_install_disk_size', humanreadable=hr)
if mp.startswith('/boot/efi')]
else:
boot = []
# исключаем из устройств авторазметки информацию о efi разделах
# так как она не достоверная
devices = [[dev, mp, fs, _format, size]
for dev, mp, fs, _format, size in self.ZipVars(
'cl_autopartition_disk_dev_full',
'cl_autopartition_disk_mount_full',
'cl_autopartition_disk_format_full',
'cl_autopartition_disk_perform_format_full',
'cl_autopartition_disk_size_full',
humanreadable=hr)
if not mp.startswith('/boot/efi')
]
binds = [[dev, mp, "", "", ""]
for dev, mp in self.ZipVars(
'cl_autopartition_bind_path',
'cl_autopartition_bind_mountpoint', humanreadable=hr)]
autodevs = self.Get('cl_autopartition_disk_dev_full')
bootdevs = [x[0] for x in boot]
def keysort(dev):
if dev in autodevs:
return autodevs.index(dev), -1
else:
return -1, bootdevs.index(dev)
return sorted(boot + devices,
key=lambda x: keysort(x[0])) + binds or [[]]
def get_manual(self, hr=HumanReadable.No):
devs = self.Get('os_disk_dev')
def keysort(dev):
if dev in devs:
return devs.index(dev), -1
else:
return -1, dev
devices = map(list, self.ZipVars(
'os_install_disk_dev',
'os_install_disk_mount',
'os_install_disk_format',
'os_install_disk_perform_format',
'os_install_disk_size',
humanreadable=hr))
binds = [[dev, mp, "", "", ""]
for dev, mp in self.ZipVars(
'os_install_bind_path',
'os_install_bind_mountpoint', humanreadable=hr)]
return sorted(devices,
key=lambda x: keysort(x[0])) + binds or [[]]
def get(self, hr=HumanReadable.No):
if self.GetBool('cl_autopartition_set'):
return self.get_autopartition(hr)
else:
return self.get_manual(hr)
class VariableOsLocationData(LocationHelper, TableVariable):
"""
Select installation disk variable
"""
opt = ["--disk", "-d"]
metavalue = 'DISK[[:MP[:FS[:FORMAT]]]]'
untrusted = True
source = ["os_location_source",
"os_location_dest",
"os_location_format",
"os_location_perform_format",
"os_location_size"]
check_after = ["os_install_root_type"]
def init(self):
self.help = (_("DISK bound for installation will be mounted to the "
"MP directory. To create a bind mount point, you have "
"to specify the source directory as DISK") + ". " +
_("To change the filesystem, you have to specify it as FS. "
"FORMAT is used for the specifying the need to format "
"partition or not"))
self.label = _("Mount points")
def set(self, value):
return sorted(value, key=lambda x: x and x[0])
class VariableOsLocationSource(LocationHelper, DeviceHelper, Variable):
"""
Source disk or directory
"""
type = "choiceedit-list"
def init(self):
self.label = _("Disk or directory")
def availDevs(self, choice=False):
"""
Available devices
"""
if self.Get('cl_install_type') == 'flash':
flashes = self.Select('os_device_dev',
where='os_device_type',
eq="flash")
return [disk_dev for disk_dev, disk_parent, disk_type in
self.ZipVars("os_disk_dev", "os_disk_parent",
"os_disk_type")
if disk_type == "disk-partition" and disk_parent in flashes]
else:
if choice:
return self.Get('os_disk_dev') + self.Get('os_bind_path')
else:
dev_from = self.Get('cl_install_dev_from')
return [
disk_dev for disk_dev, disk_mount in self.ZipVars(
"os_disk_dev", "os_disk_mount")
if ((disk_mount not in ("", "/")
or disk_dev == dev_from) and
not disk_mount.startswith("/boot/efi"))
] + self.Get('os_bind_path')
def get(self):
if self.Get('cl_autopartition_set') == "on":
return ([device.udev.get_devname(name=x)
for x in self.Get('cl_autopartition_disk_dev')] +
self.Get('cl_autopartition_bind_path'))
else:
return self.availDevs()
def set(self, value):
def normpath(val):
if type(val) == str and val:
return path.normpath(val)
return val
return list(map(normpath, value))
def choice(self):
return list(map(lambda x: (x, self.getPerfectName(x) or x),
self.fixOsDiskDev(self.availDevs(choice=True)))) + [("", "")]
def fixOsDiskDev(self, sourcelist=None):
"""
Fix os_disk_dev by autopartitions
"""
if not sourcelist:
sourcelist = self.Get('os_disk_dev')
scheme = self.Get('cl_autopartition_set') == "on"
if scheme:
autopartition_devices = self.Get('cl_autopartition_device')
exclude = {
disk_dev
for disk_dev, disk_parent in self.ZipVars("os_disk_dev",
"os_disk_parent")
if any(x in autopartition_devices
for x in disk_parent.split(','))
}
appendDisks = (self.Get('cl_autopartition_disk_dev') +
self.Get('cl_autopartition_bind_path'))
return [x for x in sourcelist if x not in exclude] + appendDisks
else:
return sourcelist
def check(self, value):
"""Check set location source"""
################################
# check of device specifing
################################
if not value:
raise VariableError(
_("To install the system, you need to specify the root device"))
###########################
# check wrong dev
###########################
disks = filter(lambda x: x.startswith('/dev/'), value)
# get original /dev names
cnDisks = (device.udev.get_device_info(name=x).get('DEVNAME', x)
for x in disks)
wrongDevices = list(set(cnDisks) -
set(self.fixOsDiskDev()))
if wrongDevices:
raise VariableError(_("Wrong device '%s'") % wrongDevices[0])
wrongSource = filter(lambda x: x and not x.startswith('/'), value)
if wrongSource:
raise VariableError(
_("Wrong bind mount point '%s'") % wrongSource[0])
##########################
# detect duplicate devices
##########################
dupDevices = list(set(filter(lambda x: disks.count(x) > 1,
disks)))
if dupDevices:
raise VariableError(
_("Device '%s' is used more than once") % dupDevices[0])
class VariableClRootSizeMin(Variable):
"""
Минимальнй размер root раздела
"""
value_format = "{cl_autopartition_root_size_min}"
class VariableOsLocationDest(LocationHelper, Variable):
"""
Desination directory of install disk data
"""
type = "choiceedit-list"
def init(self):
self.label = _("Mount point")
def get(self):
if self.Get('cl_autopartition_set') == "on":
return self.Get('cl_autopartition_disk_mount') + \
self.Get('cl_autopartition_bind_mountpoint')
else:
source = self.Get('os_location_source')
installFrom = self.Get('cl_install_dev_from')
singleDevice = self.Get('os_install_disk_single')
def installMountPoint(info):
dev, mount = info
if self.Get('cl_action') == 'system':
if self.Get('cl_install_type') == 'flash':
if dev == singleDevice:
return "/"
else:
return ""
else:
if dev == installFrom:
return "/"
elif mount == "/":
return ""
return mount
return list(map(installMountPoint,
filter(lambda x: x[0] in source,
list(zip(self.Get('os_disk_dev'),
self.Get('os_disk_mount'))) + \
list(zip(self.Get('os_bind_path'),
self.Get('os_bind_mountpoint'))))))
def set(self, value):
"""Add abilitiy not specify root"""
def normpath(val):
if type(val) == str and val:
return path.normpath(val)
return val
value = map(normpath, value)
return list(map(lambda x: x or "/", value))
def choice(self):
if self.Get('cl_install_type') == 'flash':
return ["/", ""]
else:
return ['/', '/boot', '/var/calculate', '/home',
'/usr', '/var', '/tmp', 'swap', '']
def check(self, value):
"""Check set location source"""
if self.Get('cl_autopartition_set') == "on":
return
################################
# check size for root device
################################
minroot = int(self.Get('cl_root_size_min'))
osInstallRootType = self.Get('os_install_root_type')
if osInstallRootType != "flash" and \
not "/usr" in value:
for mp, size in filter(lambda x: x[0] == '/' and x[1].isdigit() and \
int(x[1]) < minroot,
zip(value,
self.Get("os_location_size"))):
raise VariableError(
_("The root partition should be at least %s") % "7 Gb")
source = self.Get("os_location_source")
################################
# check of root device specifing
################################
if not source:
return
if not list(filter(lambda x: x == "/", value)):
raise VariableError(_("To install the system, you need to "
"specify the root device"))
################################
disks = list(filter(lambda x: x[0].startswith('/dev/') and x[1],
zip(source, value)))
disksDevs = list(map(lambda x: x[0], disks))
binds = list(filter(lambda x: not x[0].startswith('/dev/') and x[1],
zip(source, value)))
##########################
# detect efi specifing
##########################
reEfi = re.compile(r"/u?efi", re.I)
if any(reEfi.search(x) for x in value):
if self.Get('cl_client_type') == 'gui':
raise VariableError(
_("Please specify EFI partition by UEFI parameter in "
"advanced options"))
else:
raise VariableError(
_("Please specify EFI partition by UEFI option"))
##########################
# detect duplicate mps
##########################
dupMP = list(set(filter(lambda x: value.count(x) > 1,
filter(lambda x: x and x != "swap",
value))))
if dupMP:
raise VariableError(
_("Mount point '%s' is used more than once") % dupMP[0])
#########################
# detect wrong bind
#########################
wrongBind = filter(lambda x: not x[0].startswith("/") or
not x[1].startswith("/"),
binds)
if wrongBind:
raise VariableError(
_("Incorrect mount point (bind '%(bindSrc)s' to "
"'%(bindDst)s')") \
% {'bindSrc': wrongBind[0][0],
'bindDst': wrongBind[0][1]})
#########################################
# Check '/' in start path of dest pointst
#########################################
wrongMP = filter(lambda x: x and not x.startswith("/") and x != "swap",
value)
if wrongMP:
raise VariableError(_("Wrong mount point '%s'") % wrongMP[0])
#########################################
# Check using current root
#########################################
rootDev = self.Get('os_root_dev')
if rootDev in self.Get('os_install_disk_dev'):
raise VariableError(
_("You may not use the current root partition %s for "
"installation") % rootDev)
#################################
# detect using extended partition
#################################
extendedPartitions = self.Select('os_install_disk_dev',
where='os_install_disk_part',
eq='extended', limit=1)
if extendedPartitions:
raise VariableError(
_("Unable to use extended partition %s for installation") %
extendedPartitions)
##########################
# detect using CDROM disks
##########################
cdromPartitions = self.Select('os_install_disk_dev',
where='os_install_disk_type',
like='cdrom', limit=1)
if cdromPartitions:
raise VariableError(_("Unable to use CDROM %s for installation") %
cdromPartitions)
###############################
# check cross bind mount points
###############################
DEVICE, MP = 0, 1
srcMountPoints = map(lambda x: x[DEVICE], binds)
destMountPoints = map(lambda x: x[MP], binds)
wrongBind = list(filter(lambda x: x in destMountPoints, srcMountPoints))
if wrongBind:
incompBind = list(filter(lambda x: x[1] == wrongBind[0],
zip(srcMountPoints, destMountPoints)))
raise VariableError(
_("Source directory %(src)s is already used "
"for binding '%(bindSrc)s' to '%(bindDst)s'") \
% {'src': wrongBind[0],
'bindSrc': incompBind[0][0],
'bindDst': incompBind[0][1]})
#######################################
# check multipart for flash and builder
#######################################
osInstallRootType = self.Get('os_install_root_type')
if osInstallRootType == "flash":
if filter(lambda x: x and x != '/', value):
raise VariableError(
_("Flash install does not support multipartition mode"))
if filter(lambda x: x == "swap", value):
raise VariableError(
_("Flash install does not support swap disks"))
########################################
# check install on member of RAID or LVM
########################################
installTypes = zip(self.Get('os_install_disk_dev'),
self.Get('os_install_disk_type'))
for checkType in ("raid", "lvm"):
memberData = list(filter(lambda x: checkType + "member" in x[1],
installTypes))
if memberData:
raise VariableError(
_("Unable to use {part} partition used by active "
"{typepart} for installation").format(
typepart=checkType.upper(),
part=memberData[0][0]))
class VariableOsLocationFormat(LocationHelper, Variable):
type = "choice-list"
def init(self):
self.label = _("Filesystem")
def get(self):
if self.Get('cl_autopartition_set') == "on":
return self.Get('cl_autopartition_disk_format') + \
list(map(lambda x: "", self.Get('cl_autopartition_bind_path')))
else:
mount = self.Get("os_location_dest")
source = self.Get("os_location_source")
value = [""] * len(source)
return list(map(self.defaultFormat(),
zip(source, mount, value)))
def choice(self):
if self.Get('cl_install_type') == "flash":
return ["", "vfat"]
else:
return [""] + self.Get('os_format_type')
def defaultFormat(self):
"""Describe default value for filesystem"""
diskFormat = dict(zip(self.Get('os_disk_dev'),
(self.Get('os_disk_format'))))
osInstallRootType = self.Get('os_install_root_type')
availFS = set(self.Select('os_format_type',
where='os_format_use',
eq='yes'))
allAvailFS = self.Get('os_format_type')
default_format = None
if self.Get('os_root_type_ext') in RootType.HDD:
root_format = self.select(
'os_disk_format', os_disk_mount="/", limit=1)
autoformat = self.Get('cl_autopartition_default_format')
for _format in (root_format, autoformat):
if _format and self.select(
'os_format_use', os_format_type=_format, limit=1) == "yes":
default_format = _format
break
else:
root_format = None
def wrap(info):
dev, mount, fs = info
if mount and not fs and dev.startswith('/dev/'):
if mount == "swap":
return "swap"
elif mount.startswith('/boot/efi'):
return "vfat"
if mount == "/":
if root_format and root_format in allAvailFS:
return root_format
else:
if dev in diskFormat and diskFormat[dev] in allAvailFS:
if mount.count('/') == 1 or mount == "/var/calculate":
if FileSystemManager.checkFSForTypeMount(
diskFormat[dev],
osInstallRootType, mount):
return diskFormat[dev]
else:
return diskFormat[dev]
if default_format:
return default_format
return FileSystemManager.get_default_fs(self, osInstallRootType)
return fs
return wrap
def set(self, value):
value = map(lambda x: "vfat" if x == "uefi" else x, value)
mount = self.Get("os_location_dest")
source = self.Get("os_location_source")
return list(map(self.defaultFormat(),
zip(source, mount, value)))
def check(self, value):
osInstallRootType = self.Get('os_install_root_type')
devMpFs = zip(self.Get('os_location_source'),
self.Get('os_location_dest'), value)
for dev, mp, fs in devMpFs:
if dev.startswith('/') and not dev.startswith('/dev/') and fs:
raise VariableError(
_("The bind mount point does not use filesystem"))
# check compatible fs for mount point only root dirs
if dev.startswith('/dev/') and mp and (mp.count('/') == 1 or
mp in (
'/var/calculate',
'/boot/efi')):
if not FileSystemManager.checkFSForTypeMount(fs,
osInstallRootType,
mp):
raise VariableError(
_("The filesystem for '%(mp)s' should not be '%(opt)s'")
% {'mp': mp, 'opt': fs} + " " +
_("for {typedisk} install").format(
typedisk=osInstallRootType))
if mp == "swap" and fs != "swap":
raise VariableError(
_(
"The swap partition {dev} must be formatted as swap").format(
dev=dev))
class VariableOsLocationPerformFormat(LocationHelper, Variable):
type = "boolauto-list"
def init(self):
self.label = _("Format")
def get(self):
if self.Get('cl_autopartition_set') == "on":
return list(map(lambda x: "on",
self.Get('cl_autopartition_disk_format'))) + \
list(map(lambda x: "",
self.Get('cl_autopartition_bind_path')))
else:
mount = self.Get("os_location_dest")
source = self.Get("os_location_source")
fs = self.Get("os_location_format")
value = [""] * len(source)
return list(map(self.defaultPerformFormat(),
zip(source, mount, fs, value)))
fixNtfs = lambda self, x: {'ntfs-3g': 'ntfs'}.get(x, x)
def is_force_param(self):
return "--force" in self.Get("cl_console_args")
def check(self, value):
"""Check perform format
Check what format will perform for need partition.
At example on change filesystem on partition.
"""
DEV, MP, FS, FORMAT = 0, 1, 2, 3
info = zip(self.Get('os_location_source'),
self.Get('os_location_dest'),
self.Get('os_location_format'),
value)
diskFormat = dict(zip(self.Get('os_disk_dev'),
(self.Get('os_disk_format'))))
diskMount = dict(zip(self.Get('os_disk_dev'),
(self.Get('os_disk_mount'))))
unavailFS = set(self.Select('os_format_type',
where='os_format_use',
eq="no"))
fixNtfs = self.fixNtfs
for dev, mp, fs, isformat in info:
# should format if change fs or partition is root, but non flash
partitionMustFormat = \
fixNtfs(diskFormat.get(dev, fs)) != fixNtfs(fs) or \
(mp == '/' and
self.Get('os_install_root_type') != 'flash')
# if entry has mount point AND
# partition must was formated
if mp and partitionMustFormat:
# partition use in current system
if diskMount.get(dev, ''):
raise VariableError(
_("{device} must but cannot be formatted, as it is "
"mounted to {mountpoint} on the current system").format(
device=dev, mountpoint=diskMount.get(dev, '')))
if isMount(dev):
if not self.is_force_param() or not try_umount(dev):
raise VariableError(
_("Please unmount {device}, as it will be used for "
"installation").format(device=dev))
# but user select non-format
if not self.isTrue(isformat):
raise VariableError(
_("{device} must be formatted").format(
device=dev))
if self.isTrue(isformat):
if not mp:
raise VariableError(
_("No need to format unused device {dev}").format(
dev=dev))
if fs in unavailFS:
raise VariableError(
_("Filesystem '%s' is not available") % fs)
if not dev.startswith('/dev/'):
raise VariableError(
_("Bind mount points should not be formatted"))
elif diskMount.get(dev, "") and isformat:
raise VariableError(
_(
"{device} must but cannot be formatted, as it is mounted to {mountpoint} on the current system"
).format(
device=dev, mountpoint=diskMount.get(dev, '')))
elif isMount(dev):
if not self.is_force_param() or not try_umount(dev):
raise VariableError(
_("Please unmount disk {device} to "
"use it for install").format(device=dev))
def defaultPerformFormat(self):
diskFormat = dict(zip(self.Get('os_disk_dev'),
(self.Get('os_disk_format'))))
def wrap(info):
source, dest, fs, isformat = info
fixNtfs = self.fixNtfs
if not isformat and source.startswith('/dev/'):
if dest == '/':
return "on"
if dest and fixNtfs(diskFormat.get(source, fs)) != fixNtfs(fs):
return "on"
return isformat or ("off" if source.startswith('/dev/') else "")
return wrap
def set(self, value):
"""Default values for perform format"""
value = Variable.set(self, value)
DEV, MP, FS, FORMAT = 0, 1, 2, 3
info = zip(self.Get('os_location_source'),
self.Get('os_location_dest'),
self.Get('os_location_format'),
value)
return list(map(self.defaultPerformFormat(),
map(lambda x: [x[DEV], x[MP], x[FS], ""] \
if x[FORMAT] == "off" and not x[DEV].startswith("/dev/")
else x,
info)))
class VariableOsLocationSize(LocationHelper, SourceReadonlyVariable):
"""
Location size
"""
type = "list"
indexField = "os_location_source"
def init(self):
self.label = _("Size")
def getMap(self):
mapDevSize = dict(self.ZipVars('os_disk_dev', 'os_disk_size'))
mapDevSize.update(
zip(self.Get('cl_autopartition_disk_dev'),
self.Get('cl_autopartition_disk_size')))
return mapDevSize
def get(self):
return self.get_sizes(self.getMap().get)
def get_sizes(self, method):
devices = (device.udev.get_devname(name=x)
for x in self.Get(self.indexField))
mapped = (method(x) for x in devices)
return [x or "" for x in mapped]
def getMapHumanReadable(self):
mapDevSize = dict(zip(self.Get('os_disk_dev'),
self.Get('os_disk_size', humanreadable=True)))
mapDevSize.update(
zip(self.Get('cl_autopartition_disk_dev'),
self.Get('cl_autopartition_disk_size', humanreadable=True)))
return mapDevSize
def humanReadable(self):
return self.get_sizes(self.getMapHumanReadable().get)
class VariableClUuidSet(Variable):
"""
Use or not UUID for /etc/fstab
"""
type = "bool"
opt = ["--uuid"]
value = "on"
def init(self):
self.label = _("Use UUID")
self.help = _("use UUID")
def uncompatible(self):
"""
Unavailable for flash installation
"""
if self.Get('os_install_root_type') == 'flash':
return _("Impossible to use UUID for Flash install")
#############################################################
# Install disk parameters
#############################################################
class VariableOsInstallDiskData(ReadonlyTableVariable):
"""
Table of install disk params
"""
source = ["os_install_disk_dev",
"os_install_disk_mount",
"os_install_disk_format",
"os_install_disk_perform_format",
"os_install_disk_options",
"os_install_disk_id",
"os_install_disk_uuid",
"os_install_disk_use",
"os_install_disk_name",
"os_install_disk_size",
"os_install_disk_type",
"os_install_disk_part",
"os_install_disk_parent"]
class VariableOsInstallDiskParent(SourceReadonlyVariable):
"""
Partition parent devices using for install
"""
type = "list"
indexField = "os_install_disk_dev"
def getMap(self):
diskParent = dict(self.ZipVars('os_disk_dev', 'os_disk_parent'))
# replace value for autopartition
if self.Get('cl_autopartition_set') == 'on':
disk_parent = self.Get('cl_autopartition_parent')
for disk_dev in self.Get('cl_autopartition_disk_dev'):
diskParent[disk_dev] = disk_parent
return diskParent
humanReadable = Variable.humanReadable
class VariableOsInstallDiskDevBase(DeviceHelper, ReadonlyVariable):
"""
Variable using for resolv cyclic deps
"""
type = "list"
def get(self):
if self.Get('cl_install_type') == 'flash':
disk = self.Get('os_install_disk_single')
if disk:
return [device.udev.get_devname(name=disk)]
return []
return [dev
for dev, mount in self.ZipVars('os_location_source',
'os_location_dest')
if (dev.startswith("/dev") and mount and
not mount.startswith("/boot/efi"))]
class VariableOsInstallDiskParentBase(VariableOsInstallDiskParent):
"""
Partition parent devices using for install
"""
type = "list"
indexField = "os_install_disk_dev_base"
humanReadable = Variable.humanReadable
class VariableOsInstallDiskDev(ReadonlyVariable, DeviceHelper):
"""
Disks for installation
"""
type = "list"
def get(self):
return (self.Get('os_install_uefi') +
self.Get('os_install_disk_dev_base'))
def humanReadable(self):
return list(map(lambda x: self.getPerfectName(x, defaultValue=x),
self.Get()))
class VariableOsInstallDiskUuid(ReadonlyVariable):
"""
Uudi for install
"""
type = "list"
def get(self):
diskDev = self.Get('os_install_disk_dev')
hashUUID = getUUIDDict(revers=True)
return list(map(lambda x: hashUUID.get(x, "")[5:], diskDev))
class VariableOsInstallDiskPartuuid(ReadonlyVariable):
"""
Uudi for install
"""
type = "list"
def mapUdevProperty(self, var, prop, default):
"""Get each element from var through udev [prop]"""
return [device.udev.get_device_info(name=x).get(prop, default)
for x in self.Get(var)]
def get(self):
diskDev = self.Get('os_install_disk_dev')
return self.mapUdevProperty('os_install_disk_dev', 'ID_PART_ENTRY_UUID', '')
class VariableOsInstallDiskMountBase(ReadonlyVariable):
"""
List mounted points for installed system
Variable use for resolv cyclic deps by UEFI vars
"""
type = "list"
def get(self):
if self.Get('cl_install_type') == 'flash':
disk = self.Get('os_install_disk_single')
if disk:
return ["/"]
return []
return [mount
for dev, mount in self.ZipVars('os_location_source',
'os_location_dest')
if (dev.startswith("/dev") and mount and
not mount.startswith("/boot/efi"))]
class VariableOsInstallDiskMount(ReadonlyVariable):
"""
List mounted points for installed system
"""
type = "list"
def generate_uefi_mountpoints(self):
yield "/boot/efi"
for i in range(2, 20):
yield "/boot/efi%d" % i
def get(self):
"""Get install disk dest"""
mps = self.generate_uefi_mountpoints()
return ([next(mps) for x in self.Get('os_install_uefi')] +
self.Get('os_install_disk_mount_base'))
class VariableOsInstallDiskUse(ReadonlyVariable):
"""
/dev/sd or UUID= list (by cl_uuid_set)
"""
type = "list"
def get(self):
"""Get real id (by cl_uuid_set) device"""
if self.Get('cl_uuid_set') == "on":
return list(map(lambda x: "UUID=%s" % x[0] if x[0] else x[1],
zip(self.Get('os_install_disk_uuid'),
self.Get('os_install_disk_dev'))))
else:
return self.Get('os_install_disk_dev')
class VariableOsInstallDiskOptions(ReadonlyVariable):
"""
List mount options of installed os
"""
type = "list"
def get(self):
ssd_devices = {
dev for dev, ssd in self.ZipVars('install.os_device_dev',
'install.os_device_ssd_set')
if ssd == 'on'
}
old_options = {
dev: options for dev, options in self.ZipVars('os_disk_dev',
'os_disk_options')
if options
}
def generator():
for disk_dev, disk_format, disk_parent in self.ZipVars(
'os_install_disk_dev',
'os_install_disk_format',
'os_install_disk_parent'):
if disk_dev in old_options:
yield old_options[disk_dev]
else:
all_ssd = all(x in ssd_devices
for x in disk_parent.split(','))
compression = self.Get('os_install_btrfs_compression')
yield FileSystemManager.getDefaultOpt(disk_format, all_ssd,
compression)
return list(generator())
class VariableOsInstallDiskFormat(ReadonlyVariable):
"""
Install list filesystem for partition devices
"""
type = "choice-list"
def get(self):
_format = [fs for dev, mp, fs in self.ZipVars('os_location_source',
'os_location_dest',
'os_location_format')
if dev.startswith('/dev/') and mp]
efiformat = ['vfat' for x in self.Get('os_install_uefi')]
return efiformat + _format
class VariableOsInstallDiskPerformFormat(ReadonlyVariable):
"""
List need for format
"""
type = "bool-list"
def get(self):
_format = list(map(lambda x: x[2],
filter(lambda x: x[0].startswith('/dev/') and x[1],
zip(self.Get('os_location_source'),
self.Get('os_location_dest'),
self.Get('os_location_perform_format')))))
if self.GetBool('cl_autopartition_set'):
efiformat = ['on' for x in self.Get('os_install_uefi')]
res = efiformat + _format
else:
vfatdevs = self.select('os_disk_dev', os_disk_format="vfat")
res = ["off" if dv in vfatdevs else "on"
for dv in self.Get('os_install_uefi')] + _format
return res
class VariableOsInstallDiskId(ReadonlyVariable):
"""
Install partition's system id
"""
type = "list"
def get(self):
def generator():
for (disk_dev, disk_part, disk_mount,
disk_format) in self.ZipVars('os_install_disk_dev',
'os_install_disk_part',
'os_install_disk_mount',
'os_install_disk_format'):
if disk_part in ("gpt", "primary", "extended", "logical"):
if disk_part == "gpt":
if disk_mount.startswith("/boot/efi"):
disk_format = "uefi"
else:
disk_part = "msdos"
fsinfo = FileSystemManager.supportFS.get(
disk_format, FileSystemManager.default_param)
yield fsinfo.get(disk_part)
else:
yield ""
return list(generator())
class VariableOsInstallDiskName(Variable):
"""
New labels for disk
"""
type = "list"
def get(self):
diskLabel = dict(self.ZipVars('os_disk_dev', 'os_disk_name'))
def changeLabel(info):
dev, mount = info
if mount == '/':
return "%s-%s" % (self.Get('os_install_linux_shortname'),
self.Get('os_install_linux_ver'))
else:
return diskLabel.get(dev, '')
return list(map(changeLabel,
self.ZipVars('os_install_disk_dev',
'os_install_disk_mount')))
class VariableOsInstallDiskSize(SourceReadonlyVariable):
"""
New partition sizes (for feature change partition)
"""
type = "list"
indexField = 'os_install_disk_dev'
def getMap(self):
if self.GetBool("cl_autopartition_set"):
return {
dev: size for dev, size in chain(
self.ZipVars('os_disk_dev', 'os_disk_size'),
self.ZipVars('os_location_source', 'os_location_size'),
self.ZipVars('cl_autopartition_disk_dev_full',
'cl_autopartition_disk_size_full'))
}
else:
return {
dev: size for dev, size in chain(
self.ZipVars('os_disk_dev', 'os_disk_size'),
self.ZipVars('os_location_source', 'os_location_size'))
}
def getMapHumanReadable(self):
if self.GetBool("cl_autopartition_set"):
return {
dev: size for dev, size in chain(
self.ZipVars('os_disk_dev', 'os_disk_size',
humanreadable=True),
self.ZipVars('os_location_source', 'os_location_size',
humanreadable=True),
self.ZipVars('cl_autopartition_disk_dev_full',
'cl_autopartition_disk_size_full',
humanreadable=True))
}
else:
return {
device.udev.get_devname(name=dev): size
for dev, size in chain(
zip(self.Get('os_disk_dev'),
self.Get('os_disk_size', humanreadable=True)),
zip(self.Get('os_location_source'),
self.Get('os_location_size', humanreadable=True)))
}
class VariableOsInstallDiskType(SourceReadonlyVariable):
"""
New partition scheme (for feature change partition)
"""
type = "list"
indexField = "os_install_disk_dev"
def getMap(self):
diskType = dict(self.ZipVars('os_disk_dev', 'os_disk_type'))
diskType.update(self.ZipVars('cl_autopartition_disk_dev',
'cl_autopartition_disk_type'))
return diskType
humanReadable = Variable.humanReadable
class VariableOsInstallDiskPart(SourceReadonlyVariable):
"""
Get new type partitions using for install
"""
type = "list"
indexField = "os_install_disk_dev"
def getMap(self):
diskPart = dict(self.ZipVars('os_disk_dev', 'os_disk_part'))
diskPart.update(self.ZipVars('cl_autopartition_disk_dev',
'cl_autopartition_disk_part'))
return diskPart
humanReadable = Variable.humanReadable
class VariableOsInstallBindData(ReadonlyTableVariable):
"""
Table of install bind mount points
"""
source = ['os_install_bind_path',
'os_install_bind_mountpoint']
class VariableOsInstallBindPath(ReadonlyVariable):
"""
Install directories for bind
"""
type = "list"
def get(self):
"""Get install bind source"""
return self.Select('os_location_source',
where='os_location_dest',
func=lambda x: not x[1].startswith('/dev/') and x[0])
class VariableOsInstallBindMountpoint(ReadonlyVariable):
"""
Mountpoint for install directories bind
"""
def get(self):
return self.Select('os_location_dest',
where='os_location_source',
func=lambda x: not x[0].startswith('/dev/') and x[1])
class VariableOsInstallBootloader(ReadonlyVariable):
"""
Bootloader for brief information
"""
def init(self):
self.label = _("Bootloader")
def get(self):
if self.Get('os_install_uefi_set') == 'on':
return "uefi"
else:
return ",".join(self.Get('os_install_mbr'))
def humanReadable(self):
if self.Get('os_install_uefi_set') == 'on':
return _("UEFI")
else:
mbrs = self.Get('os_install_mbr', humanreadable=True)
if not mbrs:
return _("no")
return ",".join(mbrs)
class VariableOsInstallBootDevices(ReadonlyVariable):
"""
Физическое устройство с которого будет производиться загрузка системы на
котором находится /boot или /, т.е. если / находится на RAID, расположенном
на двух дисках - будет эти диски
"""
type = "list"
def get(self):
bootDev = (self.Select('os_install_disk_parent',
where='os_install_disk_mount',
_in=('/', '/boot'), sort="DESC", limit=1) or
self.select('os_disk_parent',
os_disk_dev=self.Get('os_install_root_dev'),
limit=1))
if bootDev:
devices = bootDev.split(',')
return [mbr
for dev, mbr in self.ZipVars('os_device_dev',
'os_device_mbr')
if mbr and dev in devices]
return []
class VariableOsUefi(ReadonlyVariable):
"""
UEFI partitions from fstab
"""
def get(self):
return self.select('os_disk_dev', os_disk_mount__startswith="/boot/efi")
class VariableOsInstallUefi(LocationHelper, Variable):
"""
UEFI partitions for install
"""
type = "choiceedit-list"
element = "selecttable"
opt = ["--uefi"]
metavalue = "EFI"
re_not0_raid = re.compile(r"-raid[1-9]")
def init(self):
self.label = _("UEFI boot")
self.help = _("set UEFI boot disks")
@property
def install_to_not_x86_64(self):
return self.Get('os_install_arch_machine') != 'x86_64'
@property
def install_without_uefiboot(self):
return self.Get('os_uefi_set') == 'off'
@property
def install_to_flash(self):
return self.Get('os_install_root_type') == 'flash'
def is_force_param(self):
return "--force" in self.Get("cl_console_args")
def get(self):
# если используется авторазметка список разделов находится в ней
if self.GetBool('cl_autopartition_set'):
return self.Get('cl_autopartition_efi')
# исключаем определение UEFI если оно не может быть использовано
if (self.install_to_flash or self.install_to_not_x86_64 or
self.install_without_uefiboot):
return []
# если происходит обновление загрузчика текущей системы
# для определения используем /etc/fstab
fstabefidevs = self.Get('os_uefi')
if self.Get('cl_action') != 'system':
return fstabefidevs
rootdev = self.Get('os_install_root_dev')
rootscheme = self.select('os_disk_type',
os_disk_dev=rootdev, limit=1)
# определяем список физических дисков на которых находится rootdev
parents = set(self.select('os_disk_parent',
os_disk_dev=rootdev, limit=1).split(','))
efidev = [x for x in self.select('os_device_efi',
os_device_dev__in=parents) if x]
allefi = [x for x in self.select('os_device_efi',
os_device_type="hdd") if x]
# если корневое устройство расположено на ненулевом RAID - возвращаем
# полный список иначе только первое устройство
# если диски для установки не содержат EFI - берём efi из /etc/fstab
# если и там нет, то берём первый попавшийся EFI на любом из HDD
if self.re_not0_raid.search(rootscheme):
return efidev or fstabefidevs or allefi[:1]
# возвращаем первое найденное устройство
else:
return efidev[:1] or fstabefidevs or allefi[:1]
def set(self, value):
def transform(efidev):
if efidev not in self.Get('os_device_efi'):
return self.select('os_device_efi',
os_device_dev=efidev, limit=1) or efidev
return efidev
return list(filter(lambda x: x != "off", map(transform, value)))
def choice(self):
deviceParentMap = self.ZipVars('os_device_dev',
'os_device_efi', 'os_device_name')
return [(efidisk, "%s (%s)" % (dev, name or _("Unknown")))
for dev, efidisk, name in deviceParentMap
if efidisk]
def check(self, value):
if value:
efi_boot_mgr = getProgPath('/usr/sbin/efibootmgr')
if not efi_boot_mgr:
raise VariableError(
_("UEFI installation is unavailable, because '%s' command "
"not found") % efi_boot_mgr)
if self.install_without_uefiboot:
raise VariableError(
_("Your system must be loaded in UEFI for using this "
"bootloader"))
if self.install_to_not_x86_64:
raise VariableError(
_("Architecture of the target system must be x86_64"))
efidevs = self.Get('os_device_efi')
badefi = [x for x in value if x not in efidevs]
if badefi:
raise VariableError(
_("Wrong EFI device %s") % badefi[0])
fstab_disks = [dev
for dev, mp in self.ZipVars('os_disk_dev', 'os_disk_mount')
if mp and not mp.startswith("/boot/efi")
]
for disk in value:
if disk in fstab_disks:
raise VariableError(
_("Partition {disk} already used by "
"the current system").format(disk=disk))
not_fat_efi = self.select('os_disk_dev',
os_disk_format__ne="vfat")
for efipart in value:
if efipart in not_fat_efi and isMount(efipart):
if not self.is_force_param() or not try_umount(efipart):
raise VariableError(
_("Please unmount {device}, as it will be used for "
"installation").format(device=efipart))
for efipart in value:
if efipart in self.select('os_location_source',
os_location_dest__ne=""):
raise VariableError(
_("Partition {disk} already used for "
"installation").format(disk=efipart))
def uncompatible(self):
"""
Uncompatible with autopartition
"""
if self.Get('cl_autopartition_set') == "on":
return \
_("The layout is not available with autopartitioning")
if self.Get('os_install_root_type') == 'flash':
return \
_("This option not used for Flash install")
return ""
class VariableOsInstallMbr(LocationHelper, Variable):
"""
Disks for boot mbr
"""
type = "choiceedit-list"
element = "selecttable"
opt = ["--mbr"]
metavalue = "MBR"
untrusted = True
check_after = ["os_install_uefi"]
def init(self):
self.label = _("Boot disk")
self.help = _("boot disk for the system bound for install")
def get(self):
"""Get default Master boot record install"""
if self.Get('os_install_uefi_set') == 'on':
return []
if self.Get('cl_autopartition_set') == 'on':
return self.Get('cl_autopartition_mbr')
if self.Get('os_install_root_type') in ("flash", "usb-hdd"):
rootdev = self.Get('os_install_root_dev')
device = filter(lambda x: x in rootdev,
self.Get('os_device_dev'))
if device:
return [device[0]]
else:
return []
bootdevices = self.Get('os_install_boot_devices')
# при установке с HDD также устанавливаем загрузчик на первый диск
# если есть возможность
if self.Get('os_root_type') == "hdd":
first_hdd = self.Select(
'os_device_dev', where='os_device_type', eq='hdd', limit=1)
if self.select('os_device_mbr', os_device_dev=first_hdd, limit=1):
bootdevices.append(first_hdd)
return sorted(set(bootdevices))
def choice(self):
deviceParentMap = self.ZipVars('os_device_mbr', 'os_device_name')
return [(mbrdisk, "%s (%s)" % (mbrdisk, name or _("Unknown")))
for mbrdisk, name in deviceParentMap
if mbrdisk]
def set(self, value):
# support off value
return list(filter(lambda x: x != "off", value))
def check(self, value):
if self.GetBool('cl_autopartition_set'):
return
rootType = self.Get('os_install_root_type')
if rootType == "flash":
if len(value) > 1:
raise VariableError(
_("For Flash install, you need only one disk"))
if value and self.Get('os_install_uefi_set') == "on":
raise VariableError(_("MBR is not used with the UEFI bootloader"))
useBtrfs = "btrfs" in self.Select('os_install_disk_format',
where='os_install_disk_mount',
_in=('/', '/boot'),
sort="DESC")[:1]
for mbrDisk in value:
if self.Get('cl_autopartition_set') == 'on':
tableOnBootDisk = self.Get('cl_autopartition_table')
else:
tableOnBootDisk = self.Select('os_device_table',
where="os_device_dev", eq=mbrDisk,
limit=1)
if not tableOnBootDisk:
raise VariableError(
_("Disk '%s' needs a partition table for the boot record") %
mbrDisk)
if rootType == "flash":
if tableOnBootDisk == "gpt":
raise VariableError(_("You need a disk with a dos "
"table for Flash install"))
if rootType in ("usb-hdd", "hdd") and tableOnBootDisk == "gpt":
bbsizes = (
size
for size, disk_id, disk_parent in self.ZipVars(
'os_disk_size', 'os_disk_id', 'os_disk_parent'
)
if disk_id == 'EF02' and mbrDisk in disk_parent
)
bios_grub_size = self.Get('cl_autopartition_bios_grub_size')
for bbsize in bbsizes:
minsize = "%dMb" % (int(bios_grub_size) / Sizes.M)
if not bbsize:
raise VariableError(
_("Your boot device must have a "
"BIOS Boot partition ({minsize})").format(
minsize=minsize))
# проверка размера EF02 при установке на btrfs
elif useBtrfs:
if (bbsize.isdigit() and bios_grub_size.isdigit() and
round(float(bbsize) / Sizes.M) < round(float(bios_grub_size)/ Sizes.M)):
raise VariableError(
_("Your boot device must have a BIOS Boot "
"partition ({minsize})").format(
minsize=minsize))
if mbrDisk not in self.Get('os_device_mbr'):
raise VariableError(
_("Device {device} has not BIOS Boot partition").format(
device=mbrDisk))
if value:
if not self.Get('os_grub2_path'):
self.checkForLegacyGrub()
def checkForLegacyGrub(self):
"""Check current disk configuration for installation for install
legacy grub"""
bootDiskType, bootDiskFormat = \
self.Select(['os_install_disk_type',
'os_install_disk_format'],
where='os_install_disk_mount',
_in=('/', '/boot'),
sort="DESC", limit=1)
if "lvm" in bootDiskType or "raid" in bootDiskType:
raise ValueError(
_("Legacy grub requires a separate /boot partition "
"to support boot from a RAID or a LVM"))
if bootDiskFormat in ("btrfs", "nilfs2"):
raise ValueError(
_("To support booting from %s, legacy grub needs a "
"separate /boot partition") % bootDiskFormat)
def uncompatible(self):
"""
Опция несовместима с использованием UEFI
"""
if self.Get('cl_autopartition_set') == "on":
return \
_("The layout is not available with autopartitioning")
return ""
class VariableOsInstallRootType(LocationHelper, Variable):
"""
Type of installation
"""
opt = ["--type"]
metavalue = "DISKTYPE"
type = "choice"
def init(self):
self.help = _("device type for the system bound for install")
self.label = _("Installation type")
def get(self):
selectRootType = self.Get('cl_install_type')
if not selectRootType:
return self.Get('os_root_type')
if selectRootType == "flash":
return "flash"
else:
rootdev = self.Get('os_install_root_dev')
devs = list(device.udev.get_disk_devices(name=rootdev))
if not devs:
return "hdd"
device_type = self.Select(
'os_device_type', where='os_device_dev', eq=devs[0], limit=1)
if device_type in ("usb-hdd", "flash"):
return "usb-hdd"
return "hdd"
def choice(self):
return [("hdd", _("Hard disk")),
("flash", _("USB Flash")),
("usb-hdd", _("USB Hard Disk"))]
class VariableOsInstallRootDev(ReadonlyVariable):
def get(self):
"""Get install root device"""
if self.Get('cl_action') == 'system':
return self.Select('os_install_disk_dev_base',
where='os_install_disk_mount_base',
eq="/", limit=1) or ''
else:
return self.Get('os_root_dev')
class VariableOsInstallRootUuid(ReadonlyVariable):
def get(self):
"""UUID корневого устройства"""
if self.Get('cl_action') == 'system':
root_dev = self.Get('os_install_root_dev')
return self.Select('os_install_disk_uuid',
where='os_install_disk_dev',
eq=root_dev, limit=1) or ''
else:
root_dev = self.Get('os_root_dev')
return self.Select('os_disk_uuid',
where='os_disk_dev',
eq=root_dev, limit=1) or ''
class VariableOsInstallFstabMountConf(DeviceHelper, ReadonlyVariable):
"""
FStab.conf contains for mount and bind points
"""
def _commentFstab(self, s, mp, dev):
"""Generate comment for /etc/fstab each line"""
if s.startswith("UUID"):
return "# %s was on %s during installation\n%s" % (mp, dev, s)
else:
return s
def formatFstab(self, used, dev, mp, fs, opts, spec):
if fs in FileSystemManager.supportFS:
fs_orig = FileSystemManager.supportFS[fs].get('orig', fs)
else:
fs_orig = fs
ret = "{dev}\t{mp}\t{fs}\t{opts}\t{spec}".format(
dev=used, mp=mp, fs=fs_orig, opts=opts, spec=spec
)
if used.startswith("UUID"):
return "# %s was on %s during installation\n%s" % (mp, dev, ret)
return ret
def get(self):
devicesForFstab = self.Select([
'os_install_disk_use',
'os_install_disk_mount',
'os_install_disk_format',
'os_install_disk_options',
'os_install_disk_dev'],
where='os_install_disk_mount',
func=lambda x: x[0] != "" and x[0] != "swap")
devicesForFstab = sorted(
devicesForFstab, key=lambda x: self.separateDevice(x[1]))
rootLine = "\n".join(
self.formatFstab(used, dev, mp, fs, opts, "0 1")
for used, mp, fs, opts, dev in devicesForFstab[:1]
)
otherLines = "\n".join(
self.formatFstab(used, dev, mp, fs, opts, "0 0")
for used, mp, fs, opts, dev in devicesForFstab[1:]
)
bindData = self.ZipVars('os_install_bind_path',
'os_install_bind_mountpoint')
bindLines = "\n".join(map(lambda x: "%s\t%s\tnone\tbind\t0 0" \
% (x[0], x[1]), bindData))
return "\n".join(filter(lambda x: x, [rootLine, otherLines, bindLines]))
class VariableOsInstallFstabEfiConf(VariableOsInstallFstabMountConf):
"""
Переменная содержит часть fstab в которой содержится описание
подключения /boot/efi
"""
def get(self):
devicesForFstab = self.Select([
'os_install_disk_use',
'os_install_disk_mount',
'os_install_disk_format',
'os_install_disk_options',
'os_install_disk_dev'],
where='os_install_disk_mount',
func=lambda x: x[0].startswith("/boot/efi"))
devicesForFstab = sorted(
devicesForFstab, key=lambda x: self.separateDevice(x[1]))
efiLines = "\n".join(
self.formatFstab(used, dev, mp, fs, opts, "0 0")
for used, mp, fs, opts, dev in devicesForFstab
)
return "\n".join(filter(lambda x: x, [efiLines]))
class VariableOsInstallFstabSwapConf(VariableOsInstallFstabMountConf):
"""
FStab.conf contains swap partition
"""
def get(self):
return "\n".join(map(lambda x: "%s\tnone\tswap\tsw\t0 0" % \
self._commentFstab(x[0], "swap", x[2]),
self.Select(['os_install_disk_use',
'os_install_disk_mount',
'os_install_disk_dev'],
where='os_install_disk_mount',
eq='swap')))
class VariableClInstallType(Variable):
"""
Installation type (extension variable describe, that
install must be to flash or hdd
"""
type = "choice"
value = ""
def choice(self):
return ["", "flash", "hdd"]
def check(self, value):
for dn in ("/proc", "/sys", "/dev", "/dev/pts"):
if not isMount(dn):
raise VariableError(_("%s is not mounted") %dn )
check_fn = '/run/.calculate-rw-check-%d' % os.getpid()
try:
with open(check_fn,'w') as f:
pass
os.unlink(check_fn)
except (IOError,OSError) as e:
raise VariableError(_("Failed to create data in /run"))
class VariableOsInstallDiskSingle(Variable):
"""
Installation disk
"""
type = "choiceedit"
opt = ["--disk", "-d"]
metavalue = 'DISK'
untrusted = True
value = ""
def init(self):
self.label = _("Installation disk")
self.help = _("set the USB Flash device")
def choice(self):
def generator():
device_names = dict(self.ZipVars('os_device_dev',
'os_device_name'))
for disk_dev, disk_type, disk_parent in self.ZipVars(
'os_disk_dev', 'os_disk_type', 'os_disk_parent'):
if disk_type == "disk-partition":
device_name = device_names.get(disk_parent, _("Unknown"))
yield disk_dev, "%s (%s)" % (disk_dev, device_name)
return list(generator())
def check(self, value):
# проверить, чтобы был выбран именно раздел
if value not in self.Get('os_disk_dev'):
raise VariableError(
_("Wrong device '%s'" % value)
)
disktype = self.select('os_disk_type', os_disk_dev=value, limit=1)
if disktype and disktype != "disk-partition":
raise VariableError(
_("Wrong device '%s'" % value))
# проверить, чтобы раздел не использовался системой (не описан в fstab)
mp = self.select('os_disk_mount', os_disk_dev=value, limit=1)
if mp:
raise VariableError(
_("The partition {dev} is already in use as {mp}").format(
dev=value, mp=mp))
# если система загружена с флешки (не iso) - нельзя переустановить
# эту систему
root_type = self.Get('os_root_type_ext')
if root_type in RootType.LiveFlash:
if value == self.Get('os_root_flash_dev'):
raise VariableError(
_("You cannot install the new system instead current"))
# detect using extended partition
disk_part = self.select('os_disk_part', os_disk_dev=value, limit=1)
if disk_part == 'extended':
raise VariableError(
_("Unable to use extended partition %s for installation") %
value)
if "cdrom" in disk_part:
raise VariableError(_("Unable to use CDROM %s for installation") %
value)
if not disk_part or disk_part == 'gpt':
raise VariableError(_("You need a disk with a dos "
"table for Flash install"))
class VariableOsInstallFormatSingleSet(Variable):
"""
Форматировать Flash
"""
type = "bool"
opt = ["--format"]
untrusted = True
value = "off"
def init(self):
self.label = _("Format the USB Flash")
self.help = _("perform the formatting of the USB Flash drive")
def must_be_formatted(self, dev):
fs = self.select('os_disk_format', os_disk_dev=dev, limit=1)
if fs != "vfat":
return True
return False
def cannot_be_formatted(self, dev):
flash_dev = self.Get('os_root_flash_dev')
return flash_dev and dev == flash_dev
def check(self, value):
devs = self.Get('os_disk_dev')
dev = self.Get('os_install_disk_single')
if dev not in devs:
return
if value == "on":
if self.cannot_be_formatted(dev):
raise VariableError(
_("You cannot format the USB Flash which "
"contains the current system"))
else:
if self.must_be_formatted(dev):
raise VariableError(
_("{device} must be formatted").format(device=dev))
if dev:
try:
with FlashDistributive(dev) as f:
dn = f.getDirectory()
df = DiskSpace()
free_size = df.get_free(dev)
squash_fn = path.join(dn, "livecd.squashfs")
if not path.exists(squash_fn):
source = self.Get('cl_image')
if isinstance(source, IsoDistributive):
image_size = source.get_squash_size()
if image_size > free_size:
raise VariableError(
_("Not enough free space on the "
"USB Flash"))
except DistributiveError:
pass
class VariableOsInstallBtrfsCompression(Variable):
"""
Алгоритм сжатия для btrfs, в которых выбрано использовать сжатие
"""
type = "choiceedit"
value = "zstd"
def choice(self):
return ["zlib","lzo","zstd"]
def check(self, value):
if not re.search(r"^(zlib|lzo|zstd|(1[0-9]|1-9))$", value):
raise VariableError(_("Wrong btrfs compression"))