# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
import re
from os import path
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
                                    TableVariable, PasswordError,
                                    DataVarsError, VariableInterface)
from calculate.install.fs_manager import FileSystemManager
from calculate.lib.utils.files import (readFile, getProgPath, process,
                                       readLinesFile)
from calculate.lib.utils.common import getPasswdUsers, getUserGroups, getGroups, \
    CmdlineParams
from calculate.lib.utils.common import getValueFromConfig, getValueFromCmdLine
from calculate.lib.utils.common import getUserPrimaryGroup
from calculate.lib.utils.portage import isPkgInstalled
from calculate.lib.utils.device import getUdevDeviceInfo
from calculate.lib.encrypt import sha256_crypt
from calculate.core.server.admin import Admins
from calculate.lib.variables.system import RootType
from autopartition import SchemeOpt, SchemeDevices
from calculate.lib.encrypt import get_shadow_hash, get_grub_hash
from calculate.core.server.admin import Admins
import calculate.lib.cl_ini_parser as cl_ini_parser

from calculate.lib.cl_lang import setLocalTranslate, _

setLocalTranslate('cl_install3', sys.modules[__name__])


class UserHelper(VariableInterface):
    """
    Mixin for user-related variables: they are unavailable for Flash
    installs and, when ``xorg_need`` is set, for systems without Xorg
    """
    # subclasses set this to True when the variable requires an X server
    xorg_need = False
    # placeholder reported instead of a plain password when only a hash exists
    stub_hash_value = "{SHA256}"

    def uncompatible(self):
        """
        Return the reason the variable is unavailable, or "" if it is usable
        """
        if self.Get('os_install_root_type') == 'flash':
            return _("User configuration unavailable for Flash install")
        if self.Get('os_install_x11_server_set') == 'off' and self.xorg_need:
            return _("Autologin is available for Xorg sessions only")
        return ""


class GrubHelper(VariableInterface):
    """
    Mixin giving access to the grub password configuration file
    """
    grub_passwd_file = "/etc/grub.d/07_passwd"

    def read_hash_from_passwd(self):
        """
        Get the root password hash from the grub configuration file

        Returns "" when the file has no ``password_pbkdf2 root`` entry.
        """
        data = readFile(self.grub_passwd_file)
        # raw string: \S must reach the regex engine unchanged
        root_pwd_re = re.compile(r"password_pbkdf2 root (\S+)")
        found = root_pwd_re.search(data)
        if found:
            return found.group(1)
        return ""


class VariableOsInstallScratch(ReadonlyVariable):
    """
    Install system in scratch mode
    """
    type = "bool"
    opt = ['--build']

    def get(self):
        # for installation default - normal system
        if self.Get('cl_action') == 'system':
            return "off"
        else:
            return self.Get('os_scratch')


class VariableOsFormatType(ReadonlyVariable):
    """
    Filesystem formats supported by calculate-install
    """
    type = "list"

    def get(self):
        """Return the names of the filesystems known to FileSystemManager"""
        return list(FileSystemManager.supportFS.keys())


class VariableOsFormatUse(ReadonlyVariable):
    """
    Availability ("yes"/"no") of the mkfs utility for every format
    listed in os_format_type
    """
    type = "list"
    # (on or off) autoupdate config from install program
    cl_autoupdate_set = {
        'type': "bool",
        'value': "off"}

    def checkFunc(self, fs):
        """
        Return "yes" if the format utility of filesystem `fs` exists
        on disk, otherwise "no"
        """
        fs_info = FileSystemManager.supportFS[fs]
        if "format" in fs_info and path.exists(fs_info["format"]):
            return "yes"
        return "no"

    def get(self):
        return [self.checkFunc(fs) for fs in self.Get('os_format_type')]


class VariableClMigrateRootPwdPlain(GrubHelper, UserHelper, Variable):
    """
    Root password
    """
    type = "password"
    opt = ["--root-password"]
    metavalue = 'PASSWORD'
    untrusted = True
    check_after = ["cl_grub"]

    def get(self):
        """
        Return the stub value when a shadow hash already exists, else ""
        """
        shadow_pwd = self.Get('cl_migrate_root_shadow_pwd')
        if shadow_pwd:
            return self.stub_hash_value
        return ""

    def init(self):
        self.help = _("specify the root password")
        self.label = _("Root password")

    def check(self, value):
        """
        Raise PasswordError when no root password is available at all, or
        when a plain password is needed for grub but only a hash is known
        """
        if not value and not self.Get('cl_migrate_root_shadow_pwd'):
            raise PasswordError(_("Password for user %s missing") % "root")
        # there is no plain password (only a hash), a grub password is
        # requested (cl_grub_passwd_set) and 07_passwd does not provide one
        if (value == self.stub_hash_value and
                not self.read_hash_from_passwd() and
                self.GetBool('cl_grub_passwd_set')):
            raise PasswordError(_("Please specify root password "
                                  "for the grub menu"))


class VariableClMigrateRootPwd(ReadonlyVariable):
    """
    Root password hash
    """

    def get(self):
        """
        Hash the plain password if one was given, else reuse the shadow hash
        """
        value = self.Get('cl_migrate_root_pwd_plain')
        if value and value != UserHelper.stub_hash_value:
            return get_shadow_hash().hash(value)
        return self.Get('cl_migrate_root_shadow_pwd')


class VariableClMigrateRootShadowPwd(ReadonlyVariable):
    """
    Root password hash from /etc/shadow.
    Contains an empty string if the password is "root"
    """

    def get(self):
        root_passwd = ""
        for line in readLinesFile('/etc/shadow'):
            fields = line.split(':')
            # select the entry by user name; lines without a password
            # field are skipped instead of raising IndexError
            if len(fields) >= 2 and fields[0] == "root":
                root_passwd = fields[1]
                break
        # if root password is "root"
        enc = get_shadow_hash()
        if root_passwd:
            if enc.identify(root_passwd):
                if enc.verify("root", root_passwd):
                    root_passwd = ""
            else:
                # not a recognised hash: treat as no password
                root_passwd = ""
        return root_passwd or ""


class VariableClGrubPasswdSet(GrubHelper, Variable):
    """
    Use the root password as the grub password when installing the system
    """
    type = 'bool'
    opt = ["--grub-passwd"]

    def init(self):
        self.help = _("use root password for editing grub menu")
        self.label = _("Use root password for editing grub menu")

    def get(self):
        """
        On when grub already has a password or the system is running live
        """
        if (self.read_hash_from_passwd() or
                self.Get('os_root_type_ext') in RootType.Live):
            return Variable.On
        else:
            return Variable.Off


class VariableClGrubPwd(GrubHelper, Variable):
    """
    Grub password hash
    """
    opt = ["--passwd"]
    type = "password"

    def init(self):
        self.help = _("set grub password")
        self.label = _("Grub password")

    def get(self):
        """
        Return the grub password hash to use, or "" for no password
        """
        system_action = self.Get('cl_action') == "system"
        if self.GetBool('cl_grub_remove_pwd_set'):
            return ""
        use_grub_passwd = self.GetBool('cl_grub_passwd_set')
        passwd_hash = self.read_hash_from_passwd()
        if passwd_hash:
            if not system_action or use_grub_passwd:
                return passwd_hash
        if use_grub_passwd:
            value = self.Get('cl_migrate_root_pwd_plain')
            if value and value != UserHelper.stub_hash_value:
                enc = get_grub_hash()
                return enc.hash(value)
        return ""

    def set(self, value):
        """
        Accept either an already encrypted hash or a plain password
        """
        enc = get_grub_hash()
        # a hash is used
        if enc.identify(value):
            return value
        # disabling
        if value in ("", "none"):
            return ""
        # a plain password is used
        return enc.hash(value)

    def check(self, value):
        if value and self.GetBool('cl_grub_remove_pwd_set'):
            raise VariableError(
                _("Setting the password is uncompatible with removing the password"))


class VariableClGrubRemovePwdSet(Variable):
    """
    Remove the password from grub
    """
    type = "bool"
    guitype = "hidden"
    value = Variable.Off
    opt = ["--remove-passwd"]

    def init(self):
        self.help = _("remove the password for editing the grub menu")
        self.label = _("Remove the grub menu password")


class VariableClInstallHomeCryptSet(UserHelper, Variable):
    """
    Encrypt user profiles (forced off while autologin is configured)
    """
    type = 'bool'
    opt = ["--crypt-home", "-C"]
    untrusted = True

    def init(self):
        self.help = _("encrypt user profiles")
        self.label = _("Encrypt user profiles")

    def get(self):
        if self.Get('cl_autologin'):
            return "off"
        return self.Get('cl_home_crypt_set')

    def check(self, value):
        if value == "on" and self.Get('cl_autologin'):
            raise VariableError(
                _("User profile encryption is uncompatible with autologin"))


class VariableClMigrateData(UserHelper, TableVariable):
    """
    User migrate data table
    """
    type = 'table'
    opt = ["--user", "-u"]
    metavalue = 'USER[:ADMIN[:GROUPS]]'
    source = ['cl_migrate_user', 'cl_migrate_admin',
              'cl_migrate_user_groups',
              'cl_migrate_user_pwd']
    untrusted = True

    def init(self):
        self.help = _("add a user to the installed system. USER is username. "
                      "ADMIN is administrator rights (none, all, update, "
                      "none by default). "
                      "GROUPS is list user supplimentary groups "
                      "(comma delimeter). "
                      "Use 'none' value to discard user migration")
        self.label = _("Migrating users")

    def set(self, value):
        """
        Turn a single row starting with 'none' into an empty table row
        """
        if len(value) == 1:
            if len(value[0]) > 1 and value[0][0] == 'none':
                return [[]]
        return value


class VariableClMigrateDataBrief(UserHelper, TableVariable):
    """
    User migrate data table for brief view
    """
    source = ['cl_migrate_user', 'cl_migrate_admin',
              'cl_migrate_user_groups']

    def init(self):
        self.label = _("Migrating users")


class VariableClMigrateUser(UserHelper, Variable):
    """
    Migrate users list
    """
    type = 'list'

    def init(self):
        self.label = _("User")

    def get(self):
        """
        Migrating users: everything getPasswdUsers() reports except root
        """
        # explicit != instead of "root".__ne__, which yields a truthy
        # NotImplemented for non-str names
        return [user for user in getPasswdUsers() if user != "root"]

    def check(self, value):
        """
        Check that every user name is non-empty
        """
        if any(not x for x in value):
            raise VariableError(_("Username is missing"))


class VariableClMigrateAdmin(UserHelper, Variable):
    """
    Administrator rights of the migrated users
    """
    type = 'choice-list'
    default_value = "none"

    def init(self):
        self.label = _("Administrator")

    def choice(self):
        return [
            ("none", ""),
            ("all", _("Full access")),
            ("update", _("System update")),
        ]

    def get(self):
        """
        Rights recorded in Admins for each migrated user ("none" if absent)
        """
        admins = Admins(self.parent)
        return [admins[x] or self.default_value
                for x in self.Get('cl_migrate_user')]

    def set(self, value):
        """
        Map the 'update' alias to 'system_update' and fill empty entries
        with the default value
        """
        name_map = {'update': 'system_update'}
        return [name_map.get(x, x) or self.default_value for x in value]


class VariableClMigrateUserGroups(UserHelper, Variable):
    """
    Migrate users groups
    """
    type = 'choice-list-list'
    defaultGroupList = ["users", "wheel", "audio", "cdrom", "video",
                        "cdrw", "usb", "plugdev", "games", "lp", "scanner",
                        "uucp"]

    def getDefaultGroups(self):
        """Default groups that actually exist according to getGroups()"""
        return list(set(self.defaultGroupList) & set(getGroups()))

    def init(self):
        self.label = _("Groups")

    def set(self, value):
        """
        Drop empty group names; rows without any group get the defaults
        """
        return [[group for group in groups if group]
                if groups and any(groups)
                else self.getDefaultGroups()
                for groups in value]

    def getPrimaryGroup(self, username):
        """Return the user's primary group as a one-item list, or []"""
        pg = getUserPrimaryGroup(username)
        if pg:
            return [pg]
        return []

    def get(self):
        """
        User groups: primary group followed by the user's own groups, or
        by the default groups for users absent from getPasswdUsers()
        """
        passwd_users = getPasswdUsers()
        return [self.getPrimaryGroup(user) +
                (getUserGroups(user) if user in passwd_users
                 else self.getDefaultGroups())
                for user in self.Get('cl_migrate_user')]

    def choice(self):
        """
        Available groups
        """
        return [("", _("Default"))] + [(x, x) for x in getGroups()]


class VariableClMigrateUserPwd(UserHelper, Variable):
    """
    Migrate users who need to change passwords
    """
    type = 'password-list'

    def init(self):
        self.label = _("Password")

    def get(self):
        """
        Migrating users passwords

        Returns one entry per migrated user: the hash from /etc/shadow, or
        "" when the user has no entry or the password is "guest".
        """
        retList = []
        fileName = "/etc/shadow"
        if os.access(fileName, os.R_OK):
            migrateusers = self.Get("cl_migrate_user")
            if migrateusers:
                lenData = 9
                shadow_hashes = {}
                # context manager so the shadow file is always closed
                with open(fileName) as shadow_file:
                    for line in shadow_file:
                        fields = line.rstrip().split(":")
                        if (len(fields) == lenData and
                                fields[0] in migrateusers):
                            # keep the first entry found for a user
                            shadow_hashes.setdefault(fields[0], fields[1])
                for userName in migrateusers:
                    hashPwd = shadow_hashes.get(userName)
                    if hashPwd is None:
                        retList.append("")
                    elif (sha256_crypt.identify(hashPwd) and
                          sha256_crypt.verify("guest", hashPwd)):
                        retList.append("")
                    else:
                        retList.append(hashPwd)
        return retList

    def check(self, value):
        """
        Check exists password for all migrate users
        """
        for user, pwd in zip(self.Get('cl_migrate_user'), value):
            if not pwd:
                raise PasswordError(
                    _("Password for user %s missing") % user)

    def set(self, value):
        """
        Encrypt passwords (hashes and empty values are passed through)
        """
        shadow_hash = get_shadow_hash()
        return [x if shadow_hash.identify(x) or not x
                else shadow_hash.hash(x)
                for x in value]


class VariableClAutologin(UserHelper, Variable):
    """
    Autologin variable (contains user name for autologin) or
    empty string if disable
    """
    type = 'choiceedit'
    opt = ["--autologin", '-A']
    metavalue = "USER"
    xorg_need = True

    def init(self):
        self.label = _("Autologin")
        self.help = _("set an autologin for the user, 'off' for disable")

    def get(self):
        # autologin enable for livecd and all install type CMC
        cmdDomainSet = (getValueFromCmdLine(
            "calculate", CmdlineParams.DomainPassword) or
                        getValueFromCmdLine(
                            "calculate", CmdlineParams.Domain) or "")
        if (not cmdDomainSet and
                self.Get('os_install_root_type') == "livecd") or \
                self.Get('os_install_linux_shortname') == "CMC":
            nonRootUsers = [user for user in self.Get('cl_migrate_user')
                            if user != "root"]
            if nonRootUsers:
                return nonRootUsers[0]
            else:
                return ""
        return ""

    def set(self, value):
        """Treat 'none' and 'off' as "autologin disabled" ("")"""
        return {'none': '',
                'off': ''}.get(value, value)

    def choice(self):
        yield ("off", _("No autologin"))
        for user in (x for x in self.Get('cl_migrate_user') if x != "root"):
            yield (user, user)

    def check(self, value):
        """
        Autologin only for migrated non-root users
        """
        if value and not value in self.Get('cl_migrate_user'):
            raise VariableError(_("User %s does not exist") % value)
        if value == "root":
            raise VariableError(
                _("Autologin is unavailable for user %s") % value)

    def humanReadable(self):
        return self.Get() or _("Not used")

    def uncompatible(self):
        """
        Autologin is unavailable for domain workstations, in addition
        to the common UserHelper restrictions
        """
        try:
            if (self.Get('cl_action') == 'merge' and
                    self.Get('client.cl_remote_host')):
                return \
                    _("The autologin is not available with domain workstations")
        except DataVarsError:
            # the client.* variable could not be read; skip this check
            pass
        return UserHelper.uncompatible(self)


class VariableClInstallAutoupdateSet(Variable):
    """
    (on or off) autoupdate config from install program for install
    """
    type = "bool"
    value = "off"


class VariableOsInstallMakeopts(Variable):
    """
    Make.conf makeopts
    """

    def get(self):
        """Return "-j1" for one CPU, otherwise "-j<cpu count + 1>" """
        cpunum = self.Get('hr_cpu_num')
        if cpunum == "1":
            return "-j1"
        else:
            return "-j%d" % (int(cpunum) + 1)


class VariableOsGrubConf(ReadonlyVariable):
    """
    DEPRECATED content of current grub.conf
    """


class VariableOsInstallGrubDevicemapConf(ReadonlyVariable):
    """
    DEPRECATED content of device.map file for grub
    """
    os_install_grub_devicemap_conf = {}


class VariableClDistfilesPath(Variable):
    """
    DISTFILES path
    """
    value = '/var/calculate/remote/distfiles'


class VariableClPkgdirPath(Variable):
    """
    PKGDIR path
    """

    def get(self):
        return "/var/calculate/remote/packages/%s/%s" % (
            self.Get('os_install_linux_shortname'),
            self.Get('os_install_arch_machine'))


class VariableClInstallDevFrom(Variable):
    """
    Root device of previous installed os
    """

    def set(self, value):
        """
        If device in calculate3.env dev_from not exists set ''
        """
        if value:
            # resolve to the udev DEVNAME, falling back to the given name
            value = getUdevDeviceInfo(name=value).get('DEVNAME', value)
        if value in self.Get('os_disk_dev'):
            return value
        else:
            return ""

    def get(self):
        if (self.Get('cl_autopartition_set') == 'on' and
                SchemeOpt.Update in self.Get('cl_autopartition_scheme')):
            return self.Select('cl_autopartition_disk_dev',
                               where='cl_autopartition_disk_scheme',
                               eq=SchemeDevices.Root2, limit=1)
        return ""


class VariableOsNvidiaMask(ReadonlyVariable):
    """
    Get nvidia card mask versions
    """

    def get(self):
        """
        Return the mask listed in nvidia-driver.eclass for the driver
        category matching the detected nvidia cards, or "" if none applies
        """
        image = self.Get('cl_image')
        try:
            if image:
                image = image.convertToDirectory()
                chrootPath = image.getDirectory()
            else:
                chrootPath = self.Get("cl_chroot_path")
            nvidiaeclass = path.join(chrootPath,
                                     'usr/portage/eclass/nvidia-driver.eclass')
            if not os.access(nvidiaeclass, os.R_OK):
                return ""
            category = "0300"
            vendor = "10de:"
            lsPciProg = getProgPath("/usr/sbin/lspci")
            nvidiacards = [line for line
                           in process(lsPciProg, "-d", vendor, "-n")
                           if " %s: " % category in line]
            if not nvidiacards:
                return ""
            # second half of the "vendor:device" pair on each lspci line
            card_id_re = re.compile("[0-9a-fA-F]{4}:([0-9a-fA-F]{4})")
            found_ids = [card_id_re.search(line) for line in nvidiacards]
            cardsid = [found.groups()[0] for found in found_ids if found]
            if not cardsid:
                return ""
            eclassdata = readFile(nvidiaeclass)
            drv_categories = re.findall('^drv_([^=]+)="', eclassdata, re.M)
            # '\\\n' removes backslash line continuations inside the value
            drvs = [(name, ids.replace('\\\n', '').split())
                    for name, ids in re.findall(
                        '\ndrv_(%s)="(.*?)"' % "|".join(drv_categories),
                        eclassdata, re.S)]
            masks = dict((name, mask.replace('\\\n', ''))
                         for name, mask in re.findall(
                             '\nmask_(%s)="(.*?)"' % "|".join(
                                 drv_categories),
                             eclassdata, re.S))
            cards = set(cardsid)
            drvsForCardsid = [drv for drv in drvs if set(drv[1]) & cards]
            if drvsForCardsid and drvsForCardsid[0][0] in masks:
                return masks[drvsForCardsid[0][0]]
        finally:
            if image:
                image.close()
        return ""


class VariableOsInstallLvmSet(ReadonlyVariable):
    """
    Using lvm
    """
    type = "bool"

    def get(self):
        """Return "on" if any os_install_disk_type entry mentions lvm"""
        if any("lvm" in typeDisk.lower()
               for typeDisk in self.Get('os_install_disk_type')):
            return "on"
        return "off"


class VariableOsInstallMdadmSet(ReadonlyVariable):
    """
    Using mdadm
    """
    type = "bool"

    def get(self):
        """Return "on" if any os_install_disk_type entry mentions raid"""
        if any("raid" in typeDisk.lower()
               for typeDisk in self.Get('os_install_disk_type')):
            return "on"
        return "off"


class VariableClChrootGrub(ReadonlyVariable):
    """
    Chroot for grub-mkconfig
    """

    def get(self):
        if self.Get('os_install_scratch') == "on":
            if self.Get('cl_action') == 'system':
                return self.Get('cl_target').mdirectory
            else:
                return path.join(self.Get('cl_chroot_path'), "mnt/scratch")
        else:
            return self.Get('cl_chroot_path')


class VariableOsGrub2Path(Variable):
    """
    Get Grub2 Install cmd (grub-install or grub2-install)
    """

    def get(self):
        # find grub2-install
        grubInstall = getProgPath('/usr/sbin/grub2-install')
        if grubInstall:
            return grubInstall
        # find grub-install and check, that this is grub2-install (ver 1.99)
        grubInstall = getProgPath('/usr/sbin/grub-install')
        if grubInstall and [line for line
                            in process(grubInstall, '--version')
                            if "1.99" in line or "2." in line]:
            return grubInstall
        return ""


class VariableClSetup(Variable):
    """
    Type of setup
    """
    type = "choice"
    value = ""

    def choice(self):
        return ["audio", "network", "locale", "video", "boot", "users",
                "session", "themes", ""]

    def humanReadable(self):
        mapType = {'network': _("network settings"),
                   'locale': _("localization and time options"),
                   'video': _("video settings"),
                   'boot': _("boot parameters"),
                   'audio': _("audio parameters"),
                   'themes': _("update themes"),
                   'session': _("session settings"),
                   'users': _("user settings")}
        return mapType.get(self.Get(), "")

    def check(self, value):
        if value == "boot" and self.Get('os_install_root_type') == 'livecd':
            raise VariableError(
                _("Boot configuration is not available on a LiveCD"))


class VariableClLive(Variable):
    """
    Apply live templates
    """
    value = "off"
    type = "bool"
    opt = ['--live']

    def init(self):
        self.label = _("Configure dynamic options only")
        self.help = _("configure dynamic options only")


class VariableOsInstallPxe(Variable):
    """
    Installation for PXE loading
    """
    # NOTE(review): "boot" looks like a typo for "bool" - confirm against
    # the variable type registry before changing
    type = "boot"
    value = "off"
    untrusted = True

    def check(self, value):
        """
        PXE install requires a server system, the dhcp/tftp/nfs packages
        and sr_dhcp_set = on in one of the calculate.env files
        """
        if value == "on":
            if self.Get('os_linux_system') != "server":
                raise VariableError(
                    _("PXE install is available for Calculate "
                      "Directory Server only") + '.')
            for pkg in ['net-misc/dhcp',
                        'net-ftp/tftp-hpa',
                        'net-fs/nfs-utils']:
                if not isPkgInstalled(pkg):
                    raise VariableError(
                        _("For PXE install, you need to install package %s")
                        % pkg)
            for env_fn in ('/etc/calculate/calculate.env',
                           '/var/lib/calculate/calculate.env'):
                try:
                    config = cl_ini_parser.iniParser(env_fn)
                    val = config.getVar('server', 'sr_dhcp_set')
                    if val.encode('utf-8') == "on":
                        return
                except Exception:
                    # best effort: an unreadable env file just means the
                    # setting was not found there
                    pass
            raise VariableError(
                _("PXE install is only available if the DHCP "
                  "service has been configured first"))


class VariableOsInstallPxePath(Variable):
    """
    Path to PXE installation
    """
    value = "/var/calculate/pxe"

    opt = ['--pxe-path']

    def init(self):
        self.label = _("Installation path")
        self.help = _("path for PXE install")


class VariableOsInstallUefiSet(Variable):
    """
    Install in UEFI
    """
    type = "bool"
    opt = ['--uefi']

    def init(self):
        self.label = _("UEFI boot")
        self.help = _("use UEFI boot")

    def get(self):
        if self.Get('cl_autopartition_set') == 'on':
            return self.Get('cl_autopartition_uefi_set')
        if self.Get('os_install_disk_efi') or \
                "/boot/efi" in self.Get('os_location_dest'):
            if self.Get('os_install_arch_machine') == 'x86_64' and \
                    self.Get('os_install_root_type') != 'flash':
                return self.Get('os_uefi_set')
        return 'off'

    def check(self, value):
        """
        Validate the prerequisites for enabling the UEFI bootloader
        """
        if value == 'on':
            if self.Get('os_uefi_set') == 'off' and \
                    self.Get('os_install_root_type') == 'hdd':
                raise VariableError(
                    _("Your system must be loaded in UEFI for using this "
                      "bootloader"))
            if not 'gpt' in self.Get('os_device_table'):
                raise VariableError(
                    _("GPT is needed for using the UEFI bootloader"))
            if not (self.Get('os_install_disk_efi') or
                    "/boot/efi" in self.Get('os_location_dest')):
                raise VariableError(
                    _("A EF00 partition is needed for using "
                      "the UEFI bootloader"))
            if self.Get('os_install_arch_machine') != 'x86_64':
                raise VariableError(
                    _("Architecture of the target system must be x86_64"))
            if self.Get('os_install_root_type') == 'flash':
                raise VariableError(
                    _("This option not used for Flash install"))

    def uncompatible(self):
        """
        Uncompatible with autopartition
        """
        if self.Get('cl_autopartition_set') == "on":
            return \
                _("The layout is not available with autopartitioning")
        if self.Get('os_install_root_type') == 'flash':
            return \
                _("This option not used for Flash install")
        return ""


class VariableOsInstallUefiBriefSet(VariableOsInstallUefiSet):
    """
    Brief-view mirror of os_install_uefi_set
    """

    def uncompatible(self):
        if self.Get('os_install_root_type') == 'flash':
            return _("This option not used for Flash install")
        return ""

    def get(self):
        return self.Get('os_install_uefi_set')


class VariableOsInstallGrubTerminal(Variable):
    """
    Grub terminal (gfxterm or console)
    """
    type = "choice"
    opt = ['--grub-terminal']
    metavalue = "TERMINAL"

    def init(self):
        self.label = _("Grub terminal")
        self.help = _("grub terminal")

    def get(self):
        """
        Return 'console' when the kernel command line, /etc/default/grub or
        the current grub.cfg asks for it, otherwise 'gfxterm'
        """
        cmdLine = '/proc/cmdline'
        if 'grub_nogfxmode' in readFile(cmdLine):
            return 'console'
        grubDefault = path.join(self.Get('cl_chroot_path'),
                                'etc/default/grub')
        if getValueFromConfig(grubDefault, 'GRUB_TERMINAL') == 'console':
            return 'console'
        grubCfg = '/boot/grub/grub.cfg'
        # raw string: \s must reach the regex engine unchanged
        if re.search(r'^terminal_output\s*console', readFile(grubCfg), re.M):
            return 'console'
        return 'gfxterm'

    def choice(self):
        return ['gfxterm', 'console']

    def uncompatible(self):
        """
        Grub setting up unavailable for flash installation
        """
        if self.Get('os_install_root_type') == 'flash':
            return _("Grub configuration unavailable for Flash install")
        return ""


class PackageCheckVariable(ReadonlyVariable):
    """
    Base class for variables that check whether a package is installed
    """
    # when True the package is looked up inside the image (image_variable),
    # otherwise under the prefix taken from prefix_variable
    image = False
    package = None
    type = "bool"
    image_variable = "cl_image"
    prefix_variable = "cl_chroot_path"

    def get(self):
        """Return "on" if the package is installed, "off" otherwise"""
        try:
            if self.image:
                image = self.Get(self.image_variable)
                if image:
                    with image:
                        distrPath = image.getDirectory()
                        if isPkgInstalled(self.package, prefix=distrPath):
                            return "on"
            else:
                prefix = self.Get(self.prefix_variable)
                if isPkgInstalled(self.package, prefix=prefix):
                    return "on"
        except Exception:
            # best effort: any failure is reported as "not installed"
            pass
        return "off"


class VariableOsInstallAlsaSet(PackageCheckVariable):
    """
    Whether media-sound/alsa-utils is installed
    """
    image = True
    package = "media-sound/alsa-utils"


class VariableOsInstallX11ServerSet(PackageCheckVariable):
    """
    Whether x11-base/xorg-server is installed
    """
    image = True
    package = "x11-base/xorg-server"


class VariableOsInstallSplashSet(Variable):
    """
    Show the splash screen at boot
    """
    type = "bool"
    value = "on"


class FlashUncompatible(VariableInterface):
    """
    Mixin for update settings, which are unavailable for Flash installs
    """

    def uncompatible(self):
        """
        Update setting up unavailable for flash installation
        """
        if self.Get('os_install_root_type') == 'flash':
            return \
                _("Update configuration unavailable for Flash install")
        return ""


# When the update module is available the install variables mirror the
# update.* variables; otherwise plain defaults are defined instead.
try:
    import calculate.update.variables.update as update

    class VariableClInstallAutocheckSet(FlashUncompatible,
                                        update.VariableClUpdateAutocheckSet):
        def get(self):
            return self.Get('update.cl_update_autocheck_set')

    class VariableClInstallAutocheckInterval(
            FlashUncompatible, update.VariableClUpdateAutocheckInterval):
        def get(self):
            return self.Get('update.cl_update_autocheck_interval')

    class VariableClInstallCleanpkgSet(FlashUncompatible,
                                       update.VariableClUpdateCleanpkgSet):
        def get(self):
            return self.Get('update.cl_update_cleanpkg_set')

    class VariableClInstallOtherSet(FlashUncompatible,
                                    update.VariableClUpdateOtherSet):
        def get(self):
            return self.Get('update.cl_update_other_set')

except ImportError:
    update = None

    class VariableClInstallAutocheckSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallAutocheckInterval(FlashUncompatible, Variable):
        value = ""

    class VariableClInstallCleanpkgSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallOtherSet(FlashUncompatible, Variable):
        value = "off"