# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
import re
from os import path
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
                                    TableVariable, PasswordError,
                                    DataVarsError, VariableInterface)
from calculate.install.fs_manager import FileSystemManager
from calculate.lib.utils.files import (readFile, getProgPath, process,
                                       readLinesFile)
from calculate.lib.utils.common import getPasswdUsers, getUserGroups, getGroups, \
    CmdlineParams
from calculate.lib.utils.common import getValueFromConfig, getValueFromCmdLine
from calculate.lib.utils.common import getUserPrimaryGroup
from calculate.lib.utils.portage import isPkgInstalled
from calculate.lib.utils.device import getUdevDeviceInfo
from crypt import crypt
from autopartition import SchemeOpt, SchemeDevices
from calculate.lib.encrypt import encrypt
import calculate.lib.cl_ini_parser as cl_ini_parser

from calculate.lib.cl_lang import setLocalTranslate, _

setLocalTranslate('cl_install3', sys.modules[__name__])


class UserHelper(VariableInterface):
    """
    Mixin for user-related variables that are unavailable on Flash installs
    """
    # subclasses set this to True when the variable also requires Xorg
    xorg_need = False

    def uncompatible(self):
        """
        Return the reason the variable cannot be used, or "" if it can
        """
        if self.Get('os_install_root_type') == 'flash':
            return _("User configuration unavailable for Flash install")
        if self.Get('os_install_x11_server_set') == 'off' and self.xorg_need:
            return _("Autologin is available for Xorg sessions only")
        return ""


class VariableOsInstallScratch(ReadonlyVariable):
    """
    Install system in scratch mode
    """
    type = "bool"
    opt = ['--build']

    def get(self):
        # for installation default - normal system
        if self.Get('cl_action') == 'system':
            return "off"
        else:
            return self.Get('os_scratch')


class VariableOsFormatType(ReadonlyVariable):
    """
    Filesystem formats supported by calculate-install
    """
    type = "list"

    def get(self):
        """Names of all filesystems known to FileSystemManager"""
        return list(FileSystemManager.supportFS.keys())


class VariableOsFormatUse(ReadonlyVariable):
    """
    Availability ("yes"/"no") of the format utility for each filesystem
    """
    type = "list"
    # (on or off) autoupdate config from install program
    cl_autoupdate_set = {
        'type': "bool",
        'value': "off"}

    def checkFunc(self, fs):
        """
        Return "yes" if the format program for `fs` exists on disk, else "no"
        """
        fsInfo = FileSystemManager.supportFS[fs]
        if "format" in fsInfo and path.exists(fsInfo["format"]):
            return "yes"
        return "no"

    def get(self):
        # one entry per os_format_type entry, in the same order
        return [self.checkFunc(fs) for fs in self.Get('os_format_type')]


class VariableClMigrateRootPwd(UserHelper, Variable):
    """
    Root password
    """
    type = "password"
    opt = ["--root-password"]
    metavalue = 'PASSWORD'
    untrusted = True

    def init(self):
        self.help = _("specify the root password")
        self.label = _("Root password")

    def get(self):
        """
        Return root's hash from /etc/shadow, or "" if it is missing or
        is a hash of the password "root"
        """
        rootPasswd = ""
        for line in readLinesFile('/etc/shadow'):
            fields = line.split(':')
            # match on the user name field: comparing the whole
            # [name, hash] pair with "root" never selects anything
            if len(fields) >= 2 and fields[0] == "root":
                rootPasswd = fields[1]
                break
        # if root password is "root"
        if rootPasswd:
            # everything before the last "$" is used as the crypt salt
            salt = rootPasswd.rpartition("$")[0]
            if salt and crypt("root", salt) == rootPasswd:
                rootPasswd = ""
        return rootPasswd

    def set(self, value):
        """
        Encrypt password

        Empty values and values that already look like "$id$salt$hash"
        are returned unchanged.
        """
        reCheck = re.compile(r"^\$[^$]+\$[^$]+\$.*$")
        encryptObj = encrypt()
        if reCheck.match(value) or not value:
            return value
        else:
            return encryptObj.getHashPasswd(value, "shadow_ssha256")

    def check(self, value):
        """
        Raise PasswordError when the root password is empty
        """
        if not value:
            raise PasswordError(_("Password for user %s missing") % "root")


class VariableClInstallHomeCryptSet(UserHelper, Variable):
    """
    Encrypt user profiles (forced off while autologin is set)
    """
    type = 'bool'
    opt = ["--crypt-home", "-C"]
    untrusted = True

    def init(self):
        self.help = _("encrypt user profiles")
        self.label = _("Encrypt user profiles")

    def get(self):
        if self.Get('cl_autologin'):
            return "off"
        return self.Get('cl_home_crypt_set')

    def check(self, value):
        """
        Raise VariableError when encryption is requested with autologin
        """
        if value == "on" and self.Get('cl_autologin'):
            raise VariableError(
                _("User profile encryption is uncompatible with autologin"))


class VariableClMigrateData(UserHelper, TableVariable):
    """
    User migrate data table
    """
    type = 'table'
    opt = ["--user", "-u"]
    metavalue = 'USER[:GROUPS]'
    source = ['cl_migrate_user', 'cl_migrate_user_groups',
              'cl_migrate_user_pwd']
    untrusted = True

    def init(self):
        self.help = _("add a user to the installed system")
        self.label = _("Migrating users")


class VariableClMigrateDataBrief(UserHelper, TableVariable):
    """
    User migrate data table for brief view
    """
    source = ['cl_migrate_user', 'cl_migrate_user_groups']

    def init(self):
        self.label = _("Migrating users")


class VariableClMigrateUser(UserHelper, Variable):
    """
    Migrate users list
    """
    type = 'list'

    def init(self):
        self.label = _("Users")

    def get(self):
        """
        Users reported by getPasswdUsers(), without root
        """
        return [user for user in getPasswdUsers() if user != "root"]


class VariableClMigrateUserGroups(UserHelper, Variable):
    """
    Migrate users groups
    """
    type = 'choice-list-list'
    defaultGroupList = ["users", "wheel", "audio", "cdrom", "video",
                        "cdrw", "usb", "plugdev", "games", "lp", "scanner",
                        "uucp"]

    def getDefaultGroups(self):
        """
        Default groups that actually exist according to getGroups()
        """
        return list(set(self.defaultGroupList) & set(getGroups()))

    def init(self):
        self.label = _("Groups")

    def set(self, value):
        """
        Drop empty group names; a user left with no groups gets the defaults
        """
        return [[group for group in groups if group]
                if groups and any(groups) else self.getDefaultGroups()
                for groups in value]

    def getPrimaryGroup(self, username):
        """
        Return the primary group of `username` as a list (empty if none)
        """
        pg = getUserPrimaryGroup(username)
        if pg:
            return [pg]
        return []

    def get(self):
        """
        User groups

        Primary group first, then the current groups for users returned
        by getPasswdUsers(), or the default groups for any other user.
        """
        passwdList = getPasswdUsers()
        return [self.getPrimaryGroup(user) +
                (getUserGroups(user) if user in passwdList
                 else self.getDefaultGroups())
                for user in self.Get('cl_migrate_user')]

    def choice(self):
        """
        Available groups
        """
        return [("", _("Default"))] + [(x, x) for x in getGroups()]


class VariableClMigrateUserPwd(UserHelper, Variable):
    """
    Migrate users who need to change passwords
    """
    type = 'password-list'

    def init(self):
        self.label = _("Password")

    def get(self):
        """
        Migrating users passwords

        Returns one hash per entry of cl_migrate_user, in the same order,
        with "" for users that have no well-formed /etc/shadow entry.
        Returns [] when the file is unreadable or there are no users.
        """
        retList = []
        fileName = "/etc/shadow"
        if os.access(fileName, os.R_OK):
            migrateusers = self.Get("cl_migrate_user")
            if migrateusers:
                # only lines with exactly this many ":" fields are used
                lenData = 9
                shadowHashes = {}
                with open(fileName) as shadowFile:
                    for line in shadowFile:
                        fields = line.rstrip().split(":")
                        # the first entry found for a user wins
                        if len(fields) == lenData and \
                                fields[0] not in shadowHashes:
                            shadowHashes[fields[0]] = fields[1]
                retList = [shadowHashes.get(userName, "")
                           for userName in migrateusers]
        return retList

    def check(self, value):
        """
        Check exists password for all migrate users
        """
        for user, pwd in zip(self.Get('cl_migrate_user'), value):
            if not pwd:
                raise PasswordError(
                    _("Password for user %s missing") % user)

    def set(self, value):
        """
        Encrypt passwords

        Empty values and values that already look like "$id$salt$hash"
        are kept unchanged.
        """
        reCheck = re.compile(r"^\$[^$]+\$[^$]+\$.*$")
        encryptObj = encrypt()
        return [pwd if reCheck.match(pwd) or not pwd
                else encryptObj.getHashPasswd(pwd, "shadow_ssha256")
                for pwd in value]


class VariableClAutologin(UserHelper, Variable):
    """
    Autologin variable (contains user name for autologin) or
    empty string if disable
    """
    type = 'choiceedit'
    opt = ["--autologin", '-A']
    metavalue = "USER"
    xorg_need = True

    def init(self):
        self.label = _("Autologin")
        self.help = _("set an autologin for the user, 'off' for disable")

    def get(self):
        # autologin enable for livecd and all install type CMC
        cmdDomainSet = (getValueFromCmdLine(
            "calculate", CmdlineParams.DomainPassword) or
                        getValueFromCmdLine(
                            "calculate", CmdlineParams.Domain) or "")
        if (not cmdDomainSet and
                self.Get('os_install_root_type') == "livecd") or \
                self.Get('os_install_linux_shortname') == "CMC":
            nonRootUsers = [user for user in self.Get('cl_migrate_user')
                            if user != "root"]
            if nonRootUsers:
                return nonRootUsers[0]
        return ""

    def set(self, value):
        # 'none' and 'off' both mean "autologin disabled"
        return {'none': '',
                'off': ''}.get(value, value)

    def choice(self):
        yield ("off", _("No autologin"))
        for user in (x for x in self.Get('cl_migrate_user') if x != "root"):
            yield (user, user)

    def check(self, value):
        """
        Autologin only for migrated non-root users
        """
        if value and value not in self.Get('cl_migrate_user'):
            raise VariableError(_("User %s does not exist") % value)
        if value == "root":
            raise VariableError(
                _("Autologin is unavailable for user %s") % value)

    def humanReadable(self):
        return self.Get() or _("Not used")

    def uncompatible(self):
        """
        Autologin is unavailable when merging on a domain workstation,
        in addition to the common UserHelper restrictions
        """
        try:
            if (self.Get('cl_action') == 'merge' and
                    self.Get('client.cl_remote_host')):
                return \
                    _("The autologin is not available with domain workstations")
        except DataVarsError:
            # the client variable could not be read: skip the domain check
            pass
        return UserHelper.uncompatible(self)


class VariableClInstallAutoupdateSet(Variable):
    """
    (on or off) autoupdate config from install program for install
    """
    type = "bool"
    value = "off"


class VariableOsInstallMakeopts(Variable):
    """
    Make.conf makeopts
    """

    def get(self):
        """
        Return "-j1" for one CPU, otherwise "-j<hr_cpu_num + 1>"
        """
        cpunum = self.Get('hr_cpu_num')
        if cpunum == "1":
            return "-j1"
        else:
            return "-j%d" % (int(cpunum) + 1)


class VariableOsGrubConf(ReadonlyVariable):
    """
    DEPRECATED content of current grub.conf
    """


class VariableOsInstallGrubDevicemapConf(ReadonlyVariable):
    """
    DEPRECATED content of device.map file for grub
    """
    os_install_grub_devicemap_conf = {}


class VariableClDistfilesPath(Variable):
    """
    DISTFILES path
    """
    value = '/var/calculate/remote/distfiles'


class VariableClPkgdirPath(Variable):
    """
    PKGDIR path
    """

    def get(self):
        return "/var/calculate/remote/packages/%s/%s" % (
            self.Get('os_install_linux_shortname'),
            self.Get('os_install_arch_machine'))


class VariableClInstallDevFrom(Variable):
    """
    Root device of previous installed os
    """

    def set(self, value):
        """
        If device in calculate3.env dev_from not exists set ''
        """
        if value:
            # prefer the DEVNAME reported by udev, else keep the given name
            value = getUdevDeviceInfo(name=value).get('DEVNAME', value)
        if value in self.Get('os_disk_dev'):
            return value
        else:
            return ""

    def get(self):
        if (self.Get('cl_autopartition_set') == 'on' and
                SchemeOpt.Update in self.Get('cl_autopartition_scheme')):
            return self.Select('cl_autopartition_disk_dev',
                               where='cl_autopartition_disk_scheme',
                               eq=SchemeDevices.Root2, limit=1)
        return ""


class VariableOsNvidiaMask(ReadonlyVariable):
    """
    Get nvidia card mask versions
    """

    def get(self):
        """
        Return the mask_<category> value from nvidia-driver.eclass for the
        first drv_<category> list that contains an installed card id,
        or "" when nothing matches
        """
        image = self.Get('cl_image')
        try:
            if image:
                image = image.convertToDirectory()
                chrootPath = image.getDirectory()
            else:
                chrootPath = self.Get("cl_chroot_path")
            nvidiaeclass = path.join(chrootPath,
                                     'usr/portage/eclass/nvidia-driver.eclass')
            if not os.access(nvidiaeclass, os.R_OK):
                return ""
            category = "0300"
            vendor = "10de:"
            lsPciProg = getProgPath("/usr/sbin/lspci")
            if not lsPciProg:
                # without lspci the installed cards cannot be listed
                return ""
            categoryMark = " %s: " % category
            nvidiacards = [line for line
                           in process(lsPciProg, "-d", vendor, "-n")
                           if categoryMark in line]
            if not nvidiacards:
                return ""
            # the second half of "vvvv:dddd" is collected as the card id
            reCardId = re.compile("[0-9a-fA-F]{4}:([0-9a-fA-F]{4})")
            cardsid = set(found.group(1) for found
                          in (reCardId.search(line) for line in nvidiacards)
                          if found)
            if not cardsid:
                return ""
            eclassdata = readFile(nvidiaeclass)
            drv_categories = re.findall('^drv_([^=]+)="', eclassdata, re.M)
            categoryAlternatives = "|".join(drv_categories)
            # values may be continued with backslash-newline inside the eclass
            drvs = [(name, ids.replace('\\\n', '').split())
                    for name, ids in re.findall(
                        '\ndrv_(%s)="(.*?)"' % categoryAlternatives,
                        eclassdata, re.S)]
            # masks are looked up by driver category below, so only masks
            # named after a driver category are collected
            masks = dict((name, mask.replace('\\\n', ''))
                         for name, mask in re.findall(
                             '\nmask_(%s)="(.*?)"' % categoryAlternatives,
                             eclassdata, re.S))
            for drvName, drvIds in drvs:
                if cardsid.intersection(drvIds):
                    # only the first matching driver category is considered
                    return masks.get(drvName, "")
        finally:
            if image:
                image.close()
        return ""


class VariableOsInstallLvmSet(ReadonlyVariable):
    """
    Using lvm
    """
    type = "bool"

    def get(self):
        """
        Return "on" if any os_install_disk_type entry mentions lvm
        """
        # every entry is examined; an empty list yields "off"
        if any("lvm" in typeDisk.lower()
               for typeDisk in self.Get('os_install_disk_type')):
            return "on"
        return "off"


class VariableOsInstallMdadmSet(ReadonlyVariable):
    """
    Using mdadm
    """
    type = "bool"

    def get(self):
        """
        Return "on" if any os_install_disk_type entry mentions raid
        """
        # every entry is examined; an empty list yields "off"
        if any("raid" in typeDisk.lower()
               for typeDisk in self.Get('os_install_disk_type')):
            return "on"
        return "off"


class VariableClChrootGrub(ReadonlyVariable):
    """
    Chroot for grub-mkconfig
    """

    def get(self):
        if self.Get('os_install_scratch') == "on":
            if self.Get('cl_action') == 'system':
                return self.Get('cl_target').mdirectory
            else:
                return path.join(self.Get('cl_chroot_path'), "mnt/scratch")
        else:
            return self.Get('cl_chroot_path')


class VariableOsGrub2Path(Variable):
    """
    Get Grub2 Install cmd (grub-install or grub2-install)
    """

    def get(self):
        # find grub2-install
        grubInstall = getProgPath('/usr/sbin/grub2-install')
        if grubInstall:
            return grubInstall
        # find grub-install and check, that this is grub2-install (ver 1.99)
        grubInstall = getProgPath('/usr/sbin/grub-install')
        if grubInstall and any("1.99" in line or "2." in line
                               for line in process(grubInstall, '--version')):
            return grubInstall
        return ""


class VariableClSetup(Variable):
    """
    Type of setup
    """
    type = "choice"
    value = ""

    def choice(self):
        return ["audio", "network", "locale", "video", "boot", "users",
                "session", ""]

    def humanReadable(self):
        mapType = {'network': _("network settings"),
                   'locale': _("localization and time options"),
                   'video': _("video settings"),
                   'boot': _("boot parameters"),
                   'audio': _("audio parameters"),
                   'session': _("session settings"),
                   'users': _("user settings")}
        return mapType.get(self.Get(), "")

    def check(self, value):
        """
        Raise VariableError for boot setup on a LiveCD
        """
        if value == "boot" and self.Get('os_install_root_type') == 'livecd':
            raise VariableError(
                _("Boot configuration is not available on a LiveCD"))


class VariableClLive(Variable):
    """
    Apply live templates
    """
    value = "off"
    type = "bool"
    opt = ['--live']

    def init(self):
        self.label = _("Configure dynamic options only")
        self.help = _("configure dynamic options only")


class VariableOsInstallPxe(Variable):
    """
    Installation for PXE loading
    """
    # NOTE(review): every other on/off variable in this file uses "bool";
    # "boot" may be a typo - confirm against the datavars type list
    type = "boot"
    value = "off"
    untrusted = True

    def check(self, value):
        """
        Raise VariableError when PXE install is requested but the system
        is not a server, a required package is missing, or neither
        calculate.env file has sr_dhcp_set = on
        """
        if value == "on":
            if self.Get('os_linux_system') != "server":
                raise VariableError(
                    _("PXE install is available for Calculate "
                      "Directory Server only") + '.')
            for pkg in ['net-misc/dhcp', 'net-ftp/tftp-hpa',
                        'net-fs/nfs-utils']:
                if not isPkgInstalled(pkg):
                    raise VariableError(
                        _("For PXE install, you need to install package %s")
                        % pkg)
            for env_fn in ('/etc/calculate/calculate.env',
                           '/var/lib/calculate/calculate.env'):
                try:
                    config = cl_ini_parser.iniParser(env_fn)
                    val = config.getVar('server', 'sr_dhcp_set')
                    if val.encode('utf-8') == "on":
                        return
                except Exception:
                    # best effort: a file that cannot be read or parsed
                    # is treated as "DHCP not configured"
                    pass
            raise VariableError(
                _("PXE install is only available if the DHCP "
                  "service has been configured first"))


class VariableOsInstallPxePath(Variable):
    """
    Path to PXE installation
    """
    value = "/var/calculate/pxe"

    opt = ['--pxe-path']

    def init(self):
        self.label = _("Installation path")
        self.help = _("path for PXE install")


class VariableOsInstallUefiSet(Variable):
    """
    Install in UEFI
    """
    type = "bool"
    opt = ['--uefi']

    def init(self):
        self.label = _("UEFI boot")
        self.help = _("use UEFI boot")

    def get(self):
        """
        Follow the autopartition setting when autopartition is on.
        Otherwise return os_uefi_set for a non-flash x86_64 target with
        an EFI partition, and 'off' in every other case.
        """
        if self.Get('cl_autopartition_set') == 'on':
            return self.Get('cl_autopartition_uefi_set')
        hasEfiPartition = (self.Get('os_install_disk_efi') or
                           "/boot/efi" in self.Get('os_location_dest'))
        if hasEfiPartition and \
                self.Get('os_install_arch_machine') == 'x86_64' and \
                self.Get('os_install_root_type') != 'flash':
            return self.Get('os_uefi_set')
        return 'off'

    def check(self, value):
        """
        Raise VariableError when UEFI is requested but cannot be used
        """
        if value == 'on':
            if self.Get('os_uefi_set') == 'off' and \
                    self.Get('os_install_root_type') == 'hdd':
                raise VariableError(
                    _("Your system must be loaded in UEFI for using this "
                      "bootloader"))
            if 'gpt' not in self.Get('os_device_table'):
                raise VariableError(
                    _("GPT is needed for using the UEFI bootloader"))
            if not (self.Get('os_install_disk_efi') or
                    "/boot/efi" in self.Get('os_location_dest')):
                raise VariableError(
                    _("A EF00 partition is needed for using "
                      "the UEFI bootloader"))
            if self.Get('os_install_arch_machine') != 'x86_64':
                raise VariableError(
                    _("Architecture of the target system must be x86_64"))
            if self.Get('os_install_root_type') == 'flash':
                raise VariableError(
                    _("This option not used for Flash install"))

    def uncompatible(self):
        """
        Unavailable with autopartition and on Flash installs
        """
        if self.Get('cl_autopartition_set') == "on":
            return \
                _("The layout is not available with autopartitioning")
        if self.Get('os_install_root_type') == 'flash':
            return \
                _("This option not used for Flash install")
        return ""


class VariableOsInstallUefiBriefSet(VariableOsInstallUefiSet):
    """
    Mirror of os_install_uefi_set that is only unavailable on Flash
    """

    def uncompatible(self):
        if self.Get('os_install_root_type') == 'flash':
            return _("This option not used for Flash install")
        return ""

    def get(self):
        return self.Get('os_install_uefi_set')


class VariableOsInstallGrubTerminal(Variable):
    """
    Grub terminal (gfxterm or console)
    """
    type = "choice"
    opt = ['--grub-terminal']
    metavalue = "TERMINAL"

    def init(self):
        self.label = _("Grub terminal")
        self.help = _("grub terminal")

    def get(self):
        """
        Return 'console' if the kernel command line, etc/default/grub in
        the chroot or /boot/grub/grub.cfg asks for it, else 'gfxterm'
        """
        cmdLine = '/proc/cmdline'
        if 'grub_nogfxmode' in readFile(cmdLine):
            return 'console'
        grubDefault = path.join(self.Get('cl_chroot_path'),
                                'etc/default/grub')
        if getValueFromConfig(grubDefault, 'GRUB_TERMINAL') == 'console':
            return 'console'
        grubCfg = '/boot/grub/grub.cfg'
        if re.search(r'^terminal_output\s*console', readFile(grubCfg), re.M):
            return 'console'
        return 'gfxterm'

    def choice(self):
        return ['gfxterm', 'console']

    def uncompatible(self):
        """
        Grub setting up unavailable for flash installation
        """
        if self.Get('os_install_root_type') == 'flash':
            return _("Grub configuration unavailable for Flash install")
        return ""


class PackageCheckVariable(ReadonlyVariable):
    """
    Base for variables that check whether a package is installed
    """
    # True: look inside the image from image_variable;
    # False: look under the path from prefix_variable
    image = False
    package = None
    type = "bool"
    image_variable = "cl_image"
    prefix_variable = "cl_chroot_path"

    def get(self):
        """
        Return "on" if `package` is installed, "off" otherwise or on error
        """
        try:
            if self.image:
                image = self.Get(self.image_variable)
                if image:
                    # NOTE(review): the `with` target is unused and the
                    # directory is taken from `image` itself - confirm
                    with image as distr:
                        distrPath = image.getDirectory()
                        if isPkgInstalled(self.package, prefix=distrPath):
                            return "on"
            else:
                prefix = self.Get(self.prefix_variable)
                if isPkgInstalled(self.package, prefix=prefix):
                    return "on"
        except Exception:
            # best effort: any failure is reported as "not installed"
            pass
        return "off"


class VariableOsInstallAlsaSet(PackageCheckVariable):
    """
    Whether media-sound/alsa-utils is installed
    """
    image = True
    package = "media-sound/alsa-utils"


class VariableOsInstallX11ServerSet(PackageCheckVariable):
    """
    Whether x11-base/xorg-server is installed
    """
    image = True
    package = "x11-base/xorg-server"


class VariableOsInstallSplashSet(Variable):
    """
    Show the splash screen during boot
    """
    type = "bool"
    value = "on"


class FlashUncompatible(VariableInterface):
    """
    Mixin for update variables that are unavailable on Flash installs
    """

    def uncompatible(self):
        """
        Update setting up unavailable for flash installation
        """
        if self.Get('os_install_root_type') == 'flash':
            return \
                _("Update configuration unavailable for Flash install")
        return ""


# The install variants take their defaults from calculate.update when it
# can be imported; otherwise plain variables with fixed defaults are used.
try:
    import calculate.update.variables.update as update

    class VariableClInstallAutocheckSet(FlashUncompatible,
                                        update.VariableClUpdateAutocheckSet):
        def get(self):
            return self.Get('update.cl_update_autocheck_set')

    class VariableClInstallAutocheckInterval(
            FlashUncompatible, update.VariableClUpdateAutocheckInterval):
        def get(self):
            return self.Get('update.cl_update_autocheck_interval')

    class VariableClInstallCleanpkgSet(FlashUncompatible,
                                       update.VariableClUpdateCleanpkgSet):
        def get(self):
            return self.Get('update.cl_update_cleanpkg_set')

    class VariableClInstallOtherSet(FlashUncompatible,
                                    update.VariableClUpdateOtherSet):
        def get(self):
            return self.Get('update.cl_update_other_set')

except ImportError:
    update = None

    class VariableClInstallAutocheckSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallAutocheckInterval(FlashUncompatible, Variable):
        value = ""

    class VariableClInstallCleanpkgSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallOtherSet(FlashUncompatible, Variable):
        value = "off"