You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/install/variables/disk.py

1430 lines
50 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path
from calculate.lib.datavars import Variable,VariableError,ReadonlyVariable
from calculate.lib.utils.device import (getUdevDeviceInfo,getDeviceType,
getPartitionType,getPartitionDevice,getRaidPartitions,
getLvmPartitions,getPartitionSize, humanreadableSize,
getUUIDDict)
from calculate.lib.utils.files import listDirectory,pathJoin,readFile,FStab
from calculate.install.cl_distr import PartitionDistributive
from calculate.install.cl_install import FileSystemManager
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_install',sys.modules[__name__])
class DeviceHelper:
    """
    Mixin with helpers for enumerating and sorting block devices
    """
    sysBlockPath = '/sys/block'
    # matches every name that does NOT start with fd, ram or loop
    rePassDevice = re.compile("^(?!%s)"%"|".join(['fd','ram','loop']))

    def getDeviceFromSysPath(self):
        """Get interesting device names from the sys block path"""
        return [name for name in listDirectory(self.sysBlockPath)
                if self.rePassDevice.search(name)]

    def separateDevice(self, device):
        """
        Split a device name into alternating text and integer pieces.

        Used as a sort key (Example: sda2 -> ["sda",2],
        md5p1 -> ["md",5,"p",1])
        """
        pieces = []
        for chunk in re.findall(r'\d+|\D+', device):
            pieces.append(int(chunk) if chunk.isdigit() else chunk)
        return pieces

    def mapUdevProperty(self, var, prop, default):
        """Get udev property [prop] for each device listed in variable var"""
        return [getUdevDeviceInfo(name=dev).get(prop, default)
                for dev in self.Get(var)]
#######################################################
# Devices variables
#######################################################
class VariableOsDeviceData(ReadonlyVariable):
    """
    Table combining the per-device variables listed in source
    """
    type = 'table'
    source = [
        'os_device_dev',
        'os_device_table',
        'os_device_type',
        'os_device_map',
        'os_device_name',
        'os_device_size',
    ]
class VariableOsDeviceDev(ReadonlyVariable,DeviceHelper):
    """
    Disk devices
    """
    type = "list"

    def get(self):
        """Get the /dev names of whole disks, sorted by name and number"""
        def _getDiskName(devpath):
            """Return DEVNAME only when udev reports devpath as a disk"""
            prop = getUdevDeviceInfo(devpath)
            isDisk = (prop.get("ID_TYPE", "") == "disk" and
                      prop.get("DEVTYPE", "") == "disk")
            if isDisk:
                return prop.get('DEVNAME', '')

        # walk /sys/block entries (fd, ram and loop are already discarded)
        names = []
        for shortname in self.getDeviceFromSysPath():
            devname = _getDiskName(path.join(self.sysBlockPath, shortname))
            if devname:
                names.append(devname)
        names.sort(key=self.separateDevice)
        return names
class VariableOsDeviceType(ReadonlyVariable):
    """
    Device type (hdd, usb-hdd, flash)
    """
    type = "list"

    def getType(self, device):
        """Classify device using self.usbdevices, which get() fills in"""
        shortname = path.basename(device)
        if shortname not in self.usbdevices:
            return "hdd"
        # usb flash is detected by the sysfs "removable" file containing "1"
        removablePath = '/sys/block/%s/removable'%shortname
        if readFile(removablePath).strip() == "1":
            return "flash"
        return "usb-hdd"

    def get(self):
        """Get the type of every device from os_device_dev"""
        # usb devices are the targets of 'usb-*' links in /dev/disk/by-id
        diskIdPath = '/dev/disk/by-id'
        usbdevices = []
        if path.exists(diskIdPath):
            for link in listDirectory(diskIdPath):
                if link.startswith('usb-'):
                    target = os.readlink(path.join(diskIdPath, link))
                    usbdevices.append(target.rpartition('/')[2])
        self.usbdevices = usbdevices
        return [self.getType(dev) for dev in self.Get('os_device_dev')]
class VariableOsDeviceMap(ReadonlyVariable):
    """
    Map number for grub
    Used for legacy grub (DEPRECATED)
    """
    type = "list"

    def get(self):
        """Return the position of each device within os_device_dev"""
        return [index for index, dev in enumerate(self.Get('os_device_dev'))]
class VariableOsDeviceTable(ReadonlyVariable):
    """
    Partition table of each device
    """
    type = "list"

    def getTableByChild(self, device):
        """
        Get the table type from the first child partition of device.

        Returns the child's ID_PART_ENTRY_SCHEME udev property, or ""
        when the device has no child entry in sysfs.
        """
        syspath = getUdevDeviceInfo(name=device).get('DEVPATH', '')
        if not syspath.startswith('/sys'):
            syspath = pathJoin('/sys', syspath)
        shortnameDevice = path.basename(device)
        for entry in listDirectory(syspath):
            if entry.startswith(shortnameDevice):
                child = pathJoin(syspath, entry)
                return getUdevDeviceInfo(path=child).get(
                    'ID_PART_ENTRY_SCHEME', '')
        return ""

    def get(self):
        """
        Get device partition table.

        For the autopartition device (when a scheme is selected) the value
        of cl_autopartition_table is used instead of the table on disk.
        """
        autopartition = self.Get('cl_autopartition_scheme')
        autoDevice = self.Get('cl_autopartition_device')
        tables = []
        for device in self.Get('os_device_dev'):
            if autopartition and autoDevice == device:
                tables.append(self.Get('cl_autopartition_table'))
                continue
            table = getUdevDeviceInfo(name=device).get('ID_PART_TABLE_TYPE')
            if table is None:
                # query the child partition only when udev gives no table
                # type for the disk itself (avoids needless udev/sysfs work)
                table = self.getTableByChild(device)
            tables.append(table)
        return tables
class VariableOsDeviceName(ReadonlyVariable):
    """
    Name of device
    """
    type = "list"

    def getName(self, device):
        """Return "<vendor> <model>" read from sysfs, "" without DEVPATH"""
        devicepath = getUdevDeviceInfo(name=device).get("DEVPATH", "")
        if not devicepath:
            return ""
        if not devicepath.startswith("/sys"):
            devicepath = pathJoin("/sys", devicepath)
        vendor = readFile("%s/device/vendor"%devicepath).strip()
        model = readFile("%s/device/model"%devicepath).strip()
        return ("%s %s"%(vendor, model)).strip()

    def get(self):
        """Get the name of every device from os_device_dev"""
        return [self.getName(dev) for dev in self.Get('os_device_dev')]
class VariableOsDeviceSize(ReadonlyVariable):
    """
    Size of device
    """
    type = "list"

    def get(self):
        """Get device size (requested with inBytes=True)"""
        return [getPartitionSize(name=dev, inBytes=True)
                for dev in self.Get('os_device_dev')]

    def humanReadable(self):
        """Return the stored sizes converted by humanreadableSize"""
        return [humanreadableSize(size) for size in self.Get()]
#############################################
# Disk variables
#############################################
class VariableOsDiskData(ReadonlyVariable):
    """
    Table with information about current system partitions and mounts
    """
    type = "table"
    source = [
        'os_disk_dev',
        'os_disk_uuid',
        'os_disk_name',
        'os_disk_size',
        'os_disk_part',
        'os_disk_format',
        'os_disk_type',
        'os_disk_parent',
        'os_disk_id',
        'os_disk_grub',
    ]
class VariableOsDiskDev(ReadonlyVariable,DeviceHelper):
    """
    List of available partition devices
    """
    type = "list"

    def selfOrPartition(self, devpath):
        """
        Return devpath itself, or a list built from its partitions.

        Partitions are the sysfs entries under devpath whose names start
        with the device's own name; each one is resolved recursively.
        """
        shortname = path.basename(devpath)
        partitions = [entry for entry in listDirectory(devpath)
                      if entry.startswith(shortname)]
        if not partitions:
            return devpath
        found = []
        for part in partitions:
            sub = self.selfOrPartition(path.join(devpath, part))
            if sub:
                found.append(sub)
        return found

    def get(self):
        """Get device names of all partitions (or whole disks), sorted"""
        # collect sysfs paths: the partitions of each device, or the
        # device itself when it has none, flattened into a single list
        syspaths = []
        for disk in self.getDeviceFromSysPath():
            item = self.selfOrPartition(path.join(self.sysBlockPath, disk))
            if isinstance(item, str):
                syspaths.append(item)
            else:
                syspaths.extend(item)
        # resolve names through udev, drop empty ones,
        # then sort by name and partition number
        names = [getUdevDeviceInfo(syspath).get('DEVNAME', '')
                 for syspath in syspaths]
        return sorted([name for name in names if name],
                      key=self.separateDevice)
class VariableOsDiskMount(ReadonlyVariable,DeviceHelper):
    """
    List of mount points in the current operating system
    """
    type = "list"

    def get(self):
        """
        Get the mount point of every device from os_disk_dev.

        The root device is reported as '/'; the other devices are looked
        up in /etc/fstab and get "" when nothing is found there.
        """
        fstab = FStab('/etc/fstab')
        rootdev = self.Get('os_root_dev')
        mounts = []
        for dev in self.Get('os_disk_dev'):
            if dev == rootdev:
                mounts.append('/')
            else:
                mounts.append(fstab.getBy(eq=dev) or "")
        return mounts
class VariableOsDiskContent(ReadonlyVariable):
    """
    Partition content
    """
    type = "list"

    def get(self):
        """
        TODO: not implemented yet, returns "" for every disk
        """
        return ["" for dev in self.Get('os_disk_dev')]
class VariableOsDiskFormat(ReadonlyVariable):
    """
    Filesystem on device partitions
    """
    type = "list"

    def get(self):
        """Get current disk file system"""
        fstab = FStab('/etc/fstab')
        formats = []
        for device in self.Get('os_disk_dev'):
            prop = getUdevDeviceInfo(name=device)
            # priority: udev FSTAB_TYPE, then /etc/fstab, then ID_FS_TYPE
            fstype = prop.get('FSTAB_TYPE')
            if not fstype:
                fstype = fstab.getBy(what=fstab.TYPE, eq=device)
            if not fstype:
                fstype = prop.get('ID_FS_TYPE', '')
            formats.append(fstype)
        return formats
class VariableOsDiskType(ReadonlyVariable):
    """
    List type (lvm,raid,partition,disk)
    """
    type = "list"

    def get(self):
        """
        Get partition scheme.

        Devices used by a RAID or an LVM volume get a
        "-raidmember(<raid>)" / "-lvmmember(<lv>,...)" suffix.
        """
        types = [(dev, getDeviceType(name=dev))
                 for dev in self.Get('os_disk_dev')]
        lvmUsedDisks = {}
        raidUsedDisks = {}
        # first pass: find out which devices belong to a RAID or LVM
        for device, diskType in types:
            prop = getUdevDeviceInfo(name=device)
            if "raid" in diskType:
                for member in getRaidPartitions(prop.get('DEVPATH', '')):
                    raidUsedDisks[member] = device
            if diskType.endswith("lvm"):
                for member in getLvmPartitions(prop.get('DM_VG_NAME', ''),
                                               prop.get('DM_LV_NAME', '')):
                    lvmUsedDisks.setdefault(member, []).append(device)
        # second pass: decorate the type of every member device
        result = []
        for diskName, diskType in types:
            if diskName in raidUsedDisks:
                diskType = "%s-raidmember(%s)"%(diskType,
                                                raidUsedDisks[diskName])
            if diskName in lvmUsedDisks:
                diskType = "%s-lvmmember(%s)"%(
                    diskType, ",".join(lvmUsedDisks[diskName]))
            result.append(diskType)
        return result
class VariableOsDiskUuid(ReadonlyVariable,DeviceHelper):
    """
    List of UUIDs for partition devices
    """
    type = "list"

    def get(self):
        """Get udev ID_FS_UUID of every disk ("" when absent)"""
        uuids = self.mapUdevProperty('os_disk_dev', 'ID_FS_UUID', '')
        return uuids
class VariableOsDiskParent(ReadonlyVariable):
    """
    List of parent devices for partitions
    """
    type = "list"

    def get(self):
        """Get disk parent"""
        devpaths = [getUdevDeviceInfo(name=dev).get('DEVPATH', '')
                    for dev in self.Get('os_disk_dev')]
        return [getPartitionDevice(devpath) for devpath in devpaths]
class VariableOsDiskId(ReadonlyVariable,DeviceHelper):
    """
    Partition's system id
    """
    type = "list"

    def get(self):
        """Get disk id"""
        # known partition type UUIDs and the id each one is replaced by
        mapTypeUUID = {'ebd0a0a2-b9e5-4433-87c0-68b6b72699c7':'0700',
                       '0657fd6d-a4ab-43c4-84e5-0933c84b4f4f':'8200',
                       'a19d880f-05fc-4d3b-a006-743f0f84911e':'FD00',
                       '21686148-6449-6e6f-744e-656564454649':'EF02'}
        entryTypes = self.mapUdevProperty('os_disk_dev',
                                          'ID_PART_ENTRY_TYPE', '')
        ids = []
        for entryType in entryTypes:
            # keep only the text after the last "x" (e.g. a "0x" prefix)
            shortId = entryType.rpartition("x")[2]
            ids.append(mapTypeUUID.get(shortId, shortId))
        return ids
class VariableOsDiskGrub(ReadonlyVariable):
    """
    List of grub ids for partition devices
    """
    type = "list"

    def get(self):
        """Get disk grub map ("<device map number>,<partition - 1>")"""
        devicesMap = dict(zip(self.Get('os_device_dev'),
                              self.Get('os_device_map')))
        grubIds = []
        for device, parent in zip(self.Get('os_disk_dev'),
                                  self.Get('os_disk_parent')):
            prop = getUdevDeviceInfo(name=device)
            partnum = int(prop.get('ID_PART_ENTRY_NUMBER', 0))
            if parent in devicesMap and partnum:
                grubIds.append("%s,%d"%(devicesMap[parent], partnum-1))
            else:
                grubIds.append("")
        return grubIds
class VariableOsDiskPart(ReadonlyVariable):
    """
    Type of partition devices
    If msdos then (primary, extended or logical)
    If gpt then gpt
    """
    type = "list"

    def get(self):
        """Get the partition type; "" for devices without a parent"""
        partTypes = []
        for dev, parent in self.ZipVars('os_disk_dev', 'os_disk_parent'):
            partType = getPartitionType(getUdevDeviceInfo(name=dev))
            partTypes.append(partType if parent else "")
        return partTypes
class VariableOsDiskSize(ReadonlyVariable):
    """
    Partition size
    """
    type = "list"

    def get(self):
        """Get disk size"""
        return [getPartitionSize(name=dev)
                for dev in self.Get('os_disk_dev')]
class VariableOsDiskName(ReadonlyVariable,DeviceHelper):
    """
    Label of partitions
    """
    type = "list"

    def get(self):
        """Get disk label (udev ID_FS_LABEL, "" when absent)"""
        labels = self.mapUdevProperty('os_disk_dev', 'ID_FS_LABEL', '')
        return labels
class VariableOsDiskOptions(ReadonlyVariable):
    """
    List mount options
    """
    type = "list"

    def get(self):
        """Get the /etc/fstab options field for every disk"""
        fstab = FStab('/etc/fstab')
        return [fstab.getBy(what=fstab.OPTS, eq=device)
                for device in self.Get('os_disk_dev')]
################################################
# Bind mount points
################################################
class VariableOsBindData(ReadonlyVariable):
    """
    Table of bind mount points
    """
    type = "table"
    source = [
        'os_bind_path',
        'os_bind_mountpoint',
    ]
class VariableOsBindPath(ReadonlyVariable):
    """
    List of source paths for bind mounts
    """
    type = "list"

    def get(self):
        """Get the NAME field of all /etc/fstab entries with "bind" option"""
        fstab = FStab('/etc/fstab')
        bindSources = fstab.getBy(what=fstab.NAME, where=fstab.OPTS,
                                  _in="bind", allentry=True)
        return bindSources
class VariableOsBindMountpoint(ReadonlyVariable):
    """
    Mount points of bind-mounted directories
    """
    type = "list"

    def get(self):
        """Get the DIR field of all /etc/fstab entries with "bind" option"""
        fstab = FStab('/etc/fstab')
        bindTargets = fstab.getBy(what=fstab.DIR, where=fstab.OPTS,
                                  _in="bind", allentry=True)
        return bindTargets
######################################################################
# User-selected partition parameters
######################################################################
class LocationVariable:
    """
    Location variable
    """
    def uncompatible(self):
        """
        Location settings are unavailable while autopartitioning is used.

        Returns the explanation message, or "" when no autopartition
        scheme is selected.
        """
        if not self.Get('cl_autopartition_scheme'):
            return ""
        return _("Location not available with autopartitioning")
class VariableOsLocationData(LocationVariable,Variable):
    """
    Select installation disk variable
    """
    type = "table"
    opt = ["--disk","-d"]
    metavalue = 'DISKS'
    source = [
        "os_location_source",
        "os_location_dest",
        "os_location_format",
        "os_location_perform_format",
        "os_location_size",
    ]

    def init(self):
        """Set the translated help text and label"""
        helpText = _("DISK for installation, will be mounted to DIR. "
                     "DIR set to 'none' will cancel the mount point "
                     "transfer. For creating bind mount point you have "
                     "to specify the source directory as DISK")
        self.help = helpText
        self.label = _("Locations")
class VariableOsLocationSource(LocationVariable,Variable):
    """
    Source disk or directory
    """
    type = "choiceedit-list"

    def init(self):
        """Set the translated label"""
        self.label = _("Disk or directory")

    def availDevs(self):
        """
        Available devices.

        For flash installation only partitions whose parent is a flash
        drive are offered; otherwise all disks plus bind source paths.
        """
        if self.Get('cl_install_type') == 'flash':
            flashDrives = self.Select('os_device_dev',
                                      where='os_device_type',
                                      eq="flash")
            return self.Select('os_disk_dev',
                               where='os_disk_parent',
                               _in=flashDrives)
        return self.Get('os_disk_dev')+self.Get('os_bind_path')

    def get(self):
        """
        Get default sources: autopartition disks and bind paths when a
        scheme is selected, the available devices otherwise.
        """
        if self.Get('cl_autopartition_scheme'):
            return self.Get('cl_autopartition_disk_dev') + \
                   self.Get('cl_autopartition_bind_path')
        return self.availDevs()

    def choice(self):
        """Offer the available devices (fixed by autopartition) and ''"""
        return self.fixOsDiskDev(self.availDevs()) + [""]

    def fixOsDiskDev(self, sourcelist=None):
        """
        Fix os_disk_dev by autopartitions.

        With a scheme selected, partitions of the autopartition device are
        removed from sourcelist (os_disk_dev when empty) and the
        autopartition disks and bind paths are added.
        """
        if not sourcelist:
            sourcelist = self.Get('os_disk_dev')
        excludeDisks = []
        appendDisks = []
        if self.Get('cl_autopartition_scheme'):
            excludeDisks = self.Select('os_disk_dev',
                               where='os_disk_parent',
                               eq=self.Get('cl_autopartition_device'))
            appendDisks = self.Get('cl_autopartition_disk_dev')+ \
                          self.Get('cl_autopartition_bind_path')
        return list((set(sourcelist)
                    -set(excludeDisks))
                    |set(appendDisks))

    def check(self, value):
        """
        Check set location source.

        Raises VariableError for an empty value, an unknown /dev/ device,
        a source that is not an absolute path, or a device listed twice.
        The first offending entry (in the order given) is reported.
        """
        # a device must be specified
        if not value:
            raise VariableError(_("For installation need specify root device"))
        # unknown devices
        disks = [src for src in value if src.startswith('/dev/')]
        allowedDevs = set(self.fixOsDiskDev())
        wrongDevices = [dev for dev in disks if dev not in allowedDevs]
        if wrongDevices:
            raise VariableError(_("Incorrect device '%s'")%wrongDevices[0])
        # bind sources must be absolute paths
        wrongSource = [src for src in value
                       if src and not src.startswith('/')]
        if wrongSource:
            raise VariableError(
                _("Incorrect bind mount point '%s'")%wrongSource[0])
        # duplicate devices
        dupDevices = [dev for dev in disks if disks.count(dev) > 1]
        if dupDevices:
            raise VariableError(_("Device '%s' is used twice")%dupDevices[0])
class VariableOsLocationDest(LocationVariable,Variable):
    """
    Destination directory of install disk data
    """
    type = "choiceedit-list"

    def init(self):
        """Set the translated label"""
        self.label = _("Mount point")

    def get(self):
        """
        Get default mount points for the selected sources.

        With autopartitioning the autopartition mount points are used.
        Otherwise current mount points are taken, except that during the
        'system' action the install target becomes "/" and the current
        root (or, for flash, every other device) is cleared.
        """
        if self.Get('cl_autopartition_scheme'):
            return self.Get('cl_autopartition_disk_mount') + \
                   self.Get('cl_autopartition_bind_mountpoint')
        source = self.Get('os_location_source')
        installFrom = self.Get('os_install_dev_from')
        singleDevice = self.Get('os_install_disk_single')

        def installMountPoint(dev, mount):
            if self.Get('cl_action') == 'system':
                if self.Get('cl_install_type') == 'flash':
                    return "/" if dev == singleDevice else ""
                if dev == installFrom:
                    return "/"
                if mount == "/":
                    return ""
            return mount

        candidates = list(zip(self.Get('os_disk_dev'),
                              self.Get('os_disk_mount'))) + \
                     list(zip(self.Get('os_bind_path'),
                              self.Get('os_bind_mountpoint')))
        return [installMountPoint(dev, mount)
                for dev, mount in candidates if dev in source]

    def set(self, value):
        """Allow the root to be left unspecified: [""] becomes ["/"]"""
        if len(value) == 1 and value[0] == "":
            return ["/"]
        return value

    def choice(self):
        """Offer the usual mount points (only "/" and "" for flash)"""
        if self.Get('cl_install_type') == 'flash':
            return ["/",""]
        return ['/','/boot','/var/calculate','/home',
                '/usr','/var','/tmp','swap','']

    def check(self, value):
        """
        Check set location destination.

        Raises VariableError when the root is missing, a mount point is
        duplicated or malformed, a bind is incorrect or crosses another
        bind, the current root / an extended partition / a CDROM / an
        active RAID or LVM member is selected, or a flash installation
        uses several partitions.
        """
        # root device must be specified
        if "/" not in value:
            raise VariableError(_("For installation need specify root device"))
        source = self.Get("os_location_source")
        # binds: sources outside /dev/ with a non-empty destination
        binds = [pair for pair in zip(source, value)
                 if not pair[0].startswith('/dev/') and pair[1]]
        # duplicate mount points (first one in the given order is reported)
        dupMP = [mp for mp in value if mp and value.count(mp) > 1]
        if dupMP:
            raise VariableError(_("Mount point '%s' is used twice")%dupMP[0])
        # wrong bind
        wrongBind = [pair for pair in binds
                     if not pair[0].startswith("/") or
                        not pair[1].startswith("/")]
        if wrongBind:
            raise VariableError(
                  _("Incorrect mount point (bind '%(bindSrc)s' to "\
                    "'%(bindDst)s')")\
                    %{'bindSrc':wrongBind[0][0],
                      'bindDst':wrongBind[0][1]})
        # destinations must start with '/' (or be "swap")
        wrongMP = [mp for mp in value
                   if mp and not mp.startswith("/") and mp != "swap"]
        if wrongMP:
            raise VariableError(_("Incorrect mount point '%s'")%wrongMP[0])
        # the current root must not be used
        rootDev = self.Get('os_root_dev')
        if rootDev in self.Get('os_install_disk_dev'):
            raise VariableError(
                _("The current root partition %s can not be "
                  "used for installation")%rootDev)
        # extended partitions must not be used
        extendedPartitions = self.Select('os_install_disk_dev',
                                          where='os_install_disk_part',
                                          eq='extended',limit=1)
        if extendedPartitions:
            raise VariableError(
                _("Unable to use the extended partition %s for installation")%
                extendedPartitions)
        # CDROM disks must not be used
        cdromPartitions = self.Select('os_install_disk_dev',
                                      where='os_install_disk_type',
                                      like='cdrom',limit=1)
        if cdromPartitions:
            raise VariableError(_("Unable to use CDROM %s for installation")%
                cdromPartitions)
        # cross bind mount points: a bind source that is also a bind target
        srcMountPoints = [src for src, dst in binds]
        destMountPoints = [dst for src, dst in binds]
        wrongBind = [src for src in srcMountPoints if src in destMountPoints]
        if wrongBind:
            incompBind = [pair for pair in binds if pair[1] == wrongBind[0]]
            raise VariableError(
                _("Source directory %(src)s already used "
                  "for binding '%(bindSrc)s' to '%(bindDst)s'")\
                  %{'src':wrongBind[0],
                    'bindSrc':incompBind[0][0],
                    'bindDst':incompBind[0][1]})
        # flash installation supports a single partition and no swap
        osInstallRootType = self.Get('os_install_root_type')
        if osInstallRootType == "flash":
            if [mp for mp in value if mp and mp != '/']:
                raise VariableError(_("Installation to flash disk is not supported for "
                                  "multipartition install"))
            if "swap" in value:
                raise VariableError(_("Installation to flash disk is not "
                                      "supported for swap disks"))
        # installation on a member of an active RAID or LVM.
        # Member devices carry the "raidmember"/"lvmmember" marker in their
        # type; matching the bare word would also reject the RAID/LVM
        # devices themselves.
        installTypes = list(zip(self.Get('os_install_disk_dev'),
                                self.Get('os_install_disk_type')))
        for checkType in ("raid","lvm"):
            memberMark = checkType+"member"
            memberData = [info for info in installTypes
                          if memberMark in info[1]]
            if memberData:
                raise VariableError(
                    _("Unable to use active {typepart} member {part} "
                      "for installation").format(
                      typepart=checkType.upper(),
                      part=memberData[0][0]))
class VariableOsLocationFormat(LocationVariable,Variable):
    """
    File system selected for every location
    """
    type = "choice-list"

    def init(self):
        """Set the translated label"""
        self.label = _("File system")

    def get(self):
        """Get default file systems for the selected locations"""
        if self.Get('cl_autopartition_scheme'):
            bindFormats = ["" for bind in
                           self.Get('cl_autopartition_bind_path')]
            return self.Get('cl_autopartition_disk_format') + bindFormats
        mount = self.Get("os_location_dest")
        source = self.Get("os_location_source")
        value = [""]*len(source)
        chooseFormat = self.defaultFormat()
        return [chooseFormat(info) for info in zip(source, mount, value)]

    def choice(self):
        """Offer "" plus vfat (flash) or every os_format_type value"""
        if self.Get('cl_install_type') == "flash":
            return ["","vfat"]
        return [""]+self.Get('os_format_type')

    def defaultFormat(self):
        """
        Describe default value for filesystem.

        Returns a function mapping (device, mount point, fs) to the fs to
        use; only an empty fs of a mounted /dev/ device is replaced.
        """
        diskFormat = dict(zip(self.Get('os_disk_dev'),
                             (self.Get('os_disk_format'))))
        osInstallRootType = self.Get('os_install_root_type')
        availFS = set(self.Select('os_format_type',
                                  where='os_format_use',
                                  eq='on'))

        def wrap(info):
            dev, mount, fs = info
            if not mount or fs or not dev.startswith('/dev/'):
                return fs
            if mount == "swap":
                return "swap"
            currentFS = diskFormat.get(dev)
            if dev in diskFormat and currentFS in availFS:
                if mount.count('/') != 1:
                    return currentFS
                # top-level mount points keep their file system only if
                # it suits the installation type
                if FileSystemManager.checkFSForType(currentFS,
                                                    osInstallRootType):
                    return currentFS
            return FileSystemManager.defaultFS.get(osInstallRootType,
                                                   "ext4")
        return wrap

    def set(self, value):
        """Fill in default file systems for empty entries of value"""
        mount = self.Get("os_location_dest")
        source = self.Get("os_location_source")
        chooseFormat = self.defaultFormat()
        return [chooseFormat(info) for info in zip(source, mount, value)]

    def check(self, value):
        """
        Check the selected file systems.

        Raises VariableError when a top-level mount point (or
        /var/calculate) gets a file system unsuitable for the installation
        type, or a swap location is not formatted as swap.
        """
        osInstallRootType = self.Get('os_install_root_type')
        devMpFs = zip(self.Get('os_location_source'),
                      self.Get('os_location_dest'),value)
        for dev, mp, fs in devMpFs:
            # check compatible fs for mount point only root dirs
            isRootDir = mp and (mp.count('/') == 1 or
                                mp == '/var/calculate')
            if dev.startswith('/dev/') and isRootDir:
                if not FileSystemManager.checkFSForType(fs,
                                                        osInstallRootType):
                    raise VariableError(
                        _("File system for '%(mp)s' should not be '%(opt)s'")
                        %{'mp':mp, 'opt':fs}+" "+
                        _("for {typedisk} installation").format(
                        typedisk=osInstallRootType))
            if mp == "swap" and fs != "swap":
                raise VariableError(
                    _("Swap partition {dev} must be formatted as swap").format(
                    dev=dev))
class VariableOsLocationPerformFormat(LocationVariable,Variable):
    """
    Flags telling whether each location will be formatted
    """
    type = "bool-list"

    def init(self):
        """Set the translated label"""
        self.label = _("Format")

    def get(self):
        """Get default format flags for the selected locations"""
        if self.Get('cl_autopartition_scheme'):
            diskFlags = ["on" for fs in
                         self.Get('cl_autopartition_disk_format')]
            bindFlags = ["" for bind in
                         self.Get('cl_autopartition_bind_path')]
            return diskFlags + bindFlags
        mount = self.Get("os_location_dest")
        source = self.Get("os_location_source")
        fs = self.Get("os_location_format")
        value = [""]*len(source)
        chooseFlag = self.defaultPerformFormat()
        return [chooseFlag(info) for info in zip(source, mount, fs, value)]

    def check(self, value):
        """Check perform format

        Verify that formatting is performed where it is needed (for
        example when the file system of a partition changes) and is not
        requested where it is impossible or pointless.
        """
        info = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_format'),
                   value)
        diskFormat = dict(zip(self.Get('os_disk_dev'),
                             (self.Get('os_disk_format'))))
        diskMount = dict(zip(self.Get('os_disk_dev'),
                             (self.Get('os_disk_mount'))))
        unavailFS = set(self.Select('os_format_type',
                                  where='os_format_use',
                                  eq="off"))
        for dev, mp, fs, isformat in info:
            # has fs changed
            fsChanged = mp and diskFormat.get(dev, fs) != fs
            if fsChanged and diskMount.get(dev, ''):
                raise VariableError(
                    _("Disk {device} should be formatted, but can "
                      "not be formatted because mounted to "
                      "{mountpoint} in the current system").format(
                      device=dev,mountpoint=diskMount.get(dev,'')))
            if fsChanged and not self.isTrue(isformat):
                raise VariableError(
                    _("Disk {device} should be formatted").format(device=dev))
            if not self.isTrue(isformat):
                continue
            if not mp:
                raise VariableError(
                    _("No need to format the unused {dev} disk").format(
                      dev=dev))
            if fs in unavailFS:
                raise VariableError(
                    _("File system '%s' is not available")%fs)
            if not dev.startswith('/dev/'):
                raise VariableError(
                    _("Bind mount points should not be formatted"))

    def defaultPerformFormat(self):
        """
        Return a function mapping (source, dest, fs, flag) to the format
        flag: an empty flag of a /dev/ source becomes "on" for the root
        and for a used disk whose file system changes.
        """
        diskFormat = dict(zip(self.Get('os_disk_dev'),
                             (self.Get('os_disk_format'))))

        def wrap(info):
            source, dest, fs, isformat = info
            if isformat or not source.startswith('/dev/'):
                return isformat
            if dest == '/':
                return "on"
            if dest and diskFormat.get(source, fs) != fs:
                return "on"
            return isformat
        return wrap

    def set(self, value):
        """Default values for perform format"""
        info = zip(self.Get('os_location_source'),
                   self.Get('os_location_dest'),
                   self.Get('os_location_format'),
                   value)
        chooseFlag = self.defaultPerformFormat()
        return [chooseFlag(item) for item in info]
class VariableOsLocationSize(LocationVariable,ReadonlyVariable):
    """
    Location size
    """
    type = "list"

    def init(self):
        """Set the translated label"""
        self.label = _("Size")

    def get(self):
        """Get the size of every location ('' for unknown sources)"""
        mapDevSize = dict(self.ZipVars('os_disk_dev','os_disk_size'))
        # autopartition disks override the sizes of existing disks
        autoSizes = zip(self.Get('cl_autopartition_disk_dev'),
                        self.Get('cl_autopartition_disk_size',
                                 humanreadable=True))
        mapDevSize.update(autoSizes)
        return [mapDevSize.get(src, '')
                for src in self.Get('os_location_source')]
class VariableClUuidSet(Variable):
    """
    Whether to use UUID in /etc/fstab
    """
    type = "bool"
    opt = ["--uuid"]
    value = "on"

    def init(self):
        """Set the translated label and help text"""
        label = _("Use UUID")
        helpText = _("use UUID")
        self.label = label
        self.help = helpText
#############################################################
# Install disk parameters
#############################################################
class VariableOsInstallDiskData(ReadonlyVariable):
    """
    Table of install disk parameters
    """
    type = "table"
    source = [
        "os_install_disk_dev",
        "os_install_disk_mount",
        "os_install_disk_format",
        "os_install_disk_perform_format",
        "os_install_disk_options",
        "os_install_disk_id",
        "os_install_disk_uuid",
        "os_install_disk_use",
        "os_install_disk_name",
        "os_install_disk_size",
        "os_install_disk_type",
        "os_install_disk_part",
        "os_install_disk_parent",
    ]
class VariableOsInstallDiskDev(ReadonlyVariable):
    """
    Disks for installation
    """
    type = "list"

    def get(self):
        """Get /dev/ sources that have a non-empty destination"""
        locations = zip(self.Get('os_location_source'),
                        self.Get('os_location_dest'))
        return [src for src, dest in locations
                if src.startswith('/dev/') and dest]
class VariableOsInstallDiskUuid(ReadonlyVariable):
    """
    UUIDs of disks for installation
    """
    type = "list"

    def get(self):
        """Get the UUID of every install disk ('' when unknown)"""
        diskDev = self.Get('os_install_disk_dev')
        hashUUID = getUUIDDict(revers=True)
        # the first five characters of each looked-up value are dropped
        return [hashUUID.get(dev, "")[5:] for dev in diskDev]
class VariableOsInstallDiskMount(ReadonlyVariable):
    """
    List of mount points for the installed system
    """
    type = "list"

    def get(self):
        """Get install disk dest"""
        locations = zip(self.Get('os_location_source'),
                        self.Get('os_location_dest'))
        return [dest for src, dest in locations
                if src.startswith('/dev/') and dest]
class VariableOsInstallDiskUse(ReadonlyVariable):
    """
    /dev/sd or UUID= list (by cl_uuid_set)
    """
    type = "list"

    def get(self):
        """Get real id (by cl_uuid_set) device"""
        if self.Get('cl_uuid_set') != "on":
            return self.Get('os_install_disk_dev')
        identifiers = []
        for uuid, dev in zip(self.Get('os_install_disk_uuid'),
                             self.Get('os_install_disk_dev')):
            # fall back to the device name when no UUID is known
            identifiers.append("UUID=%s"%uuid if uuid else dev)
        return identifiers
class VariableOsInstallDiskOptions(ReadonlyVariable):
    """
    List mount options of installed os
    """
    type = "list"

    def get(self):
        """TODO: get real options (current ones are reused for now)"""
        diskOpts = dict(zip(self.Get('os_disk_dev'),
                            self.Get('os_disk_options')))
        return [diskOpts.get(dev, '')
                for dev in self.Get('os_install_disk_dev')]
class VariableOsInstallDiskFormat(ReadonlyVariable):
    """
    Install list filesystem for partition devices
    """
    type = "choice-list"

    def get(self):
        """Get the file system of every used /dev/ location"""
        locations = zip(self.Get('os_location_source'),
                        self.Get('os_location_dest'),
                        self.Get('os_location_format'))
        return [fs for src, dest, fs in locations
                if src.startswith('/dev/') and dest]
class VariableOsInstallDiskPerformFormat(ReadonlyVariable):
    """
    List of format flags for install disks
    """
    type = "bool-list"

    def get(self):
        """Get the format flag of every used /dev/ location"""
        locations = zip(self.Get('os_location_source'),
                        self.Get('os_location_dest'),
                        self.Get('os_location_perform_format'))
        return [flag for src, dest, flag in locations
                if src.startswith('/dev/') and dest]
class VariableOsInstallDiskId(ReadonlyVariable):
    """
    Install partition's system id
    """
    type = "list"

    def get(self):
        """
        Get the system id of every install disk.

        The id follows the selected file system when the partition kind is
        known; devices without a parent get ''.
        """
        diskId = dict(zip(self.Get('os_disk_dev'),
                          self.Get('os_disk_id')))
        # autopartition disks start without an old id
        autoDevs = self.Get('cl_autopartition_disk_dev')
        diskId.update(zip(autoDevs,
                          [""]*len(self.Get('cl_autopartition_disk_dev'))))

        def getIdByFS(fs, parttable, oldid):
            if parttable == "gpt":
                return PartitionDistributive.formatIdGpt.get(fs, oldid)
            if parttable in ("primary","extended","logical"):
                return PartitionDistributive.formatId.get(fs, oldid)
            return oldid

        ids = []
        for info in self.ZipVars('os_install_disk_format',
                                 'os_install_disk_part',
                                 'os_install_disk_dev',
                                 'os_install_disk_parent'):
            fs, part, dev, parent = info[0], info[1], info[2], info[3]
            if parent:
                ids.append(getIdByFS(fs, part, diskId.get(dev, '')))
            else:
                ids.append('')
        return ids
class VariableOsInstallDiskName(Variable):
    """
    New labels for disk
    """
    type = "list"

    def get(self):
        """
        Get labels for install disks: "<shortname>-<version>" for the
        root, the current label ('' when unknown) for the others.
        """
        diskLabel = dict(self.ZipVars('os_disk_dev','os_disk_name'))
        labels = []
        for dev, mount in self.ZipVars('os_install_disk_dev',
                                       'os_install_disk_mount'):
            if mount == '/':
                labels.append("%s-%s"%(
                    self.Get('os_install_linux_shortname'),
                    self.Get('os_install_linux_ver')))
            else:
                labels.append(diskLabel.get(dev, ''))
        return labels
class VariableOsInstallDiskSize(ReadonlyVariable):
    """
    New partition sizes (for the future change-partition feature)
    """
    type = "list"

    def get(self):
        """Get the location size of every install disk ('' if unknown)"""
        diskSize = dict(self.ZipVars('os_location_source',
                                     'os_location_size'))
        return [diskSize.get(dev, '')
                for dev in self.Get('os_install_disk_dev')]
class VariableOsInstallDiskType(ReadonlyVariable):
    """
    New partition scheme (for the future change-partition feature)
    """
    type = "list"

    def get(self):
        """Get the type of every install disk ('' if unknown)"""
        diskType = dict(self.ZipVars('os_disk_dev','os_disk_type'))
        # autopartition values override those of existing disks
        autoTypes = self.ZipVars('cl_autopartition_disk_dev',
                                 'cl_autopartition_disk_type')
        diskType.update(autoTypes)
        return [diskType.get(dev, '')
                for dev in self.Get('os_install_disk_dev')]
class VariableOsInstallDiskParent(ReadonlyVariable):
    """
    Partition parent devices using for install
    """
    type = "list"

    def get(self):
        """Get the parent device of every install disk ('' if unknown)"""
        diskParent = dict(self.ZipVars('os_disk_dev','os_disk_parent'))
        # replace value for autopartition: every autopartition disk has
        # the autopartition device as its parent
        autoDevs = self.Get('cl_autopartition_disk_dev')
        autoParents = [self.Get('cl_autopartition_device')]* \
                      len(self.Get('cl_autopartition_disk_dev'))
        diskParent.update(zip(autoDevs, autoParents))
        return [diskParent.get(dev, '')
                for dev in self.Get('os_install_disk_dev')]
class VariableOsInstallDiskPart(ReadonlyVariable):
    """
    Get new type partitions using for install
    """
    type = "list"

    def get(self):
        """Get the partition kind of every install disk ('' if unknown)"""
        diskPart = dict(self.ZipVars('os_disk_dev','os_disk_part'))
        # autopartition values override those of existing disks
        autoParts = self.ZipVars('cl_autopartition_disk_dev',
                                 'cl_autopartition_disk_part')
        diskPart.update(autoParts)
        return [diskPart.get(dev, '')
                for dev in self.Get('os_install_disk_dev')]
class VariableOsInstallBindData(ReadonlyVariable):
    """
    Table of install bind mount points
    """
    type = "table"
    source = [
        'os_install_bind_path',
        'os_install_bind_mountpoint',
    ]
class VariableOsInstallBindPath(ReadonlyVariable):
    """
    Install directories for bind
    """
    type = "list"

    def get(self):
        """Get install bind source"""
        # presumably the filter receives (where value, selected value),
        # i.e. (destination, source) here; verify against Select
        def isBind(info):
            return not info[1].startswith('/dev/') and info[0]
        return self.Select('os_location_source',
                           where='os_location_dest',
                           func=isBind)
class VariableOsInstallBindMountpoint(ReadonlyVariable):
    """
    Mountpoint for install directories bind
    """
    # NOTE(review): unlike the sibling variables, no `type` is declared
    # here; confirm whether that is intended
    def get(self):
        """Get install bind destination"""
        # presumably the filter receives (where value, selected value),
        # i.e. (source, destination) here; verify against Select
        def isBind(info):
            return not info[0].startswith('/dev/') and info[1]
        return self.Select('os_location_dest',
                           where='os_location_source',
                           func=isBind)
class VariableOsInstallMbr(LocationVariable,Variable):
    """
    Disks for boot mbr
    """
    type = "choice-list"
    opt = ["--mbr"]
    metavalue = "MBR"

    def init(self):
        self.label = _("Disk for install")

    def get(self):
        """Get default Master boot record install

        For "flash" and "usb-hdd" installs: the first os_device_dev entry
        whose name is contained in the install root device name.
        On a livecd: the parent disk of /boot or / (by priority /boot, /).
        Otherwise: the first known device, or an empty list.

        TODO: add get list of devices if install on RAID
        """
        if self.Get('os_install_root_type') in ("flash","usb-hdd"):
            rootdev = self.Get('os_install_root_dev')
            device = [x for x in self.Get('os_device_dev') if x in rootdev]
            # first matching device, or [] when nothing matches
            return device[:1]
        # if loaded system livecd
        if self.Get('os_root_type') == "livecd":
            # search /boot device or / device, by priority /boot,/
            bootDev = self.Select('os_install_disk_parent',
                                  where='os_install_disk_mount',
                                  _in=('/','/boot'),
                                  sort="DESC",limit=1)
            if bootDev:
                # NOTE(review): other branches return a list while this
                # returns the raw limit=1 Select() result -- confirm the
                # framework accepts it for a "choice-list" variable
                return bootDev
        if self.Get('os_device_dev'):
            return [self.Get('os_device_dev')[0]]
        return []

    def choice(self):
        """Available values: every known device"""
        return self.Get('os_device_dev')

    def set(self,value):
        """Drop the special "off" value from the selected disks"""
        # support off value
        return [x for x in value if x != "off"]

    def check(self,value):
        """Validate the disks selected for the boot record

        Raises VariableError when a flash install names several disks,
        when a disk has no partition table, when a flash disk uses gpt,
        or when the boot partition layout is not supported by the
        bootloader.
        """
        rootType = self.Get('os_install_root_type')
        if rootType == "flash" and len(value) > 1:
            raise VariableError(
                _("For flash installation need only one disk"))
        for mbrDisk in value:
            tableOnBootDisk = self.Select('os_device_table',
                where="os_device_dev",eq=mbrDisk,limit=1)
            if not tableOnBootDisk:
                raise VariableError(
                    _("Disk '%s' without partition table "
                      "contains no boot record")%mbrDisk)
            # checked per disk so tableOnBootDisk is always bound here
            if rootType == "flash" and tableOnBootDisk == "gpt":
                raise VariableError(
                    _("For flash installation need disk with dos table"))
        if value:
            if self.Get('os_grub2_path'):
                self.checkForGrub2()
            else:
                self.checkForLegacyGrub()

    def checkForGrub2(self):
        """Check current disk configuration for installation for install
        GRUB2

        Raises VariableError when the type of the /boot (or /) partition
        is a combination GRUB2 cannot boot from.
        """
        # The checks below inspect the partition *type* ("lvm-raid",
        # repeated "raid"), so the type column must be selected; the
        # parent device name never contains these markers.
        grubDiskType = self.Select('os_install_disk_type',
                                   where='os_install_disk_mount',
                                   _in=('/','/boot'),
                                   sort="DESC",limit=1) or ""
        if "lvm-raid" in grubDiskType:
            raise VariableError(
                _("Grub does not support booting from a RAID assembled from LVM.")
                + " " +
                _("Try to use a separate /boot partition"))
        if grubDiskType.count("raid")>1:
            raise VariableError(
                _("Grub does not support booting from a RAID assembled "
                  "from another RAID.")
                + " " +
                _("Try to use a separate /boot partition"))

    def checkForLegacyGrub(self):
        """Check current disk configuration for installation for install
        legacy grub

        Raises VariableError when the /boot (or /) partition is on
        raid/lvm or uses a filesystem listed as unsupported below.
        """
        bootDisk = self.Select(['os_install_disk_type',
                                'os_install_disk_format'],
                               where='os_install_disk_mount',
                               _in=('/','/boot'),
                               sort="DESC",limit=1)
        if not bootDisk:
            # no / or /boot partition selected: nothing to validate
            return
        bootDiskType,bootDiskFormat = bootDisk
        # VariableError (not ValueError) keeps these failures consistent
        # with every other validation error raised from check()
        if "lvm" in bootDiskType or "raid" in bootDiskType:
            raise VariableError(
                _("Legacy grub does not support boot from raid or lvm "
                  "without separate /boot partition"))
        if bootDiskFormat in ("btrfs","nilfs2"):
            raise VariableError(
                _("Legacy grub does not support booting from %s without "
                  "separate /boot partition")%bootDiskFormat)
class VariableOsInstallRootType(LocationVariable,Variable):
    """
    Type of the device the system is installed on.
    """
    opt = ["--type"]
    metavalue = "DISKTYPE"
    type = "choice"

    def init(self):
        self.help = _("device type for the system bound for install")
        self.label = _("Installation type")

    def get(self):
        """
        Return the os_device_type of the device holding the install root,
        falling back to os_root_type when no type is found.
        """
        rootdev = self.Get('os_install_root_dev')
        devPath = getUdevDeviceInfo(name=rootdev).get('DEVPATH','')
        parentDevice = getPartitionDevice(devPath)
        foundType = self.Select('os_device_type',
                                where='os_device_dev',
                                eq=parentDevice,
                                limit=1)
        if foundType:
            return foundType
        return self.Get('os_root_type')

    def choice(self):
        """Available installation types with their labels"""
        return [
            ("hdd",_("Hard disk")),
            ("flash",_("USB Flash")),
            ("usb-hdd",_("USB Hard Disk")),
        ]
class VariableOsInstallRootDev(ReadonlyVariable):
    """
    Device mounted as "/" in the installed system.
    """
    def get(self):
        """Return the install disk whose mount point is "/", or ''"""
        rootDev = self.Select('os_install_disk_dev',
                              where='os_install_disk_mount',
                              eq="/",
                              limit=1)
        if rootDev:
            return rootDev
        return ''
class VariableOsInstallFstabMountConf(ReadonlyVariable,DeviceHelper):
    """
    FStab.conf contains for mount and bind points
    """
    def _commentFstab(self,s,mp,dev):
        """Generate comment for /etc/fstab each line

        A line whose first field starts with "UUID" is prefixed with a
        comment naming the mount point and the device it referred to
        during installation; any other value is returned unchanged.
        """
        if s.startswith("UUID"):
            return "# %s was on %s during installation\n%s" % (mp,dev,s)
        else:
            return s

    def get(self):
        """Build the fstab lines for mounted partitions and bind mounts

        Partitions with an empty or "swap" mount point are skipped. Rows
        are ordered by mount point; the first one gets "0 2" in the last
        two fields and the rest get "0 0". Bind mounts are appended last.
        """
        # All columns come from the os_install_disk_* table so that every
        # row stays aligned with the os_install_disk_mount filter; the
        # device column feeds the "was on <dev>" comment.
        devicesForFstab = sorted(
            self.Select(['os_install_disk_use',
                         'os_install_disk_mount',
                         'os_install_disk_format',
                         'os_install_disk_options',
                         'os_install_disk_dev'],
                        where='os_install_disk_mount',
                        func=lambda x:x[0] != "" and x[0] != "swap"),
            key=lambda x: self.separateDevice(x[1]))
        if self.Get('os_install_scratch') == "on":
            # the root line is not written when os_install_scratch is on
            devicesForFstab = [x for x in devicesForFstab if x[1] != "/"]

        def fstabLine(row,passno):
            # row: (use, mount, format, options, dev)
            return "%s\t%s\t%s\t%s\t0 %d" % (
                self._commentFstab(row[0],row[1],row[4]),
                row[1],row[2],row[3],passno)

        # slicing keeps this correct when devicesForFstab is empty
        lines = [fstabLine(row,2) for row in devicesForFstab[:1]]
        lines += [fstabLine(row,0) for row in devicesForFstab[1:]]
        bindData = self.ZipVars('os_install_bind_path',
                                'os_install_bind_mountpoint')
        lines += ["%s\t%s\tnone\tbind\t0 0" % (x[0],x[1]) for x in bindData]
        return "\n".join(lines)
class VariableOsInstallFstabSwapConf(VariableOsInstallFstabMountConf):
    """
    FStab.conf lines for the swap partitions.
    """
    def get(self):
        """Return one fstab line per install disk mounted as "swap" """
        swapRows = self.Select(['os_install_disk_use',
                                'os_install_disk_mount',
                                'os_install_disk_dev'],
                               where='os_install_disk_mount',
                               eq='swap')
        swapLines = []
        for row in swapRows:
            # row: (use, mount, dev); the device only feeds the comment
            source = self._commentFstab(row[0],"swap",row[2])
            swapLines.append("%s\tnone\tswap\tsw\t0 0" % source)
        return "\n".join(swapLines)
class VariableClInstallType(Variable):
    """
    Installation type (extension variable describing that the
    installation must be made to flash).
    """
    type = "choice"
    value = ""

    def choice(self):
        """Allowed values: default ("") or "flash" """
        return ["","flash"]

    def check(self,value):
        """Reject "flash" when os_install_disk_single has a single choice"""
        if value != "flash":
            return
        # presumably the single remaining choice is the empty entry,
        # i.e. no flash disk is available -- verify against
        # os_install_disk_single
        if len(self.Choice('os_install_disk_single')) == 1:
            raise VariableError(_("Flash drive not found"))
class VariableOsInstallDiskSingle(Variable):
    """
    Single disk selected for installation.
    """
    type = "choice"

    def init(self):
        self.label = _("Installation disk")

    def availDevs(self):
        """
        Devices that may be chosen: partitions of flash drives for a
        flash install, otherwise all disks plus the bind paths.
        """
        if self.Get('cl_install_type') != 'flash':
            return self.Get('os_disk_dev')+self.Get('os_bind_path')
        flashDrives = self.Select('os_device_dev',
                                  where='os_device_type',
                                  eq="flash")
        return self.Select('os_disk_dev',
                           where='os_disk_parent',
                           _in=flashDrives)

    def get(self):
        """First available disk for a flash install, otherwise ''"""
        if self.Get('cl_install_type') != 'flash':
            return ""
        disks = self.availDevs()
        return disks[0] if disks else ""

    def choice(self):
        """
        (device, "device (parent device name)") pairs for every available
        device, followed by an empty entry.
        """
        diskParentMap = dict(zip(self.Get('os_disk_dev'),
                                 self.Get('os_disk_parent')))
        deviceNameMap = dict(self.ZipVars('os_device_dev','os_device_name'))
        items = []
        for dev in self.availDevs():
            # a device with no known parent is looked up by its own name
            parent = diskParentMap.get(dev,dev)
            label = "%s (%s)"%(dev,deviceNameMap.get(parent,_("Unknown")))
            items.append((dev,label))
        items.append(("",""))
        return items