# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
import re
import pty
import fcntl
from subprocess import Popen
from os import path
from itertools import *
from calculate.install.distr import (FlashDistributive, DistributiveError,
                                     IsoDistributive)
from calculate.lib.datavars import (TableVariable, Variable, VariableError,
                                    ReadonlyVariable, ReadonlyTableVariable,
                                    SourceReadonlyVariable, VariableInterface,
                                    HumanReadable)
import calculate.lib.utils.device as device
from calculate.lib.utils.device import (getPartitionSize, humanreadableSize,
                                        getUUIDDict)
from calculate.install.variables.autopartition import Sizes
from calculate.lib.utils.files import getProgPath
from calculate.lib.utils.mount import isMount, FStab, DiskSpace, Btrfs, \
    BtrfsError, try_umount
from calculate.install.fs_manager import FileSystemManager

from calculate.lib.cl_lang import setLocalTranslate, _
from calculate.lib.variables.system import RootType

setLocalTranslate('cl_install3', sys.modules[__name__])


class DeviceHelper(VariableInterface):
    """
    Mixin with udev/sysfs helpers shared by the device and disk variables
    """
    # matches "/block/<name>" sys paths except sr*, fd*, ram* and loop*
    rePassDevice = re.compile("^/block/(?!%s)" % "|".join(['sr', 'fd',
                                                           'ram', 'loop']))

    def getBlockDevices(self):
        """
        Get the interesting devices from the sys block path

        Returns the list of udev block device paths accepted by
        rePassDevice.
        """
        return [x for x in device.udev.get_block_devices()
                if self.rePassDevice.search(x)]

    def separateDevice(self, dev):
        """
        Split a device name into its word and number parts

        Used as a sort key so that numeric parts are ordered as numbers.
        (Example: "sda2" -> ["sda", 2], "md5p1" -> ["md", 5, "p", 1])
        """
        return [int(x) if x.isdigit() else x
                for x in re.findall(r'\d+|\D+', dev)]

    def mapUdevProperty(self, var, prop, default):
        """
        Get udev property `prop` for each device listed in variable `var`

        `default` is used for devices which have no such property.
        """
        return [device.udev.get_device_info(name=x).get(prop, default)
                for x in self.Get(var)]

    def getPerfectName(self, dev, defaultValue=None):
        """
        Get the device name or the human-readable lvm name

        Returns "/dev/<vg>/<lv>" when udev reports both DM_VG_NAME and
        DM_LV_NAME for `dev` and that path exists. Otherwise returns
        `defaultValue`, or the udev DEVNAME ("" if absent) when
        `defaultValue` is None.
        """
        info = device.udev.get_device_info(name=dev)
        if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
            lvmDeviceName = '/dev/{vg}/{lv}'.format(vg=info['DM_VG_NAME'],
                                                    lv=info['DM_LV_NAME'])
            if path.exists(lvmDeviceName):
                return lvmDeviceName
        if defaultValue is None:
            return info.get('DEVNAME', '')
        return defaultValue

    def getLvmName(self, dev):
        """
        Get the lvm name of the device ("" if the device is not an
        existing logical volume)
        """
        return self.getPerfectName(dev, defaultValue="")


#######################################################
# Devices variables
#######################################################
class VariableOsDeviceData(ReadonlyTableVariable):
    """
    Information about disk devices
    """
    type = 'table'
    source = ['os_device_dev',
              'os_device_table',
              'os_device_type',
              'os_device_parent',
              'os_device_ssd_set',
              'os_device_virtual_set',
              'os_device_map',
              'os_device_syspath',
              'os_device_name',
              'os_device_size',
              'os_device_mbr',
              'os_device_efi',
              'os_device_fulltype']


class VariableOsDeviceInvalidator(ReadonlyVariable):
    """
    Variable used to reset the values of device variables automatically
    when the set of block devices changes while the program is running
    """
    # PTY master descriptor the udev monitor output is read from
    master = None
    # "udevadm monitor" process; stays None until get() starts it, so that
    # close() and refresh() are safe to call at any time
    monitor = None

    def get(self):
        """
        Start the udev block device monitor (once) and return a constant

        The monitor is not started while 'cl_ebuild_phase' is set.

        Raises VariableError if the PTY cannot be created.
        """
        if self.master is None and not self.Get('cl_ebuild_phase'):
            try:
                self.master, slave = pty.openpty()
            except OSError:
                raise VariableError('Failed to create PTY')
            udevAdm = getProgPath('/sbin/udevadm')
            try:
                self.monitor = Popen([udevAdm, "monitor", "--kernel",
                                      "--subsystem-match=block"],
                                     stdout=slave, close_fds=True)
            finally:
                # the child holds its own copy; do not leak ours even if
                # the monitor failed to start
                os.close(slave)
            # refresh() must never block waiting for udev events
            fl = fcntl.fcntl(self.master, fcntl.F_GETFL)
            fcntl.fcntl(self.master, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return "Device invalidator"

    def close(self):
        """
        Stop the udev monitor process (best effort)
        """
        try:
            if self.monitor:
                self.monitor.kill()
                self.monitor.wait()
        except OSError:
            # the process is already gone - nothing to clean up
            pass

    def refresh(self):
        """
        Invalidate the device variables if udev reported any block event

        Does nothing when the monitor is not running or when there is no
        pending monitor output.
        """
        if not self.monitor:
            return
        chunk = 65535
        changed = False
        try:
            # drain everything which is pending on the non-blocking PTY
            while True:
                res = os.read(self.master, chunk)
                if not res:
                    break
                changed = True
                if len(res) < chunk:
                    break
        except OSError:
            # nothing (more) to read; data read before the error still
            # counts as a change
            pass
        if changed:
            self.parent.Invalidate(self.name)
            self.parent.Invalidate('os_install_disk_uuid')
            self.parent.Invalidate('os_install_disk_partuuid')
            self.parent.Invalidate('os_disk_dev')
            device.udev.clear_cache()


class VariableOsDeviceDev(DeviceHelper, ReadonlyVariable):
    """
    Disk devices
    """
    type = "list"
    re_disk_raid = re.compile(r"^disk-.*-raid\d+$", re.I)

    def init(self):
        pass

    def get(self):
        """Get the /dev names of the disk devices, naturally sorted"""
        # evaluate the invalidator variable (its get() starts the monitor)
        self.Get('os_device_invalidator')
        # get devices from block sys directories(discard mem,sr,loop and
        # other); "/block/<name>" entries have exactly two slashes
        devices = (x for x in self.getBlockDevices() if x.count('/') == 2)
        devnames = device.udev.syspath_to_devname(
            x for x in devices
            if device.udev.is_device(device.udev.get_device_info(x)) or
            self.re_disk_raid.match(device.udev.get_device_type(path=x)))
        return sorted(devnames, key=self.separateDevice)


class VariableOsDeviceFulltype(ReadonlyVariable):
    """
    Full device type
    """
    type = "list"

    def get(self):
        """Get the udev device type for each device sys path"""
        return [device.udev.get_device_type(x)
                for x in self.Get('os_device_syspath')]


class VariableOsDeviceType(ReadonlyVariable):
    """
    Device type (raid level, flash, usb-hdd or hdd)
    """
    type = "list"

    def getType(self, dev):
        """
        Get the type of a single device

        Uses self.usbdevices, which is filled by get().
        """
        info = device.udev.get_device_info(name=dev)
        if device.udev.is_raid(info):
            return info["MD_LEVEL"]
        device_name = path.basename(dev)
        if device_name not in self.usbdevices:
            return "hdd"
        removable = device.sysfs.read(device.sysfs.Path.Block,
                                      device_name, "removable").strip()
        if removable == "1":
            return "flash"
        return "usb-hdd"

    def get(self):
        """Get the type of each device from os_device_dev"""
        # get usb device by '/dev/disk/by-id'(usb devices contain 'usb' in
        # name)
        diskIdPath = '/dev/disk/by-id'
        if device.devfs.exists(diskIdPath):
            self.usbdevices = [
                device.devfs.realpath(diskIdPath, x).rpartition('/')[2]
                for x in device.devfs.listdir(diskIdPath, fullpath=False)
                if x.startswith('usb-')]
        else:
            self.usbdevices = []
        return [self.getType(x) for x in self.Get('os_device_dev')]


class VariableOsDeviceParent(ReadonlyVariable):
    """
    Base devices of a RAID array
    """
    type = "list"

    def get(self):
        """
        Get the comma separated base devices for raid devices
        ("" for other devices)
        """
        return [",".join(device.udev.get_all_base_devices(name=dev))
                if "raid" in _type else ""
                for dev, _type in self.ZipVars(
                    'os_device_dev', 'os_device_type')]


class MbrEfiHelper(VariableInterface):
    """
    Helper for searching a partition with a specific system id on a device
    """
    # system id to search for; defined by subclasses
    boottype = ""

    def get_boot_partition(self, sysdevice):
        """
        Get the first partition of `sysdevice` whose os_disk_id equals
        self.boottype ("" if there is no such partition)
        """
        basename = path.basename(sysdevice)
        for devname in device.udev.syspath_to_devname(
                device.sysfs.glob(sysdevice, "%s*" % basename),
                dropempty=True):
            partid = self.select('os_disk_id', os_disk_dev=devname, limit=1)
            if partid.upper() == self.boottype:
                return devname
        return ""


class VariableOsDeviceMbr(MbrEfiHelper, ReadonlyVariable):
    """
    Devices which can be used for bios_boot: the device name for a device
    with a dos partition table or with a partition of EF02 type, else ""
    """
    type = "list"
    boottype = "EF02"

    def get(self):
        def generator():
            for dev, sysdevice, _type, table in self.ZipVars(
                    'os_device_dev', 'os_device_syspath',
                    'os_device_type', 'os_device_table'):
                if "raid" in _type:
                    yield ""
                elif table == "dos" or self.get_boot_partition(sysdevice):
                    yield dev
                else:
                    yield ""

        return list(generator())


class VariableOsDeviceEfi(MbrEfiHelper, ReadonlyVariable):
    """
    Partitions on the device which can be EFI (partition of EF00 type);
    "" for raid devices and devices with a dos partition table
    """
    boottype = "EF00"
    type = "list"

    def get(self):
        def generator():
            for dev, sysdevice, _type, table in self.ZipVars(
                    'os_device_dev', 'os_device_syspath',
                    'os_device_type', 'os_device_table'):
                if "raid" in _type or table == "dos":
                    yield ""
                else:
                    yield self.get_boot_partition(sysdevice) or ""

        return list(generator())


class VariableOsDeviceMap(ReadonlyVariable):
    """
    Map number for grub

    Used for legacy grub (DEPRECATED)
    """
    type = "list"

    def get(self):
        """Get the index of each device in os_device_dev as a string"""
        return [str(num) for num in range(len(self.Get('os_device_dev')))]


class VariableOsDeviceArraySet(ReadonlyVariable):
    """
    Array disks (when partitions are created on such disks, "p" is added
    before the partition number: nvme0n1p1 instead of nvme0n11)
    """
    type = "list"
    devnames = ("nvme", "mmcblk")

    def get(self):
        """Get "on" for devices whose name contains one of devnames"""

        def isArray(dev, name):
            if any(x in dev for x in self.devnames):
                return "on"
            return "off"

        return [isArray(dev, name)
                for dev, name in zip(self.Get('os_device_dev'),
                                     self.Get('os_device_name'))]


class VariableOsDeviceSsdSet(ReadonlyVariable):
    """
    Ssd property
    """
    type = "list"
    ssd_names = ("SSD", "OCZ", "PLEXTOR")
    udev_property = 'ID_ATA_ROTATION_RATE_RPM'
    # assume that nvme disks are SSD
    devnames = ("nvme",)

    def get(self):
        """
        Get "on" for devices detected as SSD by device name, by zero
        rotation rate or by model name
        """

        def isSsd(dev, name):
            prop = device.udev.get_device_info(name=dev)
            rpm = prop.get(self.udev_property, None)
            if (any(x in dev for x in self.devnames) or rpm == "0" or
                    any(x in name for x in self.ssd_names)):
                return "on"
            return "off"

        return [isSsd(dev, name)
                for dev, name in zip(self.Get('os_device_dev'),
                                     self.Get('os_device_name'))]


class VariableOsDeviceSyspath(ReadonlyVariable):
    """
    Sys path of device
    """
    type = "list"
    udev_property = 'DEVPATH'

    def get(self):
        """Get the udev DEVPATH of each device ("" if absent)"""
        return [device.udev.get_device_info(name=dev).get(
                    self.udev_property, "")
                for dev in self.Get('os_device_dev')]


class VariableOsDeviceVirtualSet(ReadonlyVariable):
    """
    Virtual device property
    """
    type = "list"
    virtual_names = ("VBOX", "VMWare", "QEMU")
    virtual_syspath = ("virtio",)

    def get(self):
        """
        Get "on" for devices detected as virtual by model name or by
        sys path
        """

        def isVirtual(dev, name, syspath):
            if (any(x in name for x in self.virtual_names) or
                    any(x in syspath for x in self.virtual_syspath)):
                return "on"
            return "off"

        return [isVirtual(dev, name, syspath)
                for dev, name, syspath in zip(
                    self.Get('os_device_dev'),
                    self.Get('os_device_name'),
                    self.Get('os_device_syspath'))]


class VariableOsDeviceTable(ReadonlyVariable):
    """
    Table on device
    """
    type = "list"

    def getTableByChild(self, dev):
        """
        Get the partition table by child partitions

        Only the first child found in sysfs is examined; "" is returned
        when the device has no children.
        """
        map_names = {'mbr': 'dos',
                     'msdos': 'dos'}
        syspath = device.udev.get_syspath(name=dev)
        shortname = path.basename(dev)
        for child in device.sysfs.glob(syspath, "%s*" % shortname):
            udevinfo = device.udev.get_device_info(path=child)
            table = (udevinfo.get('ID_PART_ENTRY_SCHEME', '') or
                     udevinfo.get('UDISKS_PARTITION_SCHEME', ''))
            return map_names.get(table, table)
        return ""

    def get(self):
        """Get device partition table"""
        autopartition = self.Get('cl_autopartition_set') == 'on'
        # NOTE(review): compared with a single device name below - confirm
        # that cl_autopartition_device is not a list here
        autoDevice = self.Get('cl_autopartition_device')

        def getTable(dev):
            prop = device.udev.get_device_info(name=dev)
            table = prop.get('ID_PART_TABLE_TYPE')
            if table is None:
                # look at the child partitions only when udev does not
                # report the table type of the device itself
                table = self.getTableByChild(dev)
            return table

        def getByAutopartition(dev):
            if autopartition and autoDevice == dev:
                return self.Get('cl_autopartition_table')
            return getTable(dev)

        return [getByAutopartition(x) for x in self.Get('os_device_dev')]


class VariableOsDeviceName(ReadonlyVariable):
    """
    Name of device
    """
    type = "list"
    # names for devices which have neither vendor nor model in sysfs
    nameless_devices = {
        'nvme': 'NVME',
        'mmcblk': 'Multimedia Card'
    }

    def getName(self, dev):
        """
        Get "<vendor> <model>" of the device from sysfs

        Falls back to nameless_devices (matched against the sys path) and
        then to "".
        """
        devicepath = device.udev.get_syspath(name=dev)
        if not devicepath:
            return ""
        vendor = device.sysfs.read(devicepath, "device/vendor").strip()
        model = device.sysfs.read(devicepath, "device/model").strip()
        if vendor or model:
            return ("%s %s" % (vendor, model)).strip()
        for k, v in self.nameless_devices.items():
            if k in devicepath:
                return v
        return ""

    def get(self):
        return [self.getName(x) for x in self.Get('os_device_dev')]


class VariableOsDeviceSize(ReadonlyVariable):
    """
    Size of device
    """
    type = "list"

    def get(self):
        """Get device size"""
        return [getPartitionSize(name=x, inBytes=True)
                for x in self.Get('os_device_dev')]

    def humanReadable(self):
        return [humanreadableSize(x) for x in self.Get()]


#############################################
# Disk variables
#############################################

class VariableOsDiskData(ReadonlyTableVariable):
    """
    Information about current system partition and mounts
    """
    source = ['os_disk_dev',
              'os_disk_uuid',
              'os_disk_partuuid',
              'os_disk_name',
              'os_disk_size',
              'os_disk_part',
              'os_disk_format',
              'os_disk_type',
              'os_disk_raid',
              'os_disk_lvm',
              'os_disk_parent',
              'os_disk_id',
              'os_disk_grub']


class VariableOsDiskDev(DeviceHelper, ReadonlyVariable):
    """
    List of available partition devices
    """
    type = "list"

    def get(self):
        """
        Get the /dev names of block devices which are not a parent of
        another block device, naturally sorted
        """
        # get block devices; in the list the partitions of a device with a
        # partition table are separated from it by '/'
        re_parent = re.compile("^/block/[^/]+")
        disks = list(self.getBlockDevices())
        parents = {re_parent.search(x).group()
                   for x in disks if x.count("/") > 2}
        dev_names = device.udev.syspath_to_devname(
            (x for x in disks if x not in parents),
            dropempty=False)
        return sorted(dev_names, key=self.separateDevice)

    def humanReadable(self):
        return [self.getPerfectName(x) for x in self.Get()]


class VariableOsDiskMount(DeviceHelper, ReadonlyVariable):
    """
    List of mount points for the current operating system
    """
    type = "list"

    def get(self):
        """
        Get "/" for the root device, the fstab entry for other devices
        and "" for devices which are absent from fstab
        """
        disk_hash = self.Get('os_disk_dev')
        fstab = FStab('/etc/fstab', devs=disk_hash)
        rootdev = self.Get('os_root_dev')
        return ['/' if x == rootdev else fstab.getBy(eq=x) or ""
                for x in self.Get('os_disk_dev')]


class VariableOsDiskContent(ReadonlyVariable):
    """
    Partition content
    """
    type = "list"

    def get(self):
        """
        TODO: need to write
        """
        return ["" for x in self.Get('os_disk_dev')]


class VariableOsDiskFormat(ReadonlyVariable):
    """
    Filesystem on device partitions
    """
    type = "list"

    def get(self):
        """Get current disk filesystem"""
        fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev'))

        def getFormat(dev):
            prop = device.udev.get_device_info(name=dev)
            fs = prop.get('FSTAB_TYPE') or \
                 fstab.getBy(what=fstab.TYPE, eq=dev) or \
                 prop.get('ID_FS_TYPE', '')
            if fs == "btrfs":
                # the device may be absent from fstab, so guard the
                # options lookup against an empty result
                opts = fstab.getBy(what=fstab.OPTS, eq=dev) or ""
                if "compress" in opts:
                    return "btrfs-compress"
                try:
                    if Btrfs(dev).compression != "":
                        return "btrfs-compress"
                except BtrfsError:
                    pass
            return fs

        return [getFormat(x) for x in self.Get('os_disk_dev')]
class VariableOsDiskType(ReadonlyVariable): """ List type (lvm,raid,partition,disk) """ type = "list" re_raid = re.compile("-raid\d+$") re_raid_partition = re.compile("-raid\d+-partition$") def get(self): """Get partition scheme""" types = map(lambda x: (x, device.udev.get_device_type(name=x)), self.Get('os_disk_dev')) lvmUsedDisks = {} raidUsedDisks = {} def forMember(typeInfo): diskName, diskType = typeInfo if diskName in raidUsedDisks: diskType = "%s-raidmember(%s)" % (diskType, raidUsedDisks[diskName]) if diskName in lvmUsedDisks: diskType = "%s-lvmmember(%s)" % (diskType, ",".join( lvmUsedDisks[diskName])) return diskName, diskType for dev, diskType in types: prop = device.udev.get_device_info(name=dev) if self.re_raid.search(diskType): raiddevice = prop.get('DEVPATH', '') elif self.re_raid_partition.search(diskType): raiddevice = path.dirname(prop.get('DEVPATH', '')) else: raiddevice = None if raiddevice: raiddev = device.udev.get_devname(raiddevice) for x in device.raid.devices(raiddevice): raidUsedDisks[x] = raiddev if diskType.endswith("lvm"): for x in device.lvm.used_partitions(prop.get('DM_VG_NAME', ''), prop.get('DM_LV_NAME', '')): if x in lvmUsedDisks: lvmUsedDisks[x].append(dev) else: lvmUsedDisks[x] = [dev] return map(lambda x: x[1], map(forMember, types)) class VariableOsDiskRaid(ReadonlyVariable): """ Raids which this partition constructed """ type = "list" def generateRaid(self): for disktype in self.Get('os_disk_type'): if "raidmember" in disktype: yield disktype.rpartition('(')[2][:-1] else: yield "" def get(self): return list(self.generateRaid()) class VariableOsDiskLvm(DeviceHelper, ReadonlyVariable): """ LVM vgname and lvname """ type = "list" def get(self): """Get each element from var through udev [prop]""" return map(self.getLvmName, self.Get('os_disk_dev')) class VariableOsDiskUuid(DeviceHelper, ReadonlyVariable): """ List uudi for partition devices """ type = "list" def get(self): return self.mapUdevProperty('os_disk_dev', 'ID_FS_UUID', 
'') class VariableOsDiskPartuuid(DeviceHelper, ReadonlyVariable): """ List uudi for partition devices """ type = "list" def get(self): return self.mapUdevProperty('os_disk_dev', 'ID_PART_ENTRY_UUID', '') class VariableOsDiskParent(ReadonlyVariable): """ List parent deivces for partition """ type = "list" def get(self): """Get disk parent""" return [",".join(device.udev.get_disk_devices(name=x)) for x in self.Get('os_disk_dev')] class VariableOsDiskId(DeviceHelper, ReadonlyVariable): """ Partition's system id """ type = "list" def get(self): """Get disk id""" mapTypeUUID = {'ebd0a0a2-b9e5-4433-87c0-68b6b72699c7': '0700', '0657fd6d-a4ab-43c4-84e5-0933c84b4f4f': '8200', 'a19d880f-05fc-4d3b-a006-743f0f84911e': 'FD00', '21686148-6449-6e6f-744e-656564454649': 'EF02', 'c12a7328-f81f-11d2-ba4b-00a0c93ec93b': 'EF00', '0fc63daf-8483-4772-8e79-3d69d8477de4': '8300'} return map(lambda x: mapTypeUUID.get(x, x), map(lambda x: x.rpartition("x")[2], self.mapUdevProperty('os_disk_dev', 'ID_PART_ENTRY_TYPE', ''))) class VariableOsDiskGrub(ReadonlyVariable): """ List grub id for partition devices """ type = "list" def get(self): """Get disk grub map""" devicesMap = dict(zip(self.Get('os_device_dev'), self.Get('os_device_map'))) def getGrubMap(devParent): dev, disktype, parent = devParent # grub id вычисляем только для разделов расположенных на диске # (исключаются lvm, raid и прочие абстракции) if disktype != "disk-partition": return "" prop = device.udev.get_device_info(name=dev) partnum = int(prop.get('ID_PART_ENTRY_NUMBER', 0)) if parent in devicesMap.keys() and partnum: return "%s,%d" % (devicesMap[parent], partnum - 1) else: return "" return map(getGrubMap, zip(self.Get('os_disk_dev'), self.Get('os_disk_type'), self.Get('os_disk_parent'))) class VariableOsDiskPart(ReadonlyVariable): """ Type of partition devices If msdos then(primary, extended or logical) If gpt then gpt """ type = "list" def get(self): def generator(): for disk_dev, disk_type in self.ZipVars( 'os_disk_dev', 
'os_disk_type'): if disk_type.endswith("-partition"): yield device.udev.get_partition_type(name=disk_dev) else: yield "" return list(generator()) class VariableOsDiskSize(ReadonlyVariable): """ Partition size """ type = "list" def get(self): """Get disk size""" return map(lambda x: getPartitionSize(name=x, inBytes=True), self.Get('os_disk_dev')) def humanReadable(self): return map(humanreadableSize, self.Get()) class VariableOsDiskName(DeviceHelper, ReadonlyVariable): """ Label of partitions """ type = "list" def get(self): """Get disk label""" return self.mapUdevProperty('os_disk_dev', 'ID_FS_LABEL', '') class VariableOsDiskOptions(ReadonlyVariable): """ List mount options """ type = "list" def get(self): fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev')) def getFormat(dev): return fstab.getBy(what=fstab.OPTS, eq=dev) return map(getFormat, self.Get('os_disk_dev')) ################################################ # Bind mount points ################################################ class VariableOsBindData(ReadonlyTableVariable): """ Table of bind mount points """ source = ['os_bind_path', 'os_bind_mountpoint'] class VariableOsBindPath(ReadonlyVariable): """ List source bind path """ type = "list" def get(self): fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev')) return fstab.getBy(what=fstab.NAME, where=fstab.OPTS, contains="bind", allentry=True) class VariableOsBindMountpoint(ReadonlyVariable): """ Mountpoints for directories bind """ type = "list" def get(self): fstab = FStab('/etc/fstab', devs=self.Get('os_disk_dev')) return fstab.getBy(what=fstab.DIR, where=fstab.OPTS, contains="bind", allentry=True) ###################################################################### # Userselect partion parameters ###################################################################### class LocationHelper(VariableInterface): """ Location variable """ def uncompatible(self): """ Network setting up unavailable for flash installation """ if 
self.Get('cl_autopartition_set') == "on": return \ _("The layout is not available with autopartitioning") return "" class VariableOsLocationBriefData(LocationHelper, TableVariable): source = ["os_location_source", "os_location_dest", "os_location_format", "os_location_perform_format", "os_location_size"] orig_source = [('os_install_disk_dev', 'os_install_disk_mount', 'os_install_disk_format', 'os_install_disk_perform_format', 'os_install_disk_size'), ('os_install_bind_path', 'os_install_bind_mountpoint', '', '', '')] def init(self): self.label = _("Mount points") def get_autopartition(self, hr=HumanReadable.No): # при авторазметке получаем только информацию о # /boot/efi разделах if self.GetBool('cl_autopartition_uefi_set'): boot = [[dev, mp, fs, _format, size] for dev, mp, fs, _format, size in self.ZipVars( 'os_install_disk_dev', 'os_install_disk_mount', 'os_install_disk_format', 'os_install_disk_perform_format', 'os_install_disk_size', humanreadable=hr) if mp.startswith('/boot/efi')] else: boot = [] # исключаем из устройств авторазметки информацию о efi разделах # так как она не достоверная devices = [[dev, mp, fs, _format, size] for dev, mp, fs, _format, size in self.ZipVars( 'cl_autopartition_disk_dev_full', 'cl_autopartition_disk_mount_full', 'cl_autopartition_disk_format_full', 'cl_autopartition_disk_perform_format_full', 'cl_autopartition_disk_size_full', humanreadable=hr) if not mp.startswith('/boot/efi') ] binds = [[dev, mp, "", "", ""] for dev, mp in self.ZipVars( 'cl_autopartition_bind_path', 'cl_autopartition_bind_mountpoint', humanreadable=hr)] autodevs = self.Get('cl_autopartition_disk_dev_full') bootdevs = [x[0] for x in boot] def keysort(dev): if dev in autodevs: return autodevs.index(dev), -1 else: return -1, bootdevs.index(dev) return sorted(boot + devices, key=lambda x: keysort(x[0])) + binds or [[]] def get_manual(self, hr=HumanReadable.No): devs = self.Get('os_disk_dev') def keysort(dev): if dev in devs: return devs.index(dev), -1 else: return 
-1, dev devices = map(list, self.ZipVars( 'os_install_disk_dev', 'os_install_disk_mount', 'os_install_disk_format', 'os_install_disk_perform_format', 'os_install_disk_size', humanreadable=hr)) binds = [[dev, mp, "", "", ""] for dev, mp in self.ZipVars( 'os_install_bind_path', 'os_install_bind_mountpoint', humanreadable=hr)] return sorted(devices, key=lambda x: keysort(x[0])) + binds or [[]] def get(self, hr=HumanReadable.No): if self.GetBool('cl_autopartition_set'): return self.get_autopartition(hr) else: return self.get_manual(hr) class VariableOsLocationData(LocationHelper, TableVariable): """ Select installation disk variable """ opt = ["--disk", "-d"] metavalue = 'DISK[[:MP[:FS[:FORMAT]]]]' untrusted = True source = ["os_location_source", "os_location_dest", "os_location_format", "os_location_perform_format", "os_location_size"] check_after = ["os_install_root_type"] def init(self): self.help = (_("DISK bound for installation will be mounted to the " "MP directory. To create a bind mount point, you have " "to specify the source directory as DISK") + ". " + _("To change the filesystem, you have to specify it as FS. 
" "FORMAT is used for the specifying the need to format " "partition or not")) self.label = _("Mount points") def set(self, value): return sorted(value, key=lambda x: x and x[0]) class VariableOsLocationSource(LocationHelper, DeviceHelper, Variable): """ Source disk or directory """ type = "choiceedit-list" def init(self): self.label = _("Disk or directory") def availDevs(self, choice=False): """ Available devices """ if self.Get('cl_install_type') == 'flash': flashes = self.Select('os_device_dev', where='os_device_type', eq="flash") return [disk_dev for disk_dev, disk_parent, disk_type in self.ZipVars("os_disk_dev", "os_disk_parent", "os_disk_type") if disk_type == "disk-partition" and disk_parent in flashes] else: if choice: return self.Get('os_disk_dev') + self.Get('os_bind_path') else: dev_from = self.Get('cl_install_dev_from') return [ disk_dev for disk_dev, disk_mount in self.ZipVars( "os_disk_dev", "os_disk_mount") if ((disk_mount not in ("", "/") or disk_dev == dev_from) and not disk_mount.startswith("/boot/efi")) ] + self.Get('os_bind_path') def get(self): if self.Get('cl_autopartition_set') == "on": return ([device.udev.get_devname(name=x) for x in self.Get('cl_autopartition_disk_dev')] + self.Get('cl_autopartition_bind_path')) else: return self.availDevs() def set(self, value): def normpath(val): if type(val) == str and val: return path.normpath(val) return val return map(normpath, value) def choice(self): return map(lambda x: (x, self.getPerfectName(x) or x), self.fixOsDiskDev(self.availDevs(choice=True))) + [("", "")] def fixOsDiskDev(self, sourcelist=None): """ Fix os_disk_dev by autopartitions """ if not sourcelist: sourcelist = self.Get('os_disk_dev') scheme = self.Get('cl_autopartition_set') == "on" if scheme: autopartition_devices = self.Get('cl_autopartition_device') exclude = { disk_dev for disk_dev, disk_parent in self.ZipVars("os_disk_dev", "os_disk_parent") if any(x in autopartition_devices for x in disk_parent.split(',')) } appendDisks = 
(self.Get('cl_autopartition_disk_dev') + self.Get('cl_autopartition_bind_path')) return [x for x in sourcelist if x not in exclude] + appendDisks else: return sourcelist def check(self, value): """Check set location source""" ################################ # check of device specifing ################################ if not value: raise VariableError( _("To install the system, you need to specify the root device")) ########################### # check wrong dev ########################### disks = filter(lambda x: x.startswith('/dev/'), value) # get original /dev names cnDisks = (device.udev.get_device_info(name=x).get('DEVNAME', x) for x in disks) wrongDevices = list(set(cnDisks) - set(self.fixOsDiskDev())) if wrongDevices: raise VariableError(_("Wrong device '%s'") % wrongDevices[0]) wrongSource = filter(lambda x: x and not x.startswith('/'), value) if wrongSource: raise VariableError( _("Wrong bind mount point '%s'") % wrongSource[0]) ########################## # detect duplicate devices ########################## dupDevices = list(set(filter(lambda x: disks.count(x) > 1, disks))) if dupDevices: raise VariableError( _("Device '%s' is used more than once") % dupDevices[0]) class VariableClRootSizeMin(Variable): """ Минимальнй размер root раздела """ value_format = "{cl_autopartition_root_size_min}" class VariableOsLocationDest(LocationHelper, Variable): """ Desination directory of install disk data """ type = "choiceedit-list" def init(self): self.label = _("Mount point") def get(self): if self.Get('cl_autopartition_set') == "on": return self.Get('cl_autopartition_disk_mount') + \ self.Get('cl_autopartition_bind_mountpoint') else: source = self.Get('os_location_source') installFrom = self.Get('cl_install_dev_from') singleDevice = self.Get('os_install_disk_single') def installMountPoint(info): dev, mount = info if self.Get('cl_action') == 'system': if self.Get('cl_install_type') == 'flash': if dev == singleDevice: return "/" else: return "" else: if dev == 
installFrom: return "/" elif mount == "/": return "" return mount return map(installMountPoint, filter(lambda x: x[0] in source, zip(self.Get('os_disk_dev'), self.Get('os_disk_mount')) + \ zip(self.Get('os_bind_path'), self.Get('os_bind_mountpoint')))) def set(self, value): """Add abilitiy not specify root""" def normpath(val): if type(val) == str and val: return path.normpath(val) return val value = map(normpath, value) return map(lambda x: x or "/", value) def choice(self): if self.Get('cl_install_type') == 'flash': return ["/", ""] else: return ['/', '/boot', '/var/calculate', '/home', '/usr', '/var', '/tmp', 'swap', ''] def check(self, value): """Check set location source""" if self.Get('cl_autopartition_set') == "on": return ################################ # check size for root device ################################ minroot = int(self.Get('cl_root_size_min')) osInstallRootType = self.Get('os_install_root_type') if osInstallRootType != "flash" and \ not "/usr" in value: for mp, size in filter(lambda x: x[0] == '/' and x[1].isdigit() and \ int(x[1]) < minroot, izip(value, self.Get("os_location_size"))): raise VariableError( _("The root partition should be at least %s") % "7 Gb") source = self.Get("os_location_source") ################################ # check of root device specifing ################################ if not source: return if not filter(lambda x: x == "/", value): raise VariableError(_("To install the system, you need to " "specify the root device")) ################################ disks = filter(lambda x: x[0].startswith('/dev/') and x[1], zip(source, value)) disksDevs = map(lambda x: x[0], disks) binds = filter(lambda x: not x[0].startswith('/dev/') and x[1], zip(source, value)) ########################## # detect efi specifing ########################## reEfi = re.compile("/u?efi", re.I) if any(reEfi.search(x) for x in value): if self.Get('cl_client_type') == 'gui': raise VariableError( _("Please specify EFI partition by UEFI parameter in " 
"advanced options")) else: raise VariableError( _("Please specify EFI partition by UEFI option")) ########################## # detect duplicate mps ########################## dupMP = list(set(filter(lambda x: value.count(x) > 1, filter(lambda x: x and x != "swap", value)))) if dupMP: raise VariableError( _("Mount point '%s' is used more than once") % dupMP[0]) ######################### # detect wrong bind ######################### wrongBind = filter(lambda x: not x[0].startswith("/") or not x[1].startswith("/"), binds) if wrongBind: raise VariableError( _("Incorrect mount point (bind '%(bindSrc)s' to " "'%(bindDst)s')") \ % {'bindSrc': wrongBind[0][0], 'bindDst': wrongBind[0][1]}) ######################################### # Check '/' in start path of dest pointst ######################################### wrongMP = filter(lambda x: x and not x.startswith("/") and x != "swap", value) if wrongMP: raise VariableError(_("Wrong mount point '%s'") % wrongMP[0]) ######################################### # Check using current root ######################################### rootDev = self.Get('os_root_dev') if rootDev in self.Get('os_install_disk_dev'): raise VariableError( _("You may not use the current root partition %s for " "installation") % rootDev) ################################# # detect using extended partition ################################# extendedPartitions = self.Select('os_install_disk_dev', where='os_install_disk_part', eq='extended', limit=1) if extendedPartitions: raise VariableError( _("Unable to use extended partition %s for installation") % extendedPartitions) ########################## # detect using CDROM disks ########################## cdromPartitions = self.Select('os_install_disk_dev', where='os_install_disk_type', like='cdrom', limit=1) if cdromPartitions: raise VariableError(_("Unable to use CDROM %s for installation") % cdromPartitions) ############################### # check cross bind mount points ############################### DEVICE, MP = 
0, 1 srcMountPoints = map(lambda x: x[DEVICE], binds) destMountPoints = map(lambda x: x[MP], binds) wrongBind = filter(lambda x: x in destMountPoints, srcMountPoints) if wrongBind: incompBind = filter(lambda x: x[1] == wrongBind[0], zip(srcMountPoints, destMountPoints)) raise VariableError( _("Source directory %(src)s is already used " "for binding '%(bindSrc)s' to '%(bindDst)s'") \ % {'src': wrongBind[0], 'bindSrc': incompBind[0][0], 'bindDst': incompBind[0][1]}) ####################################### # check multipart for flash and builder ####################################### osInstallRootType = self.Get('os_install_root_type') if osInstallRootType == "flash": if filter(lambda x: x and x != '/', value): raise VariableError( _("Flash install does not support multipartition mode")) if filter(lambda x: x == "swap", value): raise VariableError( _("Flash install does not support swap disks")) ######################################## # check install on member of RAID or LVM ######################################## installTypes = zip(self.Get('os_install_disk_dev'), self.Get('os_install_disk_type')) for checkType in ("raid", "lvm"): memberData = filter(lambda x: checkType + "member" in x[1], installTypes) if memberData: raise VariableError( _("Unable to use {part} partition used by active " "{typepart} for installation").format( typepart=checkType.upper(), part=memberData[0][0])) class VariableOsLocationFormat(LocationHelper, Variable): type = "choice-list" def init(self): self.label = _("Filesystem") def get(self): if self.Get('cl_autopartition_set') == "on": return self.Get('cl_autopartition_disk_format') + \ map(lambda x: "", self.Get('cl_autopartition_bind_path')) else: mount = self.Get("os_location_dest") source = self.Get("os_location_source") value = [""] * len(source) return map(self.defaultFormat(), zip(source, mount, value)) def choice(self): if self.Get('cl_install_type') == "flash": return ["", "vfat"] else: return [""] + self.Get('os_format_type') def 
def set(self, value):
    """Normalize user supplied filesystems and fill in the defaults.

    The pseudo filesystem "uefi" is stored as "vfat"; afterwards every
    (source, mount point, filesystem) row is passed through the resolver
    returned by ``self.defaultFormat()``.
    """
    normalized = ["vfat" if fs == "uefi" else fs for fs in value]
    destinations = self.Get("os_location_dest")
    sources = self.Get("os_location_source")
    resolve = self.defaultFormat()
    return [resolve(row)
            for row in zip(sources, destinations, normalized)]
def check(self, value):
    """Validate the "perform format" flags of the location table.

    ``value`` holds one flag per row of ``os_location_source``. A row has
    to be formatted when it has a mount point and either its requested
    filesystem differs from the one currently on the device (``ntfs-3g``
    and ``ntfs`` are treated as equal) or it is the root of a non-flash
    installation.

    Raises VariableError when a partition that must be formatted is
    mounted on the current system, cannot be unmounted or is not flagged
    for formatting, and when formatting is requested for an unused
    device, an unavailable filesystem, a bind mount or a mounted disk.
    """
    info = zip(self.Get('os_location_source'),
               self.Get('os_location_dest'),
               self.Get('os_location_format'),
               value)
    diskDevs = self.Get('os_disk_dev')
    diskFormat = dict(zip(diskDevs, self.Get('os_disk_format')))
    diskMount = dict(zip(diskDevs, self.Get('os_disk_mount')))
    unavailFS = set(self.Select('os_format_type',
                                where='os_format_use',
                                eq="no"))
    fixNtfs = self.fixNtfs
    for dev, mp, fs, isformat in info:
        # mount point of the device on the running system ('' if none)
        mountedAs = diskMount.get(dev, '')
        # should format if change fs or partition is root, but non flash
        partitionMustFormat = \
            fixNtfs(diskFormat.get(dev, fs)) != fixNtfs(fs) or \
            (mp == '/' and
             self.Get('os_install_root_type') != 'flash')
        # the entry has a mount point AND the partition must be formatted
        if mp and partitionMustFormat:
            # the partition is used by the current system
            if mountedAs:
                raise VariableError(
                    _("{device} must but cannot be formatted, as it is "
                      "mounted to {mountpoint} on the current system").format(
                        device=dev, mountpoint=mountedAs))
            if isMount(dev):
                # unmounting is attempted only when --force was passed
                if not self.is_force_param() or not try_umount(dev):
                    raise VariableError(
                        _("Please unmount {device}, as it will be used for "
                          "installation").format(device=dev))
            # but the user chose not to format
            if not self.isTrue(isformat):
                raise VariableError(
                    _("{device} must be formatted").format(
                        device=dev))
        if self.isTrue(isformat):
            if not mp:
                raise VariableError(
                    _("No need to format unused device {dev}").format(
                        dev=dev))
            if fs in unavailFS:
                raise VariableError(
                    _("Filesystem '%s' is not available") % fs)
            if not dev.startswith('/dev/'):
                raise VariableError(
                    _("Bind mount points should not be formatted"))
            elif mountedAs and isformat:
                raise VariableError(
                    _(
                        "{device} must but cannot be formatted, as it is mounted to {mountpoint} on the current system"
                    ).format(
                        device=dev, mountpoint=mountedAs))
            elif isMount(dev):
                if not self.is_force_param() or not try_umount(dev):
                    raise VariableError(
                        _("Please unmount disk {device} to "
                          "use it for install").format(device=dev))
def getMap(self):
    """Build the device -> size mapping used to fill this variable.

    Sizes of the existing disks are taken first; entries for the
    autopartition devices are applied afterwards and override them.
    """
    sizes = {}
    for dev, size in self.ZipVars('os_disk_dev', 'os_disk_size'):
        sizes[dev] = size
    auto_devs = self.Get('cl_autopartition_disk_dev')
    auto_sizes = self.Get('cl_autopartition_disk_size')
    for dev, size in zip(auto_devs, auto_sizes):
        sizes[dev] = size
    return sizes
def get(self):
    """Return the devices used for installation, without UEFI ones.

    For a flash installation this is the device name of the single
    selected disk (or an empty list when none is selected). Otherwise it
    is every location source that starts with "/dev", has a mount point
    and is not mounted under "/boot/efi".
    """
    if self.Get('cl_install_type') != 'flash':
        selected = []
        for source, mount in self.ZipVars('os_location_source',
                                          'os_location_dest'):
            if not source.startswith("/dev"):
                continue
            if not mount or mount.startswith("/boot/efi"):
                continue
            selected.append(source)
        return selected

    flash_disk = self.Get('os_install_disk_single')
    if not flash_disk:
        return []
    return [device.udev.get_devname(name=flash_disk)]
def get(self):
    """Return the udev ID_PART_ENTRY_UUID property of each install disk.

    The list follows the order of ``os_install_disk_dev``; devices
    without the property yield an empty string.
    """
    # mapUdevProperty reads 'os_install_disk_dev' itself, so no separate
    # lookup of the device list is needed here
    return self.mapUdevProperty('os_install_disk_dev',
                                'ID_PART_ENTRY_UUID', '')
def get(self):
    """Return the mount options for every install disk.

    A disk that already has non-empty options in ``os_disk_options``
    keeps them. For any other disk the defaults are requested from
    FileSystemManager.getDefaultOpt, passing the filesystem, whether all
    of its parent devices are flagged as SSD, and the configured btrfs
    compression.
    """
    ssd_devices = set()
    for dev, ssd_flag in self.ZipVars('install.os_device_dev',
                                      'install.os_device_ssd_set'):
        if ssd_flag == 'on':
            ssd_devices.add(dev)

    current_options = {}
    for dev, options in self.ZipVars('os_disk_dev', 'os_disk_options'):
        if options:
            current_options[dev] = options

    result = []
    for disk_dev, disk_format, disk_parent in self.ZipVars(
            'os_install_disk_dev', 'os_install_disk_format',
            'os_install_disk_parent'):
        if disk_dev in current_options:
            result.append(current_options[disk_dev])
            continue
        # disk_parent is a comma separated list of parent devices
        on_ssd_only = all(parent in ssd_devices
                          for parent in disk_parent.split(','))
        compression = self.Get('os_install_btrfs_compression')
        result.append(FileSystemManager.getDefaultOpt(
            disk_format, on_ssd_only, compression))
    return result
def get(self):
    """Return the label for every install disk.

    The disk mounted as "/" is labelled "<shortname>-<version>" of the
    system being installed; any other disk keeps its current label from
    ``os_disk_name`` (empty string when the device is unknown).
    """
    current_labels = dict(self.ZipVars('os_disk_dev', 'os_disk_name'))
    labels = []
    for dev, mount in self.ZipVars('os_install_disk_dev',
                                   'os_install_disk_mount'):
        if mount != '/':
            labels.append(current_labels.get(dev, ''))
            continue
        labels.append("%s-%s" % (self.Get('os_install_linux_shortname'),
                                 self.Get('os_install_linux_ver')))
    return labels
def humanReadable(self):
    """Return the bootloader description shown to the user.

    This is the translated "UEFI" when ``os_install_uefi_set`` is on;
    otherwise the human readable MBR disks joined by commas, or the
    translated "no" when there are none.
    """
    uefi_enabled = self.Get('os_install_uefi_set') == 'on'
    if uefi_enabled:
        return _("UEFI")
    boot_disks = self.Get('os_install_mbr', humanreadable=True)
    if boot_disks:
        return ",".join(boot_disks)
    return _("no")
def check(self, value):
    """Validate the list of EFI partitions selected for installation.

    An empty selection is always accepted. Otherwise VariableError is
    raised when:

    * the efibootmgr command is not found;
    * the system was not booted in UEFI mode or the target architecture
      is not x86_64;
    * a value is not one of the known ``os_device_efi`` devices;
    * a partition is mounted by the current system outside /boot/efi;
    * a mounted non-vfat partition cannot be unmounted (unmounting is
      attempted only with --force);
    * a partition is already used as an installation source with a
      mount point.
    """
    if not value:
        # nothing selected, nothing can conflict
        return

    efi_boot_mgr_path = '/usr/sbin/efibootmgr'
    if not getProgPath(efi_boot_mgr_path):
        # report the command that was looked up; the lookup result itself
        # is falsy here and would render a meaningless message
        raise VariableError(
            _("UEFI installation is unavailable, because '%s' command "
              "not found") % efi_boot_mgr_path)
    if self.install_without_uefiboot:
        raise VariableError(
            _("Your system must be loaded in UEFI for using this "
              "bootloader"))
    if self.install_to_not_x86_64:
        raise VariableError(
            _("Architecture of the target system must be x86_64"))
    efidevs = self.Get('os_device_efi')
    badefi = [x for x in value if x not in efidevs]
    if badefi:
        raise VariableError(
            _("Wrong EFI device %s") % badefi[0])

    # partitions mounted by the running system anywhere but /boot/efi
    fstab_disks = [dev
                   for dev, mp in self.ZipVars('os_disk_dev',
                                               'os_disk_mount')
                   if mp and not mp.startswith("/boot/efi")]
    for disk in value:
        if disk in fstab_disks:
            raise VariableError(
                _("Partition {disk} already used by "
                  "the current system").format(disk=disk))

    not_fat_efi = self.select('os_disk_dev',
                              os_disk_format__ne="vfat")
    for efipart in value:
        if efipart in not_fat_efi and isMount(efipart):
            if not self.is_force_param() or not try_umount(efipart):
                raise VariableError(
                    _("Please unmount {device}, as it will be used for "
                      "installation").format(device=efipart))

    # the set of sources does not change inside the loop: query it once
    install_sources = self.select('os_location_source',
                                  os_location_dest__ne="")
    for efipart in value:
        if efipart in install_sources:
            raise VariableError(
                _("Partition {disk} already used for "
                  "installation").format(disk=efipart))
def choice(self):
    """Return the selectable boot disks as (value, label) pairs.

    Devices with an empty ``os_device_mbr`` entry are skipped. The label
    is "<disk> (<device name>)", with the translated "Unknown" standing
    in for a missing device name.
    """
    options = []
    for mbr_disk, disk_name in self.ZipVars('os_device_mbr',
                                            'os_device_name'):
        if not mbr_disk:
            continue
        label = disk_name or _("Unknown")
        options.append((mbr_disk, "%s (%s)" % (mbr_disk, label)))
    return options
def get(self):
    """Return the installation type: "hdd", "flash" or "usb-hdd".

    Without an explicit ``cl_install_type`` the type of the current root
    is returned. An explicit "flash" is returned unchanged. Otherwise the
    first disk device behind the install root decides: "usb-hdd" when its
    device type is "usb-hdd" or "flash", "hdd" in every other case
    (including when no disk device is found).
    """
    requested = self.Get('cl_install_type')
    if not requested:
        return self.Get('os_root_type')
    if requested == "flash":
        return "flash"

    root_dev = self.Get('os_install_root_dev')
    parents = list(device.udev.get_disk_devices(name=root_dev))
    if parents:
        kind = self.Select('os_device_type',
                           where='os_device_dev',
                           eq=parents[0], limit=1)
        if kind in ("usb-hdd", "flash"):
            return "usb-hdd"
    return "hdd"
def get(self):
    """Return the fstab text for the mounted disks and the bind mounts.

    Install disks whose mount point is neither empty nor "swap" are
    sorted by mount point (using ``self.separateDevice`` as the sort
    key). The first entry is written with the spec "0 1", the remaining
    ones with "0 0", followed by one "bind" line per bind mount. Empty
    sections are left out of the result.
    """
    entries = self.Select(['os_install_disk_use',
                           'os_install_disk_mount',
                           'os_install_disk_format',
                           'os_install_disk_options',
                           'os_install_disk_dev'],
                          where='os_install_disk_mount',
                          func=lambda row: row[0] != "" and row[0] != "swap")
    entries = sorted(entries,
                     key=lambda row: self.separateDevice(row[1]))

    mount_lines = []
    for index, (used, mp, fs, opts, dev) in enumerate(entries):
        spec = "0 1" if index == 0 else "0 0"
        mount_lines.append(self.formatFstab(used, dev, mp, fs, opts, spec))
    root_section = "\n".join(mount_lines[:1])
    other_section = "\n".join(mount_lines[1:])

    bind_section = "\n".join(
        "%s\t%s\tnone\tbind\t0 0" % (pair[0], pair[1])
        for pair in self.ZipVars('os_install_bind_path',
                                 'os_install_bind_mountpoint'))

    sections = [section
                for section in (root_section, other_section, bind_section)
                if section]
    return "\n".join(sections)
def generator(): device_names = dict(self.ZipVars('os_device_dev', 'os_device_name')) for disk_dev, disk_type, disk_parent in self.ZipVars( 'os_disk_dev', 'os_disk_type', 'os_disk_parent'): if disk_type == "disk-partition": device_name = device_names.get(disk_parent, _("Unknown")) yield disk_dev, "%s (%s)" % (disk_dev, device_name) return list(generator()) def check(self, value): # проверить, чтобы был выбран именно раздел if value not in self.Get('os_disk_dev'): raise VariableError( _("Wrong device '%s'" % value) ) disktype = self.select('os_disk_type', os_disk_dev=value, limit=1) if disktype and disktype != "disk-partition": raise VariableError( _("Wrong device '%s'" % value)) # проверить, чтобы раздел не использовался системой (не описан в fstab) mp = self.select('os_disk_mount', os_disk_dev=value, limit=1) if mp: raise VariableError( _("The partition {dev} is already in use as {mp}").format( dev=value, mp=mp)) # если система загружена с флешки (не iso) - нельзя переустановить # эту систему root_type = self.Get('os_root_type_ext') if root_type in RootType.LiveFlash: if value == self.Get('os_root_flash_dev'): raise VariableError( _("You cannot install the new system instead current")) # detect using extended partition disk_part = self.select('os_disk_part', os_disk_dev=value, limit=1) if disk_part == 'extended': raise VariableError( _("Unable to use extended partition %s for installation") % value) if "cdrom" in disk_part: raise VariableError(_("Unable to use CDROM %s for installation") % value) if not disk_part or disk_part == 'gpt': raise VariableError(_("You need a disk with a dos " "table for Flash install")) class VariableOsInstallFormatSingleSet(Variable): """ Форматировать Flash """ type = "bool" opt = ["--format"] untrusted = True value = "off" def init(self): self.label = _("Format the USB Flash") self.help = _("perform the formatting of the USB Flash drive") def must_be_formatted(self, dev): fs = self.select('os_disk_format', os_disk_dev=dev, limit=1) 
if fs != "vfat": return True return False def cannot_be_formatted(self, dev): flash_dev = self.Get('os_root_flash_dev') return flash_dev and dev == flash_dev def check(self, value): devs = self.Get('os_disk_dev') dev = self.Get('os_install_disk_single') if dev not in devs: return if value == "on": if self.cannot_be_formatted(dev): raise VariableError( _("You cannot format the USB Flash which " "contains the current system")) else: if self.must_be_formatted(dev): raise VariableError( _("{device} must be formatted").format(device=dev)) if dev: try: with FlashDistributive(dev) as f: dn = f.getDirectory() df = DiskSpace() free_size = df.get_free(dev) squash_fn = path.join(dn, "livecd.squashfs") if not path.exists(squash_fn): source = self.Get('cl_image') if isinstance(source, IsoDistributive): image_size = source.get_squash_size() if image_size > free_size: raise VariableError( _("Not enough free space on the " "USB Flash")) except DistributiveError: pass class VariableOsInstallBtrfsCompression(Variable): """ Алгоритм сжатия для btrfs, в которых выбрано использовать сжатие """ type = "choiceedit" value = "zstd" def choice(self): return ["zlib","lzo","zstd"] def check(self, value): if not re.search(r"^(zlib|lzo|zstd|(1[0-9]|1-9))$", value): raise VariableError(_("Wrong btrfs compression"))