You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/disk.py

2267 lines
81 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
import pty
import fcntl
from subprocess import Popen
from os import path
from itertools import *
from operator import itemgetter
from calculate.install.distr import FlashDistributive, DistributiveError, \
IsoDistributive
from calculate.lib.datavars import (TableVariable, Variable, VariableError,
ReadonlyVariable, ReadonlyTableVariable,
SourceReadonlyVariable, VariableInterface,
HumanReadable)
from calculate.lib.utils.device import (getUdevDeviceInfo, getDeviceType,
getPartitionType, getPartitionDevice,
getRaidPartitions,
getLvmPartitions, getPartitionSize,
humanreadableSize,
getUUIDDict, getCommonDeviceName)
from calculate.install.variables.autopartition import Sizes
from calculate.lib.utils.files import (listDirectory, pathJoin, readFile, FStab,
isMount, getProgPath, DiskSpace)
from calculate.install.fs_manager import FileSystemManager
from calculate.lib.cl_lang import setLocalTranslate, _
from calculate.lib.variables.system import RootType
setLocalTranslate('cl_install3', sys.modules[__name__])
class DeviceHelper(VariableInterface):
    """
    Helper mixin: enumerating block devices from sysfs and resolving
    device names through udev
    """
    sysBlockPath = '/sys/block'
    # negative lookahead: accepts every name which does NOT start with
    # one of the listed prefixes
    rePassDevice = re.compile("^(?!%s)" % "|".join(['sr', 'fd',
                                                    'ram', 'loop']))

    def getDeviceFromSysPath(self):
        """Get interest devices from sys block path (as a list)"""
        return [entry for entry in listDirectory(self.sysBlockPath)
                if self.rePassDevice.search(entry)]

    def separateDevice(self, device):
        """
        Separate device word and number into a list
        Using for sort. (Example: sda2 ["sda", 2], md5p1 ["md", 5, "p", 1])
        """
        # raw string: "\d" and "\D" are regex classes, not string escapes
        return [int(chunk) if chunk.isdigit() else chunk
                for chunk in re.findall(r'\d+|\D+', device)]

    def mapUdevProperty(self, var, prop, default):
        """
        Get udev property [prop] for each device listed in variable var;
        default is used for devices which have no such property
        """
        return [getUdevDeviceInfo(name=dev).get(prop, default)
                for dev in self.Get(var)]

    def getPerfectName(self, device, defaultValue=None):
        """
        Get dev name or human-readable lvm name

        Returns /dev/<vg>/<lv> if udev reports both DM_VG_NAME and
        DM_LV_NAME and that path exists. Otherwise returns defaultValue,
        or the udev DEVNAME ('' if absent) when defaultValue is None.
        """
        info = getUdevDeviceInfo(name=device)
        if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
            lvmDeviceName = '/dev/{vg}/{lv}'.format(vg=info['DM_VG_NAME'],
                                                    lv=info['DM_LV_NAME'])
            if path.exists(lvmDeviceName):
                return lvmDeviceName
        if defaultValue is None:
            return info.get('DEVNAME', '')
        return defaultValue

    def getLvmName(self, device):
        """
        Get lvm name ("" if the device has no existing lvm path)
        """
        return self.getPerfectName(device, defaultValue="")
#######################################################
# Devices variables
#######################################################
class VariableOsDeviceData(ReadonlyTableVariable):
    """
    Table of disk devices: each column is taken from one source variable
    """
    type = 'table'
    source = [
        'os_device_dev',
        'os_device_table',
        'os_device_type',
        'os_device_ssd_set',
        'os_device_virtual_set',
        'os_device_map',
        'os_device_syspath',
        'os_device_name',
        'os_device_size',
    ]
class VariableOsDeviceDev(DeviceHelper, ReadonlyVariable):
    """
    Disk devices

    The first get() also starts "udevadm monitor" writing to a PTY;
    refresh() polls that PTY and invalidates the cached device lists
    when the monitor has printed anything.
    """
    type = "list"
    # master side of the PTY which receives the monitor output
    master = None
    # udevadm monitor process; stays None until get() starts it, so
    # refresh() and close() are safe to call at any time
    monitor = None
    # size of one read while draining the monitor output
    _READ_CHUNK = 65535

    def init(self):
        pass

    def get(self):
        """
        Get device /dev name

        Returns sorted list of DEVNAME values of disk devices found
        in sysBlockPath.
        Raises VariableError if the PTY for the monitor cannot be created.
        """
        if self.master is None and not self.Get('cl_ebuild_phase'):
            try:
                self.master, slave = pty.openpty()
            except OSError:
                raise VariableError('Failed to create PTY')
            udevAdm = getProgPath('/sbin/udevadm')
            self.monitor = Popen([udevAdm, "monitor", "--kernel",
                                  "--subsystem-match=block"], stdout=slave,
                                 close_fds=True)
            # the child keeps its own copy of the slave side
            os.close(slave)
            # non-blocking reads: refresh() must not wait for events
            fl = fcntl.fcntl(self.master, fcntl.F_GETFL)
            fcntl.fcntl(self.master, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        def _getDiskName(devpath):
            """Get devname only if udev says that devpath is a disk"""
            prop = getUdevDeviceInfo(devpath)
            if (prop.get("ID_TYPE", "") == "disk" or
                    path.exists(path.join(devpath, "device"))) and \
                    prop.get("DEVTYPE", "") == "disk":
                return prop.get('DEVNAME', '')
            return ''

        # get devices from /sys/block directories(discard mem,sr,loop and other)
        names = [_getDiskName(path.join(self.sysBlockPath, entry))
                 for entry in self.getDeviceFromSysPath()]
        return sorted([name for name in names if name],
                      key=self.separateDevice)

    def close(self):
        """Stop udevadm monitor if it was started"""
        if not self.monitor:
            return
        try:
            self.monitor.kill()
            self.monitor.wait()
        except OSError:
            # best effort: the process may already be gone
            pass

    def refresh(self):
        """
        Invalidate device variables and the udev cache if the monitor
        has printed something since the previous call
        """
        if not self.monitor:
            return
        changed = False
        try:
            # drain everything which is pending on the PTY
            while True:
                res = os.read(self.master, self._READ_CHUNK)
                if not res:
                    break
                changed = True
                if len(res) < self._READ_CHUNK:
                    break
        except OSError:
            # nothing (more) to read from the non-blocking descriptor;
            # data which was already read must still trigger invalidation
            pass
        if changed:
            self.parent.Invalidate(self.name)
            self.parent.Invalidate('os_install_disk_uuid')
            self.parent.Invalidate('os_disk_dev')
            getUdevDeviceInfo.clearCache()
class VariableOsDeviceType(ReadonlyVariable):
    """
    Device type: "hdd", "usb-hdd" or "flash"
    """
    type = "list"

    def getType(self, device):
        """
        Get type of one device; uses self.usbdevices prepared by get()
        """
        shortname = path.basename(device)
        if shortname not in self.usbdevices:
            return "hdd"
        # usb flash has "1" in the "removable" file in sysfs
        removablePath = '/sys/block/%s/removable' % shortname
        if readFile(removablePath).strip() == "1":
            return "flash"
        return "usb-hdd"

    def get(self):
        """Get type for each device of os_device_dev"""
        # usb devices have links with 'usb-' prefix in /dev/disk/by-id;
        # the last component of the link target is kept
        diskIdPath = '/dev/disk/by-id'
        if path.exists(diskIdPath):
            self.usbdevices = [
                os.readlink(path.join(diskIdPath, link)).rpartition('/')[2]
                for link in listDirectory(diskIdPath)
                if link.startswith('usb-')]
        else:
            self.usbdevices = []
        return [self.getType(dev) for dev in self.Get('os_device_dev')]
class VariableOsDeviceMap(ReadonlyVariable):
    """
    Map number for grub
    Used for legacy grub (DEPRECATED)
    """
    type = "list"

    def get(self):
        """Get position of each device in os_device_dev as a string"""
        return [str(num)
                for num, dev in enumerate(self.Get('os_device_dev'))]
class VariableOsDeviceSsdSet(ReadonlyVariable):
    """
    SSD flag ("on"/"off") for each device
    """
    type = "list"
    ssd_names = ("SSD", "OCZ", "PLEXTOR")
    udev_property = 'ID_ATA_ROTATION_RATE_RPM'

    def get(self):
        """
        Get "on" for devices whose udev rotation rate is "0" or whose
        name contains one of ssd_names, "off" for the others
        """
        def isSsd(device, name):
            rpm = getUdevDeviceInfo(name=device).get(self.udev_property,
                                                     None)
            if rpm == "0":
                return "on"
            if any(mark in name for mark in self.ssd_names):
                return "on"
            return "off"

        pairs = zip(self.Get('os_device_dev'),
                    self.Get('os_device_name'))
        return [isSsd(*pair) for pair in pairs]
class VariableOsDeviceSyspath(ReadonlyVariable):
    """
    udev DEVPATH for each device
    """
    type = "list"
    udev_property = 'DEVPATH'

    def get(self):
        """Get udev_property of each device ("" if it is absent)"""
        result = []
        for device in self.Get('os_device_dev'):
            prop = getUdevDeviceInfo(name=device)
            result.append(prop.get(self.udev_property, ""))
        return result
class VariableOsDeviceVirtualSet(ReadonlyVariable):
    """
    Virtual device flag ("on"/"off") for each device
    """
    type = "list"
    virtual_names = ("VBOX", "VMWare")
    virtual_syspath = ("virtio",)

    def get(self):
        """
        Get "on" for devices whose name contains one of virtual_names
        or whose syspath contains one of virtual_syspath
        """
        def isVirtual(device, name, syspath):
            byName = any(mark in name for mark in self.virtual_names)
            if byName or any(mark in syspath
                             for mark in self.virtual_syspath):
                return "on"
            return "off"

        rows = zip(self.Get('os_device_dev'),
                   self.Get('os_device_name'),
                   self.Get('os_device_syspath'))
        return [isVirtual(*row) for row in rows]
class VariableOsDeviceTable(ReadonlyVariable):
    """
    Partition table type of each device
    """
    type = "list"

    def getTableByChild(self, device):
        """
        Get table by child partitions

        Takes the partition scheme from udev info of the first sysfs
        entry whose name starts with the device short name; 'mbr' and
        'msdos' are reported as 'dos'. Returns "" if there is no child.
        """
        syspath = getUdevDeviceInfo(name=device).get('DEVPATH', '')
        if not syspath.startswith('/sys'):
            syspath = pathJoin('/sys', syspath)
        shortnameDevice = path.basename(device)
        childs = [entry for entry in listDirectory(syspath)
                  if entry.startswith(shortnameDevice)]
        if not childs:
            return ""
        udevinfo = getUdevDeviceInfo(path=pathJoin(syspath, childs[0]))
        table = udevinfo.get('ID_PART_ENTRY_SCHEME', '')
        if not table:
            table = udevinfo.get('UDISKS_PARTITION_SCHEME', '')
        map_names = {'mbr': 'dos',
                     'msdos': 'dos'}
        return map_names.get(table, table)

    def get(self):
        """
        Get device partition table

        The device selected for autopartitioning gets the value of
        cl_autopartition_table, the others are asked from udev.
        """
        autopartition = self.Get('cl_autopartition_set') == 'on'
        autoDevice = self.Get('cl_autopartition_device')

        def getTable(device):
            prop = getUdevDeviceInfo(name=device)
            # the fallback is computed even when udev has the property
            fallback = self.getTableByChild(device)
            return prop.get('ID_PART_TABLE_TYPE', fallback)

        result = []
        for device in self.Get('os_device_dev'):
            if autopartition and autoDevice == device:
                result.append(self.Get('cl_autopartition_table'))
            else:
                result.append(getTable(device))
        return result
class VariableOsDeviceName(ReadonlyVariable):
    """
    Name of device (vendor and model from sysfs)
    """
    type = "list"

    def getName(self, device):
        """
        Get "<vendor> <model>" read from the device sysfs directory,
        or "" if udev reports no DEVPATH
        """
        devicepath = getUdevDeviceInfo(name=device).get("DEVPATH", "")
        if not devicepath:
            return ""
        if not devicepath.startswith("/sys"):
            devicepath = pathJoin("/sys", devicepath)
        vendor = readFile("%s/device/vendor" % devicepath).strip()
        model = readFile("%s/device/model" % devicepath).strip()
        return ("%s %s" % (vendor, model)).strip()

    def get(self):
        """Get name for each device of os_device_dev"""
        return [self.getName(dev) for dev in self.Get('os_device_dev')]
class VariableOsDeviceSize(ReadonlyVariable):
    """
    Size of device
    """
    type = "list"

    def get(self):
        """Get device size (getPartitionSize with inBytes=True)"""
        return [getPartitionSize(name=dev, inBytes=True)
                for dev in self.Get('os_device_dev')]

    def humanReadable(self):
        """Get sizes converted by humanreadableSize"""
        return [humanreadableSize(size) for size in self.Get()]
#############################################
# Disk variables
#############################################
class VariableOsDiskData(ReadonlyTableVariable):
    """
    Table of partitions of the current system: each column is taken
    from one source variable
    """
    source = [
        'os_disk_dev',
        'os_disk_uuid',
        'os_disk_name',
        'os_disk_size',
        'os_disk_part',
        'os_disk_format',
        'os_disk_type',
        'os_disk_raid',
        'os_disk_lvm',
        'os_disk_parent',
        'os_disk_id',
        'os_disk_grub',
    ]
class VariableOsDiskDev(DeviceHelper, ReadonlyVariable):
    """
    List of available partition devices
    """
    type = "list"

    def selfOrPartition(self, devpath):
        """
        Return devpath itself or, if the sysfs directory contains entries
        which start with the device name, the list of results for them
        """
        shortname = path.basename(devpath)
        partitions = [entry for entry in listDirectory(devpath)
                      if entry.startswith(shortname)]
        if not partitions:
            return devpath
        # descend: a partition is processed the same way as a device
        nested = [self.selfOrPartition(path.join(devpath, part))
                  for part in partitions]
        return [item for item in nested if item]

    def get(self):
        """
        Get sorted DEVNAME list of whole devices and partitions

        Devices are taken from sysfs, replaced by their partitions if
        they have them, resolved through udev; empty names are dropped
        and the rest is sorted by name and partition number.
        """
        disks = self.getDeviceFromSysPath()
        found = [self.selfOrPartition(path.join(self.sysBlockPath, disk))
                 for disk in disks]
        # convert to single list: a string is a whole device,
        # anything else is expected to be a list of partitions
        syspaths = []
        for item in found:
            syspaths = syspaths + ([item] if type(item) == str else item)
        names = [getUdevDeviceInfo(syspath).get('DEVNAME', '')
                 for syspath in syspaths]
        return sorted([name for name in names if name],
                      key=self.separateDevice)

    def humanReadable(self):
        """Get getPerfectName for each device"""
        return [self.getPerfectName(dev) for dev in self.Get()]
class VariableOsDiskMount(DeviceHelper, ReadonlyVariable):
    """
    Mount points of partitions in the current system
    """
    type = "list"

    def get(self):
        """
        Get "/" for os_root_dev, fstab value for the other partitions
        ("" if fstab has nothing for the partition)
        """
        disk_hash = self.Get('os_disk_dev')
        fstab = FStab('/etc/fstab', devs=disk_hash)
        rootdev = self.Get('os_root_dev')
        result = []
        for dev in self.Get('os_disk_dev'):
            if dev == rootdev:
                result.append('/')
            else:
                result.append(fstab.getBy(eq=dev) or "")
        return result
class VariableOsDiskContent(ReadonlyVariable):
    """
    Partition content
    """
    type = "list"

    def get(self):
        """
        TODO: need to write
        (currently an empty string for each partition)
        """
        return ["" for dev in self.Get('os_disk_dev')]
class VariableOsDiskFormat(ReadonlyVariable):
    """
    Filesystem on device partitions
    """
    type = "list"

    def get(self):
        """
        Get current disk filesystem

        First non-empty of: udev FSTAB_TYPE, type from fstab,
        udev ID_FS_TYPE ('' if it is absent too).
        """
        fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
        result = []
        for device in self.Get('os_disk_dev'):
            prop = getUdevDeviceInfo(name=device)
            fs = prop.get('FSTAB_TYPE')
            if not fs:
                fs = fstab.getBy(what=fstab.TYPE, eq=device)
            if not fs:
                fs = prop.get('ID_FS_TYPE', '')
            result.append(fs)
        return result
class VariableOsDiskType(ReadonlyVariable):
    """
    List type (lvm,raid,partition,disk)
    """
    type = "list"

    def get(self):
        """
        Get partition scheme

        The type from getDeviceType gets "-raidmember(<raid>)" and/or
        "-lvmmember(<lv>,...)" suffix for partitions used by raid/lvm.
        """
        types = [(dev, getDeviceType(name=dev))
                 for dev in self.Get('os_disk_dev')]
        lvmUsedDisks = {}
        raidUsedDisks = {}
        # first pass: collect members of each raid and lvm device
        for device, diskType in types:
            prop = getUdevDeviceInfo(name=device)
            if "raid" in diskType:
                for member in getRaidPartitions(prop.get('DEVPATH', '')):
                    raidUsedDisks[member] = device
            if diskType.endswith("lvm"):
                for member in getLvmPartitions(prop.get('DM_VG_NAME', ''),
                                               prop.get('DM_LV_NAME', '')):
                    lvmUsedDisks.setdefault(member, []).append(device)
        # second pass: add member suffixes (raid first, then lvm)
        result = []
        for diskName, diskType in types:
            if diskName in raidUsedDisks:
                diskType = "%s-raidmember(%s)" % (diskType,
                                                  raidUsedDisks[diskName])
            if diskName in lvmUsedDisks:
                diskType = "%s-lvmmember(%s)" % (
                    diskType, ",".join(lvmUsedDisks[diskName]))
            result.append(diskType)
        return result
class VariableOsDiskRaid(ReadonlyVariable):
    """
    Raid device which the partition is a member of
    """
    type = "list"

    def generateRaid(self):
        """
        Yield for each os_disk_type value the text between the last "("
        and the final character if it contains "raidmember", else ""
        """
        for disktype in self.Get('os_disk_type'):
            isMember = "raidmember" in disktype
            yield disktype.rpartition('(')[2][:-1] if isMember else ""

    def get(self):
        """Get raid names as a list"""
        return [raid for raid in self.generateRaid()]
class VariableOsDiskLvm(DeviceHelper, ReadonlyVariable):
    """
    LVM vgname and lvname
    """
    type = "list"

    def get(self):
        """Get lvm name (getLvmName) for each partition"""
        return [self.getLvmName(dev) for dev in self.Get('os_disk_dev')]
class VariableOsDiskUuid(DeviceHelper, ReadonlyVariable):
    """
    List of uuid for partition devices
    """
    type = "list"

    def get(self):
        """Get udev ID_FS_UUID of each partition ('' if it is absent)"""
        uuids = self.mapUdevProperty('os_disk_dev', 'ID_FS_UUID', '')
        return uuids
class VariableOsDiskParent(ReadonlyVariable):
    """
    List of parent devices for partitions
    """
    type = "list"

    def get(self):
        """Get disk parent (getPartitionDevice by udev DEVPATH)"""
        syspaths = [getUdevDeviceInfo(name=dev).get('DEVPATH', '')
                    for dev in self.Get('os_disk_dev')]
        return [getPartitionDevice(syspath) for syspath in syspaths]
class VariableOsDiskId(DeviceHelper, ReadonlyVariable):
    """
    Partition's system id
    """
    type = "list"

    def get(self):
        """
        Get disk id

        Takes udev ID_PART_ENTRY_TYPE; known type uuids are replaced by
        short codes, other values are returned as they are.
        """
        mapTypeUUID = {'ebd0a0a2-b9e5-4433-87c0-68b6b72699c7': '0700',
                       '0657fd6d-a4ab-43c4-84e5-0933c84b4f4f': '8200',
                       'a19d880f-05fc-4d3b-a006-743f0f84911e': 'FD00',
                       '21686148-6449-6e6f-744e-656564454649': 'EF02',
                       'c12a7328-f81f-11d2-ba4b-00a0c93ec93b': 'EF00',
                       '0fc63daf-8483-4772-8e79-3d69d8477de4': '8300'}
        result = []
        for rawType in self.mapUdevProperty('os_disk_dev',
                                            'ID_PART_ENTRY_TYPE', ''):
            # keep the text after the last "x" (presumably drops "0x")
            typeId = rawType.rpartition("x")[2]
            result.append(mapTypeUUID.get(typeId, typeId))
        return result
class VariableOsDiskGrub(ReadonlyVariable):
    """
    List grub id for partition devices
    """
    type = "list"

    def get(self):
        """
        Get disk grub map

        "<device map number>,<partition number - 1>" for partitions
        whose parent is in os_device_dev and whose udev partition
        number is not zero, "" for the others.
        """
        devicesMap = dict(zip(self.Get('os_device_dev'),
                              self.Get('os_device_map')))
        result = []
        for device, parent in zip(self.Get('os_disk_dev'),
                                  self.Get('os_disk_parent')):
            prop = getUdevDeviceInfo(name=device)
            partnum = int(prop.get('ID_PART_ENTRY_NUMBER', 0))
            if parent in devicesMap and partnum:
                result.append("%s,%d" % (devicesMap[parent], partnum - 1))
            else:
                result.append("")
        return result
class VariableOsDiskPart(ReadonlyVariable):
    """
    Type of partition devices
    If msdos then(primary, extended or logical)
    If gpt then gpt
    """
    type = "list"

    def get(self):
        """
        Get getPartitionType for each partition;
        "" for entries which have no parent device
        """
        result = []
        for row in self.ZipVars('os_disk_dev', 'os_disk_parent'):
            # the type is requested for every entry, with or without parent
            partType = getPartitionType(getUdevDeviceInfo(name=row[0]))
            result.append(partType if row[1] else "")
        return result
class VariableOsDiskSize(ReadonlyVariable):
    """
    Partition size
    """
    type = "list"

    def get(self):
        """Get disk size (getPartitionSize with inBytes=True)"""
        return [getPartitionSize(name=dev, inBytes=True)
                for dev in self.Get('os_disk_dev')]

    def humanReadable(self):
        """Get sizes converted by humanreadableSize"""
        return [humanreadableSize(size) for size in self.Get()]
class VariableOsDiskName(DeviceHelper, ReadonlyVariable):
    """
    Label of partitions
    """
    type = "list"

    def get(self):
        """Get disk label: udev ID_FS_LABEL ('' if it is absent)"""
        labels = self.mapUdevProperty('os_disk_dev', 'ID_FS_LABEL', '')
        return labels
class VariableOsDiskOptions(ReadonlyVariable):
    """
    List mount options
    """
    type = "list"

    def get(self):
        """Get fstab options (fstab.OPTS) for each partition"""
        fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))
        return [fstab.getBy(what=fstab.OPTS, eq=device)
                for device in self.Get('os_disk_dev')]
################################################
# Bind mount points
################################################
class VariableOsBindData(ReadonlyTableVariable):
    """
    Table of bind mount points: source path and mount point columns
    """
    source = [
        'os_bind_path',
        'os_bind_mountpoint',
    ]
class VariableOsBindPath(ReadonlyVariable):
    """
    List source bind path
    """
    type = "list"

    def get(self):
        """Get fstab.NAME of all fstab entries with "bind" in options"""
        disks = self.Get('os_disk_dev')
        fstab = FStab('/etc/fstab', devs=disks)
        return fstab.getBy(what=fstab.NAME, where=fstab.OPTS,
                           contains="bind", allentry=True)
class VariableOsBindMountpoint(ReadonlyVariable):
    """
    Mountpoints for directories bind
    """
    type = "list"

    def get(self):
        """Get fstab.DIR of all fstab entries with "bind" in options"""
        disks = self.Get('os_disk_dev')
        fstab = FStab('/etc/fstab', devs=disks)
        return fstab.getBy(what=fstab.DIR, where=fstab.OPTS,
                           contains="bind", allentry=True)
######################################################################
# User-selected partition parameters
######################################################################
class LocationHelper(VariableInterface):
    """
    Location variable
    """

    def uncompatible(self):
        """
        Get the reason why the layout is unavailable
        (it is unavailable with autopartitioning), "" if it is available
        """
        if self.Get('cl_autopartition_set') != "on":
            return ""
        return _("The layout is not available with autopartitioning")
class VariableOsLocationBriefData(LocationHelper, TableVariable):
    """
    Layout table built from the os_install_* variables
    """
    source = ["os_location_source",
              "os_location_dest",
              "os_location_format",
              "os_location_perform_format",
              "os_location_size"]

    # groups of variables providing the columns;
    # '' is a column which has no variable in the group
    orig_source = [('os_install_disk_dev',
                    'os_install_disk_mount',
                    'os_install_disk_format',
                    'os_install_disk_perform_format',
                    'os_install_disk_size'),
                   ('os_install_bind_path',
                    'os_install_bind_mountpoint', '', '', '')]

    def init(self):
        self.label = _("Layout")

    def get(self, hr=HumanReadable.No):
        """
        Get table rows: for each group of orig_source its variables are
        joined column by column, missing cells are filled by ''

        Raises VariableError if some source variable is not a list.
        Returns [[]] if there are no rows.
        """
        for varname in chain(*self.orig_source):
            if not varname:
                continue
            if type(self.Get(varname)) != list:
                raise VariableError(
                    _("Source variable %s does not contain a list") % varname)
        rows = []
        for group in self.orig_source:
            columns = [self.Get(name, humanreadable=hr) if name else ''
                       for name in group]
            for row in izip_longest(*columns, fillvalue=''):
                rows.append(list(row))
        return rows or [[]]
class VariableOsLocationData(LocationHelper, TableVariable):
    """
    Select installation disk variable
    """
    opt = ["--disk", "-d"]
    metavalue = 'DISK[[:MP[:FS[:FORMAT]]]]'
    untrusted = True
    source = ["os_location_source",
              "os_location_dest",
              "os_location_format",
              "os_location_perform_format",
              "os_location_size"]

    check_after = ["os_install_root_type"]

    def init(self):
        diskHelp = _("DISK bound for installation will be mounted to the "
                     "MP directory. To create a bind mount point, you have "
                     "to specify the source directory as DISK")
        fsHelp = _("To change the filesystem, you have to specify it as FS. "
                   "FORMAT is used for the specifying the need to format "
                   "partition or not")
        self.help = (diskHelp + ". " + fsHelp)
        self.label = _("Layout")

    def set(self, value):
        """Sort rows by the first column (falsy rows are the key itself)"""
        def firstColumn(row):
            return row and row[0]

        return sorted(value, key=firstColumn)
class VariableOsLocationSource(LocationHelper, DeviceHelper, Variable):
    """
    Source disk or directory
    """
    type = "choiceedit-list"

    def init(self):
        self.label = _("Disk or directory")

    def availDevs(self, choice=False):
        """
        Available devices

        For flash install: partitions of flash drives.
        With choice=True: all partitions and bind paths.
        Otherwise partitions selected by their mount point in the
        current system (see the filters below).
        """
        if self.Get('cl_install_type') == 'flash':
            flashDrives = self.Select('os_device_dev',
                                      where='os_device_type',
                                      eq="flash")
            return self.Select('os_disk_dev',
                               where='os_disk_parent',
                               _in=flashDrives)
        if choice:
            return self.Get('os_disk_dev') + self.Get('os_bind_path')
        devFrom = self.Get('cl_install_dev_from')

        def swapOrInstallSource(row):
            return row[0] == "swap" or row[1] == devFrom

        def mountedOrInstallSource(row):
            return row[0] not in ("", "/") or row[1] == devFrom

        if self.Get('os_install_scratch') == 'on':
            return self.Select('os_disk_dev',
                               where='os_disk_mount',
                               func=swapOrInstallSource)
        return self.Select('os_disk_dev',
                           where='os_disk_mount',
                           func=mountedOrInstallSource) + \
               self.Get('os_bind_path')

    def get(self):
        """
        Get autopartition devices and bind paths if autopartitioning
        is on, else availDevs()
        """
        if self.Get('cl_autopartition_set') != "on":
            return self.availDevs()
        commonNames = [getCommonDeviceName(dev)
                       for dev in self.Get('cl_autopartition_disk_dev')]
        return commonNames + self.Get('cl_autopartition_bind_path')

    def set(self, value):
        """Normalize path of each non-empty str value"""
        result = []
        for val in value:
            if type(val) == str and val:
                result.append(path.normpath(val))
            else:
                result.append(val)
        return result

    def choice(self):
        """
        Get (value, title) pairs of available devices and the empty pair;
        title is getPerfectName or the value itself
        """
        devices = self.fixOsDiskDev(self.availDevs(choice=True))
        pairs = [(dev, self.getPerfectName(dev) or dev) for dev in devices]
        return pairs + [("", "")]

    def fixOsDiskDev(self, sourcelist=None):
        """
        Fix os_disk_dev by autopartitions

        With autopartitioning the partitions of cl_autopartition_device
        are removed from sourcelist (os_disk_dev if it is empty) and the
        autopartition devices and bind paths are appended.
        """
        if not sourcelist:
            sourcelist = self.Get('os_disk_dev')
        scheme = self.Get('cl_autopartition_set') == "on"
        if not scheme:
            return sourcelist
        excludeDisks = self.Select('os_disk_dev',
                                   where='os_disk_parent',
                                   eq=self.Get('cl_autopartition_device'))
        appendDisks = self.Get('cl_autopartition_disk_dev') + \
                      self.Get('cl_autopartition_bind_path')
        kept = [dev for dev in sourcelist if dev not in excludeDisks]
        return kept + appendDisks

    def check(self, value):
        """
        Check set location source

        Raises VariableError for an empty value, unknown devices,
        sources which do not start with "/" and duplicated devices.
        """
        ################################
        # check of device specifing
        ################################
        if not value:
            raise VariableError(
                _("To install the system, you need to specify the root device"))
        ###########################
        # check wrong dev
        ###########################
        disks = [src for src in value if src.startswith('/dev/')]
        # get original /dev names
        cnDisks = [getUdevDeviceInfo(name=dev).get('DEVNAME', dev)
                   for dev in disks]
        wrongDevices = list(set(cnDisks) - set(self.fixOsDiskDev()))
        if wrongDevices:
            raise VariableError(_("Wrong device '%s'") % wrongDevices[0])
        wrongSource = [src for src in value
                       if src and not src.startswith('/')]
        if wrongSource:
            raise VariableError(
                _("Wrong bind mount point '%s'") % wrongSource[0])
        ##########################
        # detect duplicate devices
        ##########################
        dupDevices = list(set([dev for dev in disks
                               if disks.count(dev) > 1]))
        if dupDevices:
            raise VariableError(
                _("Device '%s' is used more than once") % dupDevices[0])
class VariableOsLocationDest(LocationHelper, Variable):
    """
    Destination directory of install disk data
    """
    type = "choiceedit-list"

    def init(self):
        self.label = _("Mount point")

    def get(self):
        """
        Get default mount points

        With autopartitioning: autopartition mount points and bind
        mount points. Otherwise the mount points from the current system
        for entries listed in os_location_source, where for the "system"
        action the root is moved to the installation target.
        """
        if self.Get('cl_autopartition_set') == "on":
            return self.Get('cl_autopartition_disk_mount') + \
                   self.Get('cl_autopartition_bind_mountpoint')
        source = self.Get('os_location_source')
        installFrom = self.Get('cl_install_dev_from')
        singleDevice = self.Get('os_install_disk_single')

        def installMountPoint(info):
            dev, mount = info
            if self.Get('cl_action') != 'system':
                return mount
            if self.Get('cl_install_type') == 'flash':
                return "/" if dev == singleDevice else ""
            if dev == installFrom:
                return "/"
            if mount == "/":
                return ""
            return mount

        candidates = zip(self.Get('os_disk_dev'),
                         self.Get('os_disk_mount')) + \
                     zip(self.Get('os_bind_path'),
                         self.Get('os_bind_mountpoint'))
        return [installMountPoint(pair) for pair in candidates
                if pair[0] in source]

    def set(self, value):
        """
        Add ability not to specify root

        "efi"/"uefi" (any case) becomes "/boot/efi", non-empty str values
        are normalized as paths, empty values become "/".
        """
        result = []
        for val in value:
            if val.lower() in ("efi", "uefi"):
                val = "/boot/efi"
            if type(val) == str and val:
                val = path.normpath(val)
            result.append(val or "/")
        return result

    def choice(self):
        """Get mount points offered for selection"""
        if self.Get('cl_install_type') == 'flash':
            return ["/", ""]
        return ['/', '/boot', '/var/calculate', '/home',
                '/usr', '/var', '/tmp', 'swap', '/boot/efi', '']

    def check(self, value):
        """
        Check mount points selected for installation

        Raises VariableError on the first failed check; the checks are
        performed in the order in which they are written below.
        """
        ################################
        # check size for root device
        ################################
        osInstallRootType = self.Get('os_install_root_type')
        if osInstallRootType != "flash" and "/usr" not in value:
            minimalRoot = 7 * 1024 * 1024 * 1024
            for mp, size in izip(value, self.Get("os_location_size")):
                if mp == '/' and size.isdigit() and int(size) < minimalRoot:
                    raise VariableError(
                        _("The root partition should be at least %s") % "7 Gb")
        source = self.Get("os_location_source")
        ################################
        # check of root device specifing
        ################################
        if not source:
            return
        if not any(mp == "/" for mp in value):
            raise VariableError(_("To install the system, you need to "
                                  "specify the root device"))
        ################################
        # entries which have a mount point and are not devices are binds
        binds = [pair for pair in zip(source, value)
                 if not pair[0].startswith('/dev/') and pair[1]]
        ##########################
        # detect duplicate mps
        ##########################
        usedMP = [mp for mp in value if mp and mp != "swap"]
        dupMP = list(set([mp for mp in usedMP if value.count(mp) > 1]))
        if dupMP:
            raise VariableError(
                _("Mount point '%s' is used more than once") % dupMP[0])
        #########################
        # detect wrong bind
        #########################
        wrongBind = [pair for pair in binds
                     if not pair[0].startswith("/") or
                     not pair[1].startswith("/")]
        if wrongBind:
            raise VariableError(
                _("Incorrect mount point (bind '%(bindSrc)s' to "
                  "'%(bindDst)s')") \
                % {'bindSrc': wrongBind[0][0],
                   'bindDst': wrongBind[0][1]})
        #########################################
        # Check '/' in start path of dest points
        #########################################
        wrongMP = [mp for mp in value
                   if mp and not mp.startswith("/") and mp != "swap"]
        if wrongMP:
            raise VariableError(_("Wrong mount point '%s'") % wrongMP[0])
        #########################################
        # Check using current root
        #########################################
        rootDev = self.Get('os_root_dev')
        if rootDev in self.Get('os_install_disk_dev'):
            raise VariableError(
                _("You may not use the current root partition %s for "
                  "installation") % rootDev)
        #################################
        # detect using extended partition
        #################################
        extendedPartitions = self.Select('os_install_disk_dev',
                                         where='os_install_disk_part',
                                         eq='extended', limit=1)
        if extendedPartitions:
            raise VariableError(
                _("Unable to use extended partition %s for installation") %
                extendedPartitions)
        ##########################
        # detect using CDROM disks
        ##########################
        cdromPartitions = self.Select('os_install_disk_dev',
                                      where='os_install_disk_type',
                                      like='cdrom', limit=1)
        if cdromPartitions:
            raise VariableError(_("Unable to use CDROM %s for installation") %
                                cdromPartitions)
        #############################
        # detect right part for uefi#
        #############################
        efiDisks = [getCommonDeviceName(pair[0])
                    for pair in zip(source, value)
                    if pair[1] == '/boot/efi']
        wrongPart = self.Select('os_install_disk_dev',
                                where='os_install_disk_type',
                                ne='disk-partition')
        if set(efiDisks) & set(wrongPart):
            raise VariableError(_("UEFI partition must be a disk partition"))
        ###############################
        # check cross bind mount points
        ###############################
        DEVICE, MP = 0, 1
        srcMountPoints = [bind[DEVICE] for bind in binds]
        destMountPoints = [bind[MP] for bind in binds]
        wrongBind = [src for src in srcMountPoints
                     if src in destMountPoints]
        if wrongBind:
            incompBind = [pair
                          for pair in zip(srcMountPoints, destMountPoints)
                          if pair[1] == wrongBind[0]]
            raise VariableError(
                _("Source directory %(src)s is already used "
                  "for binding '%(bindSrc)s' to '%(bindDst)s'") \
                % {'src': wrongBind[0],
                   'bindSrc': incompBind[0][0],
                   'bindDst': incompBind[0][1]})
        #######################################
        # check multipart for flash and builder
        #######################################
        osInstallRootType = self.Get('os_install_root_type')
        if osInstallRootType == "flash":
            if any(mp and mp != '/' for mp in value):
                raise VariableError(
                    _("Flash install does not support multipartition mode"))
            if any(mp == "swap" for mp in value):
                raise VariableError(
                    _("Flash install does not support swap disks"))
        ########################################
        # check install on member of RAID or LVM
        ########################################
        installTypes = zip(self.Get('os_install_disk_dev'),
                           self.Get('os_install_disk_type'))
        for checkType in ("raid", "lvm"):
            memberMark = checkType + "member"
            memberData = [info for info in installTypes
                          if memberMark in info[1]]
            if memberData:
                raise VariableError(
                    _(
                        "Unable to use {part} partition used by active {typepart} for installation").format(
                        typepart=checkType.upper(),
                        part=memberData[0][0]))
class VariableOsLocationFormat(LocationHelper, Variable):
    """
    Filesystem of each layout entry
    """
    type = "choice-list"

    def init(self):
        self.label = _("Filesystem")

    def get(self):
        """
        Get default filesystems

        With autopartitioning: autopartition formats and "" for each
        bind path. Otherwise defaultFormat() is applied to each pair
        of source and mount point.
        """
        if self.Get('cl_autopartition_set') == "on":
            bindFormats = ["" for bind in
                           self.Get('cl_autopartition_bind_path')]
            return self.Get('cl_autopartition_disk_format') + bindFormats
        mount = self.Get("os_location_dest")
        source = self.Get("os_location_source")
        value = [""] * len(source)
        chooseFormat = self.defaultFormat()
        return [chooseFormat(info) for info in zip(source, mount, value)]

    def choice(self):
        """Get filesystems offered for selection"""
        if self.Get('cl_install_type') == "flash":
            return ["", "vfat"]
        return [""] + self.Get('os_format_type')

    def defaultFormat(self):
        """
        Describe default value for filesystem

        Returns a function which takes (device, mount point, filesystem)
        and returns the filesystem; the default is selected only for
        a /dev/ device which has a mount point and no filesystem.
        """
        diskFormat = dict(zip(self.Get('os_disk_dev'),
                              (self.Get('os_disk_format'))))
        osInstallRootType = self.Get('os_install_root_type')
        availFS = set(self.Select('os_format_type',
                                  where='os_format_use',
                                  eq='yes'))

        def wrap(info):
            dev, mount, fs = info
            if not mount or fs or not dev.startswith('/dev/'):
                return fs
            if mount == "swap":
                return "swap"
            if mount == '/boot/efi':
                return "vfat"
            # keep the current filesystem if it is available; for mount
            # points with a single "/" it also has to pass the check
            if dev in diskFormat and diskFormat[dev] in availFS:
                currentFS = diskFormat[dev]
                if mount.count('/') != 1:
                    return currentFS
                if FileSystemManager.checkFSForTypeMount(
                        currentFS, osInstallRootType, mount):
                    return currentFS
            return FileSystemManager.defaultFS.get(osInstallRootType,
                                                   "ext4")

        return wrap

    def set(self, value):
        """Replace "uefi" by "vfat" and fill empty values by defaults"""
        value = ["vfat" if fs == "uefi" else fs for fs in value]
        mount = self.Get("os_location_dest")
        source = self.Get("os_location_source")
        chooseFormat = self.defaultFormat()
        return [chooseFormat(info) for info in zip(source, mount, value)]

    def check(self, value):
        """
        Check selected filesystems

        Raises VariableError if a bind mount point has a filesystem,
        the filesystem does not pass checkFSForTypeMount for the mount
        point or a swap entry is not formatted as swap.
        """
        osInstallRootType = self.Get('os_install_root_type')
        devMpFs = zip(self.Get('os_location_source'),
                      self.Get('os_location_dest'), value)
        for dev, mp, fs in devMpFs:
            isDevice = dev.startswith('/dev/')
            if dev.startswith('/') and not isDevice and fs:
                raise VariableError(
                    _("The bind mount point does not use filesystem"))
            # check compatible fs for mount point only root dirs
            checkedMP = mp and (mp.count('/') == 1 or
                                mp in ('/var/calculate', '/boot/efi'))
            if isDevice and checkedMP:
                if not FileSystemManager.checkFSForTypeMount(
                        fs, osInstallRootType, mp):
                    raise VariableError(
                        _("The filesystem for '%(mp)s' should not be '%(opt)s'")
                        % {'mp': mp, 'opt': fs} + " " +
                        _("for {typedisk} install").format(
                            typedisk=osInstallRootType))
            if mp == "swap" and fs != "swap":
                raise VariableError(
                    _(
                        "The swap partition {dev} must be formatted as swap").format(
                        dev=dev))
class VariableOsLocationPerformFormat(LocationHelper, Variable):
    """
    Format flag ("on", "off" or empty) for every location row
    """
    type = "boolauto-list"

    def init(self):
        self.label = _("Format")

    def get(self):
        """
        Build the default flags.

        With autopartitioning every autopartition disk is flagged "on" and
        every autopartition bind path gets an empty flag; otherwise each
        location row is resolved by the defaultPerformFormat() callable.
        """
        if self.Get('cl_autopartition_set') != "on":
            destinations = self.Get("os_location_dest")
            sources = self.Get("os_location_source")
            filesystems = self.Get("os_location_format")
            blanks = [""] * len(sources)
            decide = self.defaultPerformFormat()
            return [decide(row) for row in
                    zip(sources, destinations, filesystems, blanks)]
        disk_flags = ["on" for item in
                      self.Get('cl_autopartition_disk_format')]
        bind_flags = ["" for item in
                      self.Get('cl_autopartition_bind_path')]
        return disk_flags + bind_flags

    def fixNtfs(self, x):
        """Treat the 'ntfs-3g' name as plain 'ntfs'"""
        return {'ntfs-3g': 'ntfs'}.get(x, x)

    def check(self, value):
        """
        Validate the format flags against the selected layout.

        Raises VariableError when a partition that has to be formatted is
        mounted or not flagged for formatting, and when formatting is
        requested for an unused device, an unavailable filesystem, a bind
        mount or a mounted device.
        """
        rows = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_format'),
                   value)
        known_fs = dict(zip(self.Get('os_disk_dev'),
                            self.Get('os_disk_format')))
        mounted_at = dict(zip(self.Get('os_disk_dev'),
                              self.Get('os_disk_mount')))
        disabled_fs = set(self.Select('os_format_type',
                                      where='os_format_use',
                                      eq="no"))
        normalize = self.fixNtfs
        for dev, mp, fs, isformat in rows:
            # formatting is mandatory when the filesystem changes or the
            # partition becomes root of a non-flash installation
            must_format = (
                normalize(known_fs.get(dev, fs)) != normalize(fs) or
                (mp == '/' and
                 self.Get('os_install_root_type') != 'flash'))
            if mp and must_format:
                current_mp = mounted_at.get(dev, '')
                # the partition is in use by the running system
                if current_mp:
                    raise VariableError(
                        _("{device} must but cannot be formatted, as it is "
                          "mounted to {mountpoint} on the current system"
                          ).format(device=dev, mountpoint=current_mp))
                if isMount(dev):
                    raise VariableError(
                        _("Please unmount {device}, as it will be used for "
                          "installation").format(device=dev))
                # formatting is required but was not requested
                if not self.isTrue(isformat):
                    raise VariableError(
                        _("{device} must be formatted").format(
                            device=dev))
            if not self.isTrue(isformat):
                continue
            if not mp:
                raise VariableError(
                    _("No need to format unused device {dev}").format(
                        dev=dev))
            if fs in disabled_fs:
                raise VariableError(
                    _("Filesystem '%s' is not available") % fs)
            if not dev.startswith('/dev/'):
                raise VariableError(
                    _("Bind mount points should not be formatted"))
            if mounted_at.get(dev, "") and isformat:
                raise VariableError(
                    _("{device} must but cannot be formatted, as it is "
                      "mounted to {mountpoint} on the current system"
                      ).format(device=dev,
                               mountpoint=mounted_at.get(dev, '')))
            if isMount(dev):
                raise VariableError(
                    _("Please unmount disk {device} to "
                      "use it for install").format(device=dev))

    def defaultPerformFormat(self):
        """
        Return a callable resolving the flag of one
        (source, dest, fs, isformat) row
        """
        known_fs = dict(zip(self.Get('os_disk_dev'),
                            self.Get('os_disk_format')))

        def wrap(info):
            source, dest, fs, isformat = info
            normalize = self.fixNtfs
            # an explicit flag always wins
            if isformat:
                return isformat
            if not source.startswith('/dev/'):
                return ""
            if dest == '/':
                return "on"
            if dest and normalize(known_fs.get(source, fs)) != normalize(fs):
                return "on"
            return "off"

        return wrap

    def set(self, value):
        """Default values for perform format"""
        value = Variable.set(self, value)
        rows = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_format'),
                   value)
        decide = self.defaultPerformFormat()
        flags = []
        for dev, mp, fs, flag in rows:
            # "off" makes no sense for a non-device source: reset it
            if flag == "off" and not dev.startswith("/dev/"):
                flag = ""
            flags.append(decide((dev, mp, fs, flag)))
        return flags
class VariableOsLocationSize(LocationHelper, SourceReadonlyVariable):
    """
    Size of every location, looked up by its source device
    """
    type = "list"
    indexField = "os_location_source"

    def init(self):
        self.label = _("Size")

    def getMap(self):
        """Map device -> size; autopartition entries override disk ones"""
        sizes = dict(self.ZipVars('os_disk_dev', 'os_disk_size'))
        auto_devs = self.Get('cl_autopartition_disk_dev')
        auto_sizes = self.Get('cl_autopartition_disk_size')
        sizes.update(zip(auto_devs, auto_sizes))
        return sizes

    def get(self):
        lookup = self.getMap().get
        sources = self.Get(self.indexField)
        return [lookup(getCommonDeviceName(src)) or "" for src in sources]

    def getMapHumanReadable(self):
        """Same as getMap(), but with human readable size values"""
        disk_devs = self.Get('os_disk_dev')
        disk_sizes = self.Get('os_disk_size', humanreadable=True)
        sizes = dict(zip(disk_devs, disk_sizes))
        auto_devs = self.Get('cl_autopartition_disk_dev')
        auto_sizes = self.Get('cl_autopartition_disk_size',
                              humanreadable=True)
        sizes.update(zip(auto_devs, auto_sizes))
        return sizes

    def humanReadable(self):
        lookup = self.getMapHumanReadable().get
        sources = self.Get(self.indexField)
        return [lookup(getCommonDeviceName(src)) or "" for src in sources]
class VariableClUuidSet(Variable):
    """
    Whether UUID should be used in /etc/fstab
    """
    type = "bool"
    opt = ["--uuid"]
    value = "on"

    def init(self):
        self.label = _("Use UUID")
        self.help = _("use UUID")

    def uncompatible(self):
        """
        Return the reason the option is unavailable (Flash install),
        or None when it can be used
        """
        if self.Get('os_install_root_type') != 'flash':
            return None
        return _("Impossible to use UUID for Flash install")
#############################################################
# Install disk parameters
#############################################################
class VariableOsInstallDiskData(ReadonlyTableVariable):
    """
    Table gathering the per-disk install variables
    """
    source = [
        "os_install_disk_dev",
        "os_install_disk_mount",
        "os_install_disk_format",
        "os_install_disk_perform_format",
        "os_install_disk_options",
        "os_install_disk_id",
        "os_install_disk_uuid",
        "os_install_disk_use",
        "os_install_disk_name",
        "os_install_disk_size",
        "os_install_disk_type",
        "os_install_disk_part",
        "os_install_disk_parent",
    ]
class VariableOsInstallDiskEfi(ReadonlyVariable):
    """
    EF00 partitions on an acceptable parent device that are not already
    used as a location source
    """
    type = "list"

    def get(self):
        if self.Get('os_install_root_type') == 'usb-hdd':
            # only the device that holds the future root is acceptable
            allowed_parents = self.Select(
                'os_install_disk_parent_base',
                where='os_install_disk_mount_base', eq='/')
        else:
            allowed_parents = self.Select(
                'os_device_dev', where='os_device_type', eq='hdd')

        def is_usable_efi(row):
            # row[0] - partition id, row[1] - parent device
            return row[0] == 'EF00' and row[1] in allowed_parents

        candidates = self.Select('os_disk_dev',
                                 where=['os_disk_id', 'os_disk_parent'],
                                 func=is_usable_efi)
        already_used = self.Get('os_location_source')
        return list(set(candidates) - set(already_used))
class VariableOsAddonDiskDev(DeviceHelper, ReadonlyVariable):
    """
    EFI partition added implicitly when UEFI is on and no location is
    mounted to /boot/efi
    """
    type = "list"

    def get(self):
        if self.Get('os_install_uefi_set') != 'on':
            return []
        if "/boot/efi" in self.Get('os_location_dest'):
            return []
        efi_partitions = self.Get('os_install_disk_efi')
        if not efi_partitions:
            return []
        # only the first detected partition is used
        return efi_partitions[:1]
class VariableOsAddonDiskMount(DeviceHelper, ReadonlyVariable):
    """
    Mount point of the implicitly added EFI partition
    """
    type = "list"

    def get(self):
        if self.Get('os_install_uefi_set') != 'on':
            return []
        if "/boot/efi" in self.Get('os_location_dest'):
            return []
        if not self.Get('os_install_disk_efi'):
            return []
        return ['/boot/efi']
class VariableOsAddonDiskFormat(DeviceHelper, ReadonlyVariable):
    """
    Filesystem of the implicitly added EFI partition
    """
    type = "list"

    def get(self):
        if self.Get('os_install_uefi_set') != 'on':
            return []
        if "/boot/efi" in self.Get('os_location_dest'):
            return []
        if not self.Get('os_install_disk_efi'):
            return []
        return ['vfat']
class VariableOsAddonDiskPerformFormat(DeviceHelper, ReadonlyVariable):
    """
    Format flag of the implicitly added EFI partition
    """
    type = "list"

    def get(self):
        if self.Get('os_install_uefi_set') != 'on':
            return []
        if "/boot/efi" in self.Get('os_location_dest'):
            return []
        efi_partitions = self.Get('os_install_disk_efi')
        if not efi_partitions:
            return []
        current_fs = self.Select('os_disk_format', where='os_disk_dev',
                                 eq=efi_partitions[0], limit=1)
        # formatting is needed unless the partition is vfat already
        return ['off'] if current_fs == "vfat" else ['on']
class VariableOsInstallDiskDevBase(DeviceHelper, ReadonlyVariable):
    """
    Install devices without the addon ones
    (variable used to resolve cyclic dependencies)
    """
    type = "list"

    def get(self):
        if self.Get('cl_install_type') == 'flash':
            single = self.Get('os_install_disk_single')
            return [getCommonDeviceName(single)] if single else []
        pairs = zip(self.Get('os_location_source'),
                    self.Get('os_location_dest'))
        # keep real devices that have a mount point assigned
        return [getCommonDeviceName(source)
                for source, dest in pairs
                if source.startswith('/dev/') and dest]
class VariableOsInstallDiskParentBase(SourceReadonlyVariable):
    """
    Parent devices of the base install partitions
    """
    type = "list"
    indexField = "os_install_disk_dev_base"

    def getMap(self):
        """Map partition -> parent device"""
        parents = dict(self.ZipVars('os_disk_dev', 'os_disk_parent'))
        if self.Get('cl_autopartition_set') != 'on':
            return parents
        # autopartition entries take the autopartition device as parent
        auto_devs = self.Get('cl_autopartition_disk_dev')
        auto_parents = (self.Get('cl_autopartition_device') *
                        len(self.Get('cl_autopartition_disk_dev')))
        parents.update(zip(auto_devs, auto_parents))
        return parents

    humanReadable = Variable.humanReadable
class VariableOsInstallDiskDev(ReadonlyVariable, DeviceHelper):
    """
    Devices used for installation: addon devices followed by base ones
    """
    type = "list"

    def get(self):
        addon = self.Get('os_addon_disk_dev')
        base = self.Get('os_install_disk_dev_base')
        return addon + base

    def humanReadable(self):
        return [self.getPerfectName(dev, defaultValue=dev)
                for dev in self.Get()]
class VariableOsInstallDiskUuid(ReadonlyVariable):
    """
    UUID of every install device (empty when unknown)
    """
    type = "list"

    def get(self):
        devices = self.Get('os_install_disk_dev')
        uuid_by_dev = getUUIDDict(revers=True)
        # the first five characters of the stored value are cut off
        return [uuid_by_dev.get(dev, "")[5:] for dev in devices]
class VariableOsInstallDiskMountBase(ReadonlyVariable):
    """
    Mount points of the base install devices
    (variable used to resolve cyclic dependencies of the UEFI variables)
    """
    type = "list"

    def get(self):
        if self.Get('cl_install_type') == 'flash':
            single = self.Get('os_install_disk_single')
            return ["/"] if single else []
        pairs = zip(self.Get('os_location_source'),
                    self.Get('os_location_dest'))
        return [dest for source, dest in pairs
                if source.startswith('/dev/') and dest]
class VariableOsInstallDiskMount(ReadonlyVariable):
    """
    Mount points of the installed system: addon ones followed by base ones
    """
    type = "list"

    def get(self):
        """Get install disk dest"""
        addon = self.Get('os_addon_disk_mount')
        base = self.Get('os_install_disk_mount_base')
        return addon + base
class VariableOsInstallDiskUse(ReadonlyVariable):
    """
    Device identifiers: "UUID=..." or the device name (by cl_uuid_set)
    """
    type = "list"

    def get(self):
        """Get real id (by cl_uuid_set) device"""
        if self.Get('cl_uuid_set') != "on":
            return self.Get('os_install_disk_dev')
        pairs = zip(self.Get('os_install_disk_uuid'),
                    self.Get('os_install_disk_dev'))
        # fall back to the device name when the UUID is unknown
        return ["UUID=%s" % uuid if uuid else dev for uuid, dev in pairs]
class VariableOsInstallDiskOptions(ReadonlyVariable):
    """
    Mount options for every install device
    """
    type = "list"

    def get(self):
        def drop_empty(opt):
            # remove the empty items of a comma separated list
            return ",".join([item for item in opt.split(',') if item])

        # options already known for a device (empty values are ignored)
        known_opts = dict(
            [(dev, opts) for dev, opts in zip(self.Get('os_disk_dev'),
                                              self.Get('os_disk_options'))
             if opts])
        ssd_flags = dict(self.ZipVars('install.os_device_dev',
                                      'install.os_device_ssd_set'))
        result = []
        for dev, fs, parent in self.ZipVars('os_install_disk_dev',
                                            'os_install_disk_format',
                                            'os_install_disk_parent'):
            on_ssd = ssd_flags.get(parent, 'off') == 'on'
            default_opts = FileSystemManager.getDefaultOpt(fs, ssd=on_ssd)
            result.append(drop_empty(known_opts.get(dev, default_opts)))
        return result
class VariableOsInstallDiskFormat(ReadonlyVariable):
    """
    Filesystems of the install devices: addon ones followed by those of
    the mounted /dev/ locations
    """
    type = "choice-list"

    def get(self):
        rows = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_format'))
        location_fs = [fs for source, dest, fs in rows
                       if source.startswith('/dev/') and dest]
        return self.Get('os_addon_disk_format') + location_fs
class VariableOsInstallDiskPerformFormat(ReadonlyVariable):
    """
    Format flags of the install devices: addon ones followed by those of
    the mounted /dev/ locations
    """
    type = "bool-list"

    def get(self):
        rows = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_perform_format'))
        location_flags = [flag for source, dest, flag in rows
                          if source.startswith('/dev/') and dest]
        return self.Get('os_addon_disk_perform_format') + location_flags
class VariableOsInstallDiskId(ReadonlyVariable):
    """
    Partition system id for every install device
    """
    type = "list"

    def get(self):
        current_ids = dict(zip(self.Get('os_disk_dev'),
                               self.Get('os_disk_id')))
        # autopartition devices have no current id
        auto_devs = self.Get('cl_autopartition_disk_dev')
        blanks = [""] * len(self.Get('cl_autopartition_disk_dev'))
        current_ids.update(zip(auto_devs, blanks))

        def id_for(fs, parttable, oldid, mp):
            if parttable == "gpt":
                # the EFI mount point is looked up under the 'uefi' key
                fs_key = 'uefi' if mp == '/boot/efi' else fs
                fs_info = FileSystemManager.supportFS.get(fs_key, {})
                return fs_info.get('gpt', '8300')
            if parttable in ("primary", "extended", "logical"):
                fs_info = FileSystemManager.supportFS.get(fs, {})
                return fs_info.get('msdos', '83')
            return oldid

        ids = []
        for fs, part, dev, mount, parent in self.ZipVars(
                'os_install_disk_format',
                'os_install_disk_part',
                'os_install_disk_dev',
                'os_install_disk_mount',
                'os_install_disk_parent'):
            if parent:
                ids.append(id_for(fs, part, current_ids.get(dev, ''),
                                  mount))
            else:
                ids.append('')
        return ids
class VariableOsInstallDiskName(Variable):
    """
    Labels for the install devices
    """
    type = "list"

    def get(self):
        current_labels = dict(self.ZipVars('os_disk_dev', 'os_disk_name'))
        labels = []
        for dev, mount in self.ZipVars('os_install_disk_dev',
                                       'os_install_disk_mount'):
            if mount == '/':
                # root is labelled "<shortname>-<version>"
                labels.append(
                    "%s-%s" % (self.Get('os_install_linux_shortname'),
                               self.Get('os_install_linux_ver')))
            else:
                labels.append(current_labels.get(dev, ''))
        return labels
class VariableOsInstallDiskSize(SourceReadonlyVariable):
    """
    Sizes of the install devices
    """
    type = "list"
    indexField = 'os_install_disk_dev'

    def getMap(self):
        """Map device -> size; location sizes override disk sizes"""
        sizes = dict(self.ZipVars('os_disk_dev', 'os_disk_size'))
        location_sizes = dict(self.ZipVars('os_location_source',
                                           'os_location_size'))
        sizes.update(location_sizes)
        return sizes

    def getMapHumanReadable(self):
        """Map common device name -> human readable size"""
        disk_names = map(getCommonDeviceName, self.Get('os_disk_dev'))
        disk_sizes = self.Get('os_disk_size', humanreadable=True)
        sizes = dict(zip(disk_names, disk_sizes))
        location_names = map(getCommonDeviceName,
                             self.Get('os_location_source'))
        location_sizes = self.Get('os_location_size', humanreadable=True)
        sizes.update(dict(zip(location_names, location_sizes)))
        return sizes
class VariableOsInstallDiskType(SourceReadonlyVariable):
    """
    Partition scheme of the install devices
    """
    type = "list"
    indexField = "os_install_disk_dev"

    def getMap(self):
        """Map device -> type; autopartition entries override disk ones"""
        types = dict(self.ZipVars('os_disk_dev', 'os_disk_type'))
        auto_types = self.ZipVars('cl_autopartition_disk_dev',
                                  'cl_autopartition_disk_type')
        types.update(auto_types)
        return types

    humanReadable = Variable.humanReadable
class VariableOsInstallDiskParent(SourceReadonlyVariable):
    """
    Parent devices of the install partitions
    """
    type = "list"
    indexField = "os_install_disk_dev"

    def getMap(self):
        """Map partition -> parent device"""
        parents = dict(self.ZipVars('os_disk_dev', 'os_disk_parent'))
        if self.Get('cl_autopartition_set') != 'on':
            return parents
        # autopartition entries take the autopartition device as parent
        auto_devs = self.Get('cl_autopartition_disk_dev')
        auto_parents = (self.Get('cl_autopartition_device') *
                        len(self.Get('cl_autopartition_disk_dev')))
        parents.update(zip(auto_devs, auto_parents))
        return parents

    humanReadable = Variable.humanReadable
class VariableOsInstallDiskPart(SourceReadonlyVariable):
    """
    Partition kind of the install devices
    """
    type = "list"
    indexField = "os_install_disk_dev"

    def getMap(self):
        """Map device -> part; autopartition entries override disk ones"""
        parts = dict(self.ZipVars('os_disk_dev', 'os_disk_part'))
        auto_parts = self.ZipVars('cl_autopartition_disk_dev',
                                  'cl_autopartition_disk_part')
        parts.update(auto_parts)
        return parts

    humanReadable = Variable.humanReadable
class VariableOsInstallBindData(ReadonlyTableVariable):
    """
    Table gathering the install bind mount variables
    """
    source = [
        'os_install_bind_path',
        'os_install_bind_mountpoint',
    ]
class VariableOsInstallBindPath(ReadonlyVariable):
    """
    Source directories of the install bind mounts
    """
    type = "list"

    def get(self):
        """Get install bind source"""

        def is_bind(row):
            # row[0] is the where-field value, row[1] the selected one
            return not row[1].startswith('/dev/') and row[0]

        return self.Select('os_location_source',
                           where='os_location_dest',
                           func=is_bind)
class VariableOsInstallBindMountpoint(ReadonlyVariable):
    """
    Mount points of the install bind mounts
    """
    # NOTE(review): unlike VariableOsInstallBindPath, no `type = "list"`
    # is declared here - confirm whether this is intentional

    def get(self):
        def is_bind(row):
            # row[0] is the where-field value, row[1] the selected one
            return not row[0].startswith('/dev/') and row[1]

        return self.Select('os_location_dest',
                           where='os_location_source',
                           func=is_bind)
class VariableOsInstallBootloader(ReadonlyVariable):
    """
    Bootloader shown in the brief information: "uefi" or the
    comma-joined boot disks
    """

    def init(self):
        self.label = _("Bootloader")

    def get(self):
        if self.Get('os_install_uefi_set') != 'on':
            return ",".join(self.Get('os_install_mbr'))
        return "uefi"

    def humanReadable(self):
        if self.Get('os_install_uefi_set') != 'on':
            return ",".join(self.Get('os_install_mbr', humanreadable=True))
        return _("UEFI")
class VariableOsInstallMbr(LocationHelper, Variable):
    """
    Disks for boot mbr
    """
    type = "choice-list"
    element = "selecttable"
    opt = ["--mbr"]
    metavalue = "MBR"
    untrusted = True
    check_after = ["os_install_uefi_set"]

    def init(self):
        self.label = _("Boot disk")
        self.help = _("boot disk for the system bound for install")

    def get(self):
        """
        Get default Master boot record install

        Returns a list of devices (empty for UEFI). The candidates are
        tried in order: the autopartition device (autopartition with LVM),
        the device holding root (flash / usb-hdd), the physical disks
        behind an LVM or RAID boot device, the parent of /boot or / when
        running from a livecd, and finally the first 'hdd' device.
        """
        if self.Get('os_install_uefi_set') == 'on':
            return []
        if self.Get('cl_autopartition_set') == 'on' and \
                self.Get('cl_autopartition_lvm_set') == 'on':
            return self.Get('cl_autopartition_device')
        if self.Get('os_install_root_type') in ("flash", "usb-hdd"):
            rootdev = self.Get('os_install_root_dev')
            # devices whose name is contained in the root partition name
            device = [x for x in self.Get('os_device_dev') if x in rootdev]
            if device:
                return [device[0]]
            else:
                return []
        bootDev = self.Select('os_install_disk_dev',
                              where='os_install_disk_mount',
                              _in=('/', '/boot'), sort="DESC", limit=1)
        # if install on lvm detected
        if self.Get('os_install_lvm_set') == "on":
            if bootDev:
                lvmDisk = self.Select('os_disk_lvm',
                                      where='os_disk_dev',
                                      eq=bootDev, limit=1)
                if lvmDisk:
                    # the first five characters are dropped, the rest is
                    # split into "<vg>/<lv>"
                    vgName, lvName = lvmDisk[5:].split('/')
                    disks = self.Select('os_lvm_pvname',
                                        where='os_lvm_vgname',
                                        eq=vgName)
                    devices = self.Select('os_disk_parent',
                                          where='os_disk_dev',
                                          _in=disks)
                    # keep only the devices that have a partition table
                    res = [dev for dev, table in
                           self.ZipVars('os_device_dev', 'os_device_table')
                           if dev in devices and table]
                    if res:
                        return res
        if self.Get('os_install_mdadm_set') == "on":
            if bootDev:
                disks = self.Select('os_disk_parent',
                                    where='os_disk_type',
                                    like=r'raidmember\(%s\)' % bootDev)
                res = [dev for dev, table in
                       self.ZipVars('os_device_dev', 'os_device_table')
                       if dev in disks and table]
                if res:
                    return res
        # if loaded system livecd
        if self.Get('os_root_type') == "livecd":
            bootDevice = self.Select('os_install_disk_parent',
                                     where='os_install_disk_mount',
                                     _in=('/', '/boot'),
                                     sort="DESC")[:1]
            # search /boot device or / device, by priority /boot,/
            if bootDevice:
                return bootDevice
        return [self.Select('os_device_dev', where='os_device_type', eq='hdd',
                            limit=1)]

    def choice(self):
        """Return (device, "device (name)") pairs for every known device"""
        deviceParentMap = dict(self.ZipVars('os_device_dev', 'os_device_name'))
        return [(x, "%s (%s)" % (x, deviceParentMap.get(x, _("Unknown"))))
                for x in self.Get('os_device_dev')]

    def set(self, value):
        """Drop the "off" items from the value (support off value)"""
        return [x for x in value if x != "off"]

    def check(self, value):
        """
        Validate the selected boot disks.

        Raises VariableError when more than one disk is given for a Flash
        install, when a disk has no partition table or an unsuitable one,
        when a GPT boot disk lacks a (big enough) BIOS Boot partition, or
        when the bootloader specific checks fail. Nothing beyond the Flash
        disk count is checked when UEFI is used.
        """
        rootType = self.Get('os_install_root_type')
        if rootType == "flash":
            if len(value) > 1:
                raise VariableError(
                    _("For Flash install, you need only one disk"))
        if self.Get('os_install_uefi_set') == 'on':
            return
        useBtrfs = "btrfs" in self.Select('os_install_disk_format',
                                          where='os_install_disk_mount',
                                          _in=('/', '/boot'),
                                          sort="DESC")[:1]
        for mbrDisk in value:
            if self.Get('cl_autopartition_set') == 'on':
                tableOnBootDisk = self.Get('cl_autopartition_table')
            else:
                tableOnBootDisk = self.Select('os_device_table',
                                              where="os_device_dev",
                                              eq=mbrDisk,
                                              limit=1)
            if not tableOnBootDisk:
                raise VariableError(
                    _("Disk '%s' needs a partition table for the boot record") %
                    mbrDisk)
            if rootType == "flash":
                if tableOnBootDisk == "gpt":
                    raise VariableError(_("You need a disk with a dos "
                                          "table for Flash install"))
            if tableOnBootDisk == "dos" and useBtrfs:
                raise VariableError(_("You need a disk with a GPT table for "
                                      "install on a btrfs partition"))
            if rootType in ("usb-hdd", "hdd") and tableOnBootDisk == "gpt":
                # size of the EF02 partition located on the boot disk
                # NOTE(review): this callable takes two positional
                # arguments, while other Select() calls in this file pass
                # a one-argument callable - confirm Select supports both
                efisize = self.Select('os_disk_size',
                                      where=['os_disk_id', 'os_disk_parent'],
                                      func=lambda os_disk_id, os_disk_parent: (
                                          os_disk_id == 'EF02' and
                                          os_disk_parent == mbrDisk),
                                      limit=1)
                bios_grub_size = self.Get('cl_autopartition_bios_grub_size')
                minsize = "%dMb" % (int(bios_grub_size) / Sizes.M)
                if not efisize:
                    raise VariableError(
                        _("Your boot device must have a BIOS Boot partition "
                          "({minsize})").format(minsize=minsize))
                # check the EF02 size when installing on btrfs
                elif useBtrfs:
                    if (efisize.isdigit() and bios_grub_size.isdigit() and
                            int(efisize) < int(bios_grub_size)):
                        raise VariableError(
                            _("Your boot device must have a BIOS Boot "
                              "partition ({minsize})").format(minsize=minsize))
        if value:
            if self.Get('os_grub2_path'):
                self.checkForGrub2()
            else:
                self.checkForLegacyGrub()

    def checkForGrub2(self):
        """
        Check the current disk configuration for a GRUB2 install

        Raises VariableError for a RAID assembled from LVM or from
        another RAID.
        """
        # NOTE(review): the value is named as a disk type but is selected
        # from 'os_install_disk_parent' - confirm the intended variable
        grubDiskType = self.Select('os_install_disk_parent',
                                   where='os_install_disk_mount',
                                   _in=('/', '/boot'),
                                   sort="DESC", limit=1)
        if "lvm-raid" in grubDiskType:
            raise VariableError(
                _(
                    "Grub does not support booting from a RAID assembled from a LVM")
                + ". " +
                _("Try to use a separate /boot partition"))
        if grubDiskType.count("raid") > 1:
            raise VariableError(
                _("Grub does not support booting from a RAID assembled "
                  "from another RAID")
                + ". " +
                _("Try to use a separate /boot partition"))

    def checkForLegacyGrub(self):
        """
        Check the current disk configuration for a legacy grub install

        Raises VariableError (like every other validation failure of this
        variable) when /boot or / is on LVM or RAID, or is formatted as
        btrfs or nilfs2.
        """
        bootDiskType, bootDiskFormat = \
            self.Select(['os_install_disk_type',
                         'os_install_disk_format'],
                        where='os_install_disk_mount',
                        _in=('/', '/boot'),
                        sort="DESC", limit=1)
        if "lvm" in bootDiskType or "raid" in bootDiskType:
            raise VariableError(
                _(
                    "Legacy grub requires a separate /boot partition to support boot from a RAID or a LVM"))
        if bootDiskFormat in ("btrfs", "nilfs2"):
            raise VariableError(
                _("To support booting from %s, legacy grub needs a "
                  "separate /boot partition") % bootDiskFormat)

    def uncompatible(self):
        """
        Return the reason the boot disk cannot be chosen (UEFI bootloader
        or autopartitioning), or an empty string when it can
        """
        if self.Get('os_install_uefi_set') == "on":
            return _("MBR is not used with the UEFI bootloader")
        if self.Get('cl_autopartition_set') == "on":
            return \
                _("The layout is not available with autopartitioning")
        return ""
class VariableOsInstallRootType(LocationHelper, Variable):
    """
    Kind of the installation target: hdd, flash or usb-hdd
    """
    opt = ["--type"]
    metavalue = "DISKTYPE"
    type = "choice"

    def init(self):
        self.help = _("device type for the system bound for install")
        self.label = _("Installation type")

    def get(self):
        requested = self.Get('cl_install_type')
        if not requested:
            # nothing requested: use the type of the running system
            return self.Get('os_root_type')
        if requested == "flash":
            return "flash"
        root_dev = self.Get('os_install_root_dev')
        dev_path = getUdevDeviceInfo(name=root_dev).get('DEVPATH', '')
        parent_dev = getPartitionDevice(dev_path)
        parent_type = self.Select(
            'os_device_type', where='os_device_dev', eq=parent_dev, limit=1)
        return "usb-hdd" if parent_type in ("usb-hdd", "flash") else "hdd"

    def choice(self):
        return [
            ("hdd", _("Hard disk")),
            ("flash", _("USB Flash")),
            ("usb-hdd", _("USB Hard Disk")),
        ]
class VariableOsInstallRootDev(ReadonlyVariable):
    """
    Root device of the system being installed
    """

    def get(self):
        """Get install root device"""
        if self.Get('cl_action') != 'system':
            return self.Get('os_root_dev')
        root_dev = self.Select('os_install_disk_dev_base',
                               where='os_install_disk_mount_base',
                               eq="/", limit=1)
        return root_dev or ''
class VariableOsInstallRootUuid(ReadonlyVariable):
    """
    UUID of the root device
    """

    def get(self):
        """UUID of the root device (empty string when not found)"""
        if self.Get('cl_action') == 'system':
            root_dev = self.Get('os_install_root_dev')
            uuid_var, dev_var = ('os_install_disk_uuid',
                                 'os_install_disk_dev')
        else:
            root_dev = self.Get('os_root_dev')
            uuid_var, dev_var = 'os_disk_uuid', 'os_disk_dev'
        return self.Select(uuid_var, where=dev_var,
                           eq=root_dev, limit=1) or ''
class VariableOsInstallFstabMountConf(DeviceHelper, ReadonlyVariable):
    """
    fstab text for the disk mount points and the bind mounts
    """

    def _commentFstab(self, s, mp, dev):
        """Prefix a UUID based fstab line with a comment naming its device"""
        if not s.startswith("UUID"):
            return s
        return "# %s was on %s during installation\n%s" % (mp, dev, s)

    def get(self):
        USE, MOUNT, FS, OPTS, DEV = 0, 1, 2, 3, 4
        # every device with a mount point, except swap
        rows = self.Select(['os_install_disk_use',
                            'os_install_disk_mount',
                            'os_install_disk_format',
                            'os_install_disk_options',
                            'os_install_disk_dev'],
                           where='os_install_disk_mount',
                           func=lambda x: x[0] not in ("", "swap"))
        rows = sorted(rows,
                      key=lambda row: self.separateDevice(row[MOUNT]))
        if self.Get('os_install_scratch') == "on":
            rows = [row for row in rows if row[MOUNT] != "/"]

        # only the first entry gets "0 1"; the slice keeps this correct
        # for an empty list as well
        first_lines = ["%s\t%s\t%s\t%s\t0 1" %
                       (self._commentFstab(row[USE], row[MOUNT], row[DEV]),
                        row[MOUNT], row[FS], row[OPTS])
                       for row in rows[:1]]
        rest_lines = ["%s\t%s\t%s\t%s\t0 0" %
                      (self._commentFstab(row[USE], row[MOUNT], row[DEV]),
                       row[MOUNT], row[FS], row[OPTS])
                      for row in rows[1:]]
        bind_rows = self.ZipVars('os_install_bind_path',
                                 'os_install_bind_mountpoint')
        bind_lines = ["%s\t%s\tnone\tbind\t0 0" % (row[0], row[1])
                      for row in bind_rows]

        sections = ["\n".join(first_lines),
                    "\n".join(rest_lines),
                    "\n".join(bind_lines)]
        return "\n".join([section for section in sections if section])
class VariableOsInstallFstabSwapConf(VariableOsInstallFstabMountConf):
    """
    fstab text for the swap partitions
    """

    def get(self):
        swap_rows = self.Select(['os_install_disk_use',
                                 'os_install_disk_mount',
                                 'os_install_disk_dev'],
                                where='os_install_disk_mount',
                                eq='swap')
        # row[0] - identifier written to fstab, row[2] - device name
        lines = ["%s\tnone\tswap\tsw\t0 0" %
                 self._commentFstab(row[0], "swap", row[2])
                 for row in swap_rows]
        return "\n".join(lines)
class VariableClInstallType(Variable):
    """
    Requested installation type: empty, "flash" or "hdd"
    """
    type = "choice"
    value = ""

    def choice(self):
        allowed = ["", "flash", "hdd"]
        return allowed
class VariableOsInstallDiskSingle(Variable):
    """
    Installation disk
    """
    type = "choiceedit"
    opt = ["--disk", "-d"]
    metavalue = 'DISK'
    untrusted = True
    value = ""

    def init(self):
        self.label = _("Installation disk")
        self.help = _("set the USB Flash device")

    def choice(self):
        """
        Return (partition, "partition (device name)") pairs for every
        disk entry with a non-empty os_disk_part
        """
        diskParentMap = dict(zip(self.Get('os_disk_dev'),
                                 self.Get('os_disk_parent')))
        deviceParentMap = dict(self.ZipVars('os_device_dev', 'os_device_name'))
        disks = self.select('os_disk_dev', os_disk_part__ne="")
        # the name is looked up by the parent device, falling back to the
        # entry itself when it has no recorded parent
        return [(x, "%s (%s)" % (x,
                                 deviceParentMap.get(
                                     diskParentMap.get(x, x),
                                     _("Unknown"))))
                for x in disks]

    def check(self, value):
        """
        Validate the selected installation partition.

        Raises VariableError when the value is not a known partition, is
        already mounted by the system, holds the running flash system, or
        is an extended partition, a CDROM or a GPT entry.
        """
        # make sure that a known partition has been selected
        if value not in self.Get('os_disk_dev'):
            # interpolate AFTER the translation lookup, otherwise the
            # message id never matches the catalogue
            raise VariableError(
                _("Wrong device '%s'") % value
            )
        # make sure the partition is not used by the system
        # (not described in fstab)
        mp = self.select('os_disk_mount', os_disk_dev=value, limit=1)
        if mp:
            raise VariableError(
                _("The partition {dev} is already in use as {mp}").format(
                    dev=value, mp=mp))
        # if the system is booted from a flash drive (not iso), that
        # system cannot be reinstalled
        root_type = self.Get('os_root_type_ext')
        if root_type in RootType.LiveFlash:
            if value == self.Get('os_root_flash_dev'):
                raise VariableError(
                    _("You cannot install the new system instead current"))
        # detect using extended partition
        disk_part = self.select('os_disk_part', os_disk_dev=value, limit=1)
        if disk_part == 'extended':
            raise VariableError(
                _("Unable to use extended partition %s for installation") %
                value)
        if "cdrom" in disk_part:
            raise VariableError(_("Unable to use CDROM %s for installation") %
                                value)
        if disk_part == 'gpt':
            raise VariableError(_("You need a disk with a dos "
                                  "table for Flash install"))
class VariableOsInstallFormatSingleSet(Variable):
"""
Format the USB Flash
"""
type = "bool"
opt = ["--format"]
untrusted = True
value = "off"
def init(self):
self.label = _("Format the USB Flash")
self.help = _("perform the formatting of the USB Flash drive")
def must_be_formatted(self, dev):
# True when the filesystem recorded for dev in os_disk_format is not vfat
fs = self.select('os_disk_format', os_disk_dev=dev, limit=1)
if fs != "vfat":
return True
return False
def cannot_be_formatted(self, dev):
# Truthy when dev is the device recorded in os_root_flash_dev;
# when that variable is empty, its (falsy) value is returned as is
flash_dev = self.Get('os_root_flash_dev')
return flash_dev and dev == flash_dev
def check(self, value):
# Validates the flag against the disk selected in os_install_disk_single.
# Raises VariableError when formatting is requested for the flash that
# cannot be formatted, when formatting is declined for a disk that must be
# formatted, or when the image does not fit into the free space.
devs = self.Get('os_disk_dev')
dev = self.Get('os_install_disk_single')
# nothing to validate when the selected disk is not a known one
if dev not in devs:
return
if value == "on":
if self.cannot_be_formatted(dev):
raise VariableError(
_("You cannot format the USB Flash which "
"contains the current system"))
else:
if self.must_be_formatted(dev):
raise VariableError(
_("{device} must be formatted").format(device=dev))
if dev:
try:
with FlashDistributive(dev) as f:
dn = f.getDirectory()
df = DiskSpace()
free_size = df.get_free(dev)
squash_fn = path.join(dn, "livecd.squashfs")
# the size is compared only when livecd.squashfs is absent from the
# directory and cl_image is an IsoDistributive
if not path.exists(squash_fn):
source = self.Get('cl_image')
if isinstance(source, IsoDistributive):
image_size = source.get_squash_size()
if image_size > free_size:
raise VariableError(
_("Not enough free space on the "
"USB Flash"))
# a DistributiveError is deliberately ignored here
except DistributiveError:
pass