You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/system.py

845 lines
26 KiB

# -*- coding: utf-8 -*-
# Copyright 2008-2015 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
TableVariable, PasswordError,
DataVarsError, VariableInterface)
from calculate.install.fs_manager import FileSystemManager
from calculate.lib.utils.files import (readFile, getProgPath, process,
readLinesFile)
from calculate.lib.utils.common import getPasswdUsers, getUserGroups, getGroups
from calculate.lib.utils.common import getValueFromConfig, getValueFromCmdLine
from calculate.lib.utils.common import getUserPrimaryGroup
from calculate.lib.utils.portage import isPkgInstalled
from calculate.lib.utils.device import getUdevDeviceInfo
from crypt import crypt
from calculate.lib.encrypt import encrypt
import calculate.lib.cl_ini_parser as cl_ini_parser
from calculate.lib.cl_lang import setLocalTranslate, _
setLocalTranslate('cl_install3', sys.modules[__name__])
class UserHelper(VariableInterface):
    """
    Mixin with the shared compatibility check for user-related variables.

    User settings are rejected for Flash installs; variables that set
    ``xorg_need`` are additionally rejected when os_install_x11_server_set
    is 'off'.
    """
    # subclasses switch this on when the option needs an Xorg session
    xorg_need = False

    def uncompatible(self):
        """
        Return the reason why the variable is unavailable, or "" if it
        can be used.
        """
        if self.Get('os_install_root_type') == 'flash':
            return _("User configuration unavailable for Flash install")
        xorg_missing = self.Get('os_install_x11_server_set') == 'off'
        if xorg_missing and self.xorg_need:
            return _("Autologin is available for Xorg sessions only")
        return ""
class VariableOsInstallScratch(ReadonlyVariable):
    """
    Install system in scratch mode
    """
    type = "bool"
    opt = ['--build']

    def get(self):
        """
        Return "off" when cl_action is 'system', otherwise the value of
        os_scratch.
        """
        if self.Get('cl_action') != 'system':
            return self.Get('os_scratch')
        # an installation defaults to a normal (non-scratch) system
        return "off"
class VariableOsFormatType(ReadonlyVariable):
    """
    Filesystem formats supported by calculate-install
    """
    type = "list"

    def get(self):
        """Return the keys of FileSystemManager.supportFS."""
        supported = FileSystemManager.supportFS
        return supported.keys()
class VariableOsFormatUse(ReadonlyVariable):
    """
    Availability ("yes"/"no") of the format utility for every entry of
    os_format_type
    """
    type = "list"
    # (on or off) autoupdate config from install program
    cl_autoupdate_set = {'type': "bool", 'value': "off"}

    def checkFunc(self, fs):
        """
        Return "yes" when supportFS[fs] has a "format" entry and that path
        exists on disk, otherwise "no".
        """
        fs_info = FileSystemManager.supportFS[fs]
        if "format" not in fs_info:
            return "no"
        if path.exists(fs_info["format"]):
            return "yes"
        return "no"

    def get(self):
        """Return one checkFunc result per filesystem in os_format_type."""
        return [self.checkFunc(fs) for fs in self.Get('os_format_type')]
class VariableClMigrateRootPwd(UserHelper, Variable):
    """
    Root password
    """
    type = "password"
    opt = ["--root-password"]
    metavalue = 'PASSWORD'
    untrusted = True

    def init(self):
        self.help = _("specify the root password")
        self.label = _("Root password")

    def get(self):
        """
        Return the password hash of root taken from /etc/shadow.

        An empty string is returned when there is no root entry or when
        the stored hash corresponds to the password "root".
        """
        rootPasswd = ""
        for line in readLinesFile('/etc/shadow'):
            fields = line.split(':')
            # compare the user name field itself; lines without a second
            # field carry no hash and are skipped
            if len(fields) > 1 and fields[0] == "root":
                rootPasswd = fields[1]
                break
        if rootPasswd:
            # everything before the last "$" is used as the crypt salt
            salt = rootPasswd.rpartition("$")[0]
            # if root password is "root"
            if salt and crypt("root", salt) == rootPasswd:
                rootPasswd = ""
        return rootPasswd

    def set(self, value):
        """
        Encrypt password

        Empty values and values that already look like a "$id$salt$hash"
        string are returned unchanged; anything else is hashed with the
        "shadow_ssha256" scheme.
        """
        reCheck = re.compile(r"^\$[^$]+\$[^$]+\$.*$")
        # test emptiness first so the regex is never applied to an empty
        # or None value
        if not value or reCheck.match(value):
            return value
        return encrypt().getHashPasswd(value, "shadow_ssha256")

    def check(self, value):
        """
        Raise PasswordError when no root password is given.
        """
        if not value:
            raise PasswordError(_("Password for user %s missing") % "root")
class VariableClInstallHomeCryptSet(UserHelper, Variable):
    """
    Encrypt user profiles (forced off while autologin is configured)
    """
    type = 'bool'
    opt = ["--crypt-home", "-C"]
    untrusted = True

    def init(self):
        self.help = _("encrypt user profiles")
        self.label = _("Encrypt user profiles")

    def get(self):
        """
        Return "off" when cl_autologin is set, otherwise the value of
        cl_home_crypt_set.
        """
        if self.Get('cl_autologin'):
            return "off"
        return self.Get('cl_home_crypt_set')

    def check(self, value):
        """
        Raise VariableError when encryption is requested together with
        autologin.
        """
        if value != "on":
            return
        if self.Get('cl_autologin'):
            raise VariableError(
                _("User profile encryption is uncompatible with autologin"))
class VariableClMigrateData(UserHelper, TableVariable):
    """
    Table of migrated users: name, groups and password columns
    """
    type = 'table'
    source = [
        'cl_migrate_user',
        'cl_migrate_user_groups',
        'cl_migrate_user_pwd',
    ]
    opt = ["--user", "-u"]
    metavalue = 'USER[:GROUPS]'
    untrusted = True

    def init(self):
        self.label = _("Migrating users")
        self.help = _("add a user to the installed system")
class VariableClMigrateDataBrief(UserHelper, TableVariable):
    """
    Brief view of the migrated users table (names and groups only)
    """
    source = [
        'cl_migrate_user',
        'cl_migrate_user_groups',
    ]

    def init(self):
        self.label = _("Migrating users")
class VariableClMigrateUser(UserHelper, Variable):
    """
    Migrate users list
    """
    type = 'list'

    def init(self):
        self.label = _("Users")

    def get(self):
        """
        Return the users reported by getPasswdUsers() except root.
        """
        # an explicit comparison is used instead of the bound
        # "root".__ne__: the dunder returns the truthy NotImplemented for
        # non-str operands, which would let a unicode "root" through
        return [user for user in getPasswdUsers() if user != "root"]
class VariableClMigrateUserGroups(UserHelper, Variable):
    """
    Groups of every migrated user
    """
    type = 'choice-list-list'
    defaultGroupList = ["users", "wheel", "audio", "cdrom", "video",
                        "cdrw", "usb", "plugdev", "games", "lp", "scanner",
                        "uucp"]

    def getDefaultGroups(self):
        """
        Return the entries of defaultGroupList that getGroups() also
        reports.
        """
        wanted = set(self.defaultGroupList)
        existing = set(getGroups())
        return list(wanted & existing)

    def init(self):
        self.label = _("Groups")

    def set(self, value):
        """
        Replace every empty group list (or one holding only empty items)
        with the default groups.
        """
        return [groups if groups and any(groups)
                else self.getDefaultGroups()
                for groups in value]

    def getPrimaryGroup(self, username):
        """
        Return the primary group of username as a one-item list, or an
        empty list when getUserPrimaryGroup() gives nothing.
        """
        primary = getUserPrimaryGroup(username)
        return [primary] if primary else []

    def get(self):
        """
        Return, for each user in cl_migrate_user, the primary group
        followed by the user's groups; users missing from
        getPasswdUsers() get the default groups instead.
        """
        known_users = getPasswdUsers()
        result = []
        for user in self.Get('cl_migrate_user'):
            primary = self.getPrimaryGroup(user)
            if user in known_users:
                extra = getUserGroups(user)
            else:
                extra = self.getDefaultGroups()
            result.append(primary + extra)
        return result

    def choice(self):
        """
        Available groups
        """
        return getGroups()
class VariableClMigrateUserPwd(UserHelper, Variable):
    """
    Password hashes of the migrated users
    """
    type = 'password-list'

    def init(self):
        self.label = _("Password")

    def get(self):
        """
        Return the /etc/shadow password field for each user in
        cl_migrate_user, in the same order.

        Users without a shadow entry get "". An empty list is returned
        when /etc/shadow is not readable or there are no users to migrate.
        """
        fileName = "/etc/shadow"
        if not os.access(fileName, os.R_OK):
            return []
        migrateusers = self.Get("cl_migrate_user")
        if not migrateusers:
            return []
        # only lines with exactly this many ":"-separated fields are used
        lenData = 9
        hashes = {}
        # the context manager closes the file even if parsing fails
        with open(fileName) as shadow:
            for line in shadow:
                fields = line.rstrip().split(":")
                if len(fields) == lenData:
                    # setdefault keeps the first entry of a duplicated user
                    hashes.setdefault(fields[0], fields[1])
        return [hashes.get(userName, "") for userName in migrateusers]

    def check(self, value):
        """
        Raise PasswordError for the first migrated user whose password is
        empty.
        """
        for user, pwd in zip(self.Get('cl_migrate_user'), value):
            if not pwd:
                raise PasswordError(
                    _("Password for user %s missing") % user)

    def set(self, value):
        """
        Encrypt passwords

        Empty items and items that already look like a "$id$salt$hash"
        string are kept; the rest are hashed with "shadow_ssha256".
        """
        reCheck = re.compile(r"^\$[^$]+\$[^$]+\$.*$")
        encryptObj = encrypt()
        # emptiness is tested before the regex so an empty or None item
        # is never passed to match()
        return [pwd if not pwd or reCheck.match(pwd)
                else encryptObj.getHashPasswd(pwd, "shadow_ssha256")
                for pwd in value]
class VariableClAutologin(UserHelper, Variable):
    """
    Name of the autologin user, or an empty string when autologin is
    disabled
    """
    type = 'choiceedit'
    opt = ["--autologin", '-A']
    metavalue = "USER"
    xorg_need = True

    def init(self):
        self.label = _("Autologin")
        self.help = _("add an autologin user to the installed system")

    def get(self):
        """
        Return the first non-root migrated user when autologin is enabled
        by default, otherwise "".

        The default applies to a livecd root type with no calculate
        domain/domain_pw kernel parameter, and to the CMC distribution.
        """
        domain_param = (getValueFromCmdLine("calculate", "domain_pw") or
                        getValueFromCmdLine("calculate", "domain") or "")
        live_without_domain = (
            not domain_param and
            self.Get('os_install_root_type') == "livecd")
        if not (live_without_domain or
                self.Get('os_install_linux_shortname') == "CMC"):
            return ""
        for user in self.Get('cl_migrate_user'):
            if user != "root":
                return user
        return ""

    def set(self, value):
        """Map the keyword 'none' to an empty value."""
        if value == 'none':
            return ''
        return value

    def choice(self):
        """Return "" followed by every migrated user except root."""
        candidates = [user for user in self.Get('cl_migrate_user')
                      if user != "root"]
        return [""] + candidates

    def check(self, value):
        """
        Autologin only for migrated non-root users
        """
        if value and value not in self.Get('cl_migrate_user'):
            raise VariableError(_("User %s does not exist") % value)
        if value == "root":
            raise VariableError(
                _("Autologin is unavailable for user %s") % value)

    def humanReadable(self):
        current = self.Get()
        if current:
            return current
        return _("Not used")

    def uncompatible(self):
        """
        Autologin is unavailable for the 'merge' action when
        client.cl_remote_host is set; otherwise the UserHelper check
        applies.
        """
        try:
            on_domain_workstation = (
                self.Get('cl_action') == 'merge' and
                self.Get('client.cl_remote_host'))
            if on_domain_workstation:
                return \
                    _("The autologin is not available with domain workstations")
        except DataVarsError:
            # a DataVarsError raised while reading these variables is
            # ignored and the generic check below is used
            pass
        return UserHelper.uncompatible(self)
class VariableClInstallAutoupdateSet(Variable):
    """
    Autoupdate of the config from the install program (on or off);
    defaults to "off"
    """
    value = "off"
    type = "bool"
class VariableOsInstallMakeopts(Variable):
    """
    MAKEOPTS value for make.conf
    """

    def get(self):
        """
        Return "-j1" for a single CPU, otherwise "-j<hr_cpu_num + 1>".
        """
        cpu_count = self.Get('hr_cpu_num')
        if cpu_count == "1":
            return "-j1"
        jobs = int(cpu_count) + 1
        return "-j%d" % jobs
class VariableOsGrubConf(ReadonlyVariable):
    """
    DEPRECATED: content of the current grub.conf
    """
class VariableOsInstallGrubDevicemapConf(ReadonlyVariable):
    """
    DEPRECATED: content of the device.map file for grub
    """
    os_install_grub_devicemap_conf = {}
class VariableClDistfilesPath(Variable):
    """
    Path of the DISTFILES directory
    """
    value = '/var/calculate/remote/distfiles'
class VariableClPkgdirPath(Variable):
    """
    Path of the PKGDIR directory
    """

    def get(self):
        """
        Build the path from os_install_linux_shortname and
        os_install_arch_machine.
        """
        shortname = self.Get('os_install_linux_shortname')
        arch = self.Get('os_install_arch_machine')
        return "/var/calculate/remote/packages/%s/%s" % (shortname, arch)
class VariableClInstallDevFrom(Variable):
    """
    Root device of previous installed os
    """

    def set(self, value):
        """
        Resolve value to its udev DEVNAME (when reported) and return it
        only if it is listed in os_disk_dev; otherwise return ''.
        """
        if value:
            udev_info = getUdevDeviceInfo(name=value)
            value = udev_info.get('DEVNAME', value)
        if value not in self.Get('os_disk_dev'):
            return ""
        return value

    def get(self):
        """
        With autopartitioning on and "root" in the scheme, return the
        autopartition device whose scheme is 'root'; otherwise "".
        """
        if self.Get('cl_autopartition_set') != 'on':
            return ""
        if "root" not in self.Get('cl_autopartition_scheme'):
            return ""
        return self.Select('cl_autopartition_disk_dev',
                           where='cl_autopartition_disk_scheme', eq='root',
                           limit=1)
class VariableOsNvidiaMask(ReadonlyVariable):
    """
    Get nvidia card mask versions
    """

    def get(self):
        """
        Return the mask string from nvidia-driver.eclass that belongs to
        the driver group of a detected NVIDIA card, or "".

        The eclass is read from the cl_image directory when an image is
        set, otherwise from cl_chroot_path. "" is returned when the eclass
        is unreadable, lspci is not found, no card is detected or no
        group matches.
        """
        image = self.Get('cl_image')
        try:
            if image:
                image = image.convertToDirectory()
                chrootPath = image.getDirectory()
            else:
                chrootPath = self.Get("cl_chroot_path")
            nvidiaeclass = path.join(chrootPath,
                                     'usr/portage/eclass/nvidia-driver.eclass')
            if not os.access(nvidiaeclass, os.R_OK):
                return ""
            lsPciProg = getProgPath("/usr/sbin/lspci")
            # without lspci the cards cannot be enumerated
            if not lsPciProg:
                return ""
            # "0300" is the class and "10de:" the vendor filter passed to
            # lspci -d ... -n
            category = "0300"
            vendor = "10de:"
            nvidiacards = [line
                           for line in process(lsPciProg, "-d", vendor, "-n")
                           if " %s: " % category in line]
            if not nvidiacards:
                return ""
            # second half of every "vvvv:dddd" id pair found in the output
            cardsid = set()
            for line in nvidiacards:
                found = re.search("[0-9a-fA-F]{4}:([0-9a-fA-F]{4})", line)
                if found:
                    cardsid.add(found.group(1))
            if not cardsid:
                return ""
            eclassdata = readFile(nvidiaeclass)
            drv_categories = re.findall('^drv_([^=]+)="', eclassdata, re.M)
            categories_re = "|".join(drv_categories)
            # (group name, list of device ids); backslash-newline
            # continuations are removed before splitting
            drvs = [(name, ids.replace('\\\n', '').split())
                    for name, ids in re.findall(
                        '\ndrv_(%s)="(.*?)"' % categories_re,
                        eclassdata, re.S)]
            masks = dict((name, mask.replace('\\\n', ''))
                         for name, mask in re.findall(
                             '\nmask_(%s)="(.*?)"' % categories_re,
                             eclassdata, re.S))
            # only the first group containing one of the card ids counts
            for name, ids in drvs:
                if cardsid.intersection(ids):
                    return masks.get(name, "")
        finally:
            if image:
                image.close()
        return ""
class VariableOsInstallLvmSet(ReadonlyVariable):
    """
    Whether LVM is used by the target disks
    """
    type = "bool"

    def get(self):
        """
        Return "on" when any entry of os_install_disk_type contains
        "lvm" (case-insensitive), otherwise "off".
        """
        disk_types = self.Get('os_install_disk_type')
        uses_lvm = any("lvm" in disk_type.lower()
                       for disk_type in disk_types)
        return "on" if uses_lvm else "off"
class VariableOsInstallMdadmSet(ReadonlyVariable):
    """
    Whether mdadm (software RAID) is used by the target disks
    """
    type = "bool"

    def get(self):
        """
        Return "on" when any entry of os_install_disk_type contains
        "raid" (case-insensitive), otherwise "off".
        """
        disk_types = self.Get('os_install_disk_type')
        uses_raid = any("raid" in disk_type.lower()
                        for disk_type in disk_types)
        return "on" if uses_raid else "off"
class VariableClChrootGrub(ReadonlyVariable):
    """
    Chroot for grub-mkconfig
    """

    def get(self):
        """
        Return cl_chroot_path for a non-scratch install. In scratch mode
        return the cl_target mdirectory for the 'system' action, else
        "mnt/scratch" under cl_chroot_path.
        """
        if self.Get('os_install_scratch') != "on":
            return self.Get('cl_chroot_path')
        if self.Get('cl_action') == 'system':
            return self.Get('cl_target').mdirectory
        return path.join(self.Get('cl_chroot_path'), "mnt/scratch")
class VariableOsGrub2Path(Variable):
    """
    Get Grub2 Install cmd (grub-install or grub2-install)
    """

    def get(self):
        """
        Return the path of grub2-install if found; otherwise the path of
        grub-install when its --version output mentions "1.99" or "2.";
        otherwise "".
        """
        grub2_install = getProgPath('/usr/sbin/grub2-install')
        if grub2_install:
            return grub2_install
        # grub-install counts only if it reports a grub2 version
        grub_install = getProgPath('/usr/sbin/grub-install')
        if not grub_install:
            return ""
        version_lines = [line
                         for line in process(grub_install, '--version')
                         if "1.99" in line or "2." in line]
        if version_lines:
            return grub_install
        return ""
class VariableClSetup(Variable):
    """
    Type of setup
    """
    type = "choice"
    value = ""

    def choice(self):
        """Return the accepted setup types ("" included)."""
        return ["audio", "network", "locale", "video", "boot", "users",
                "session", ""]

    def humanReadable(self):
        """
        Return the translated description of the current setup type, or
        "" for an unknown one.
        """
        descriptions = {
            'network': _("network settings"),
            'locale': _("localization and time options"),
            'video': _("video settings"),
            'boot': _("boot parameters"),
            'audio': _("audio parameters"),
            'session': _("session settings"),
            'users': _("user settings"),
        }
        current = self.Get()
        return descriptions.get(current, "")

    def check(self, value):
        """
        Raise VariableError when boot setup is requested on a livecd.
        """
        if value != "boot":
            return
        if self.Get('os_install_root_type') == 'livecd':
            raise VariableError(
                _("Boot configuration is not available on a LiveCD"))
class VariableClLive(Variable):
    """
    Apply live templates
    """
    type = "bool"
    opt = ['--live']
    value = "off"

    def init(self):
        self.help = _("configure dynamic options only")
        self.label = _("Configure dynamic options only")
class VariableOsInstallPxe(Variable):
    """
    Installation for PXE loading
    """
    # NOTE(review): "boot" looks like a typo for "bool" - confirm how the
    # framework treats this type before changing it
    type = "boot"
    value = "off"
    untrusted = True

    def check(self, value):
        """
        Validate a request for PXE install ("on").

        Raises VariableError when os_linux_system is not "server", when
        one of the required packages is not installed, or when
        sr_dhcp_set in /etc/calculate/calculate.env is not "on".
        """
        if value != "on":
            return
        if self.Get('os_linux_system') != "server":
            raise VariableError(
                _("PXE install is available for Calculate "
                  "Directory Server only") + '.')
        required_packages = ['net-misc/dhcp', 'net-ftp/tftp-hpa',
                             'net-fs/nfs-utils']
        for pkg in required_packages:
            if not isPkgInstalled(pkg):
                raise VariableError(
                    _("For PXE install, you need to install package %s")
                    % pkg)
        try:
            config = cl_ini_parser.iniParser('/etc/calculate/calculate.env')
            val = config.getVar('server', 'sr_dhcp_set')
            if val.encode('utf-8') == "on":
                return
        except Exception:
            # any failure while reading the setting is treated the same
            # as DHCP not being configured
            pass
        raise VariableError(
            _("PXE install is only available if the DHCP "
              "service has been configured first"))
class VariableOsInstallPxePath(Variable):
    """
    Path to PXE installation
    """
    opt = ['--pxe-path']
    value = "/var/calculate/pxe"

    def init(self):
        self.help = _("path for PXE install")
        self.label = _("Installation path")
class VariableOsInstallUefiSet(Variable):
    """
    Install in UEFI
    """
    type = "bool"
    opt = ['--uefi']

    def init(self):
        self.label = _("UEFI boot")
        self.help = _("use UEFI boot")

    def get(self):
        """
        Return cl_autopartition_uefi_set when autopartitioning is on.

        Otherwise return os_uefi_set if an EFI partition is present
        (os_install_disk_efi or "/boot/efi" in os_location_dest), the
        target arch is x86_64 and the root type is not flash; in every
        other case return 'off'.
        """
        if self.Get('cl_autopartition_set') == 'on':
            return self.Get('cl_autopartition_uefi_set')
        has_efi = (self.Get('os_install_disk_efi') or
                   "/boot/efi" in self.Get('os_location_dest'))
        if not has_efi:
            return 'off'
        if (self.Get('os_install_arch_machine') == 'x86_64' and
                self.Get('os_install_root_type') != 'flash'):
            return self.Get('os_uefi_set')
        return 'off'

    def check(self, value):
        """
        Raise VariableError when UEFI is requested ('on') but one of its
        prerequisites is not met.
        """
        if value != 'on':
            return
        if (self.Get('os_uefi_set') == 'off' and
                self.Get('os_install_root_type') == 'hdd'):
            raise VariableError(
                _("Your system must be loaded in UEFI for using this "
                  "bootloader"))
        if 'gpt' not in self.Get('os_device_table'):
            raise VariableError(
                _("GPT is needed for using the UEFI bootloader"))
        has_efi = (self.Get('os_install_disk_efi') or
                   "/boot/efi" in self.Get('os_location_dest'))
        if not has_efi:
            raise VariableError(
                _("A EF00 partition is needed for using "
                  "the UEFI bootloader"))
        if self.Get('os_install_arch_machine') != 'x86_64':
            raise VariableError(
                _("Architecture of the target system must be x86_64"))
        if self.Get('os_install_root_type') == 'flash':
            raise VariableError(
                _("This option not used for Flash install"))

    def uncompatible(self):
        """
        The option is unavailable with autopartitioning and for Flash
        installs; return the reason, or "" when it can be used.
        """
        if self.Get('cl_autopartition_set') == "on":
            return _("The layout is not available with autopartitioning")
        if self.Get('os_install_root_type') == 'flash':
            return _("This option not used for Flash install")
        return ""
class VariableOsInstallUefiBriefSet(VariableOsInstallUefiSet):
    """
    Variant of the UEFI setting that mirrors os_install_uefi_set and is
    unavailable only for Flash installs
    """

    def uncompatible(self):
        """Return the Flash-install reason, or "" when usable."""
        is_flash = self.Get('os_install_root_type') == 'flash'
        if is_flash:
            return _("This option not used for Flash install")
        return ""

    def get(self):
        """Return the value of os_install_uefi_set."""
        return self.Get('os_install_uefi_set')
class VariableOsInstallGrubTerminal(Variable):
    """
    Grub terminal type: 'gfxterm' or 'console'
    """
    type = "choice"
    opt = ['--grub-terminal']
    metavalue = "TERMINAL"

    def init(self):
        self.label = _("Grub terminal")
        self.help = _("grub terminal")

    def get(self):
        """
        Return 'console' when any of these holds, otherwise 'gfxterm':
        /proc/cmdline contains grub_nogfxmode; GRUB_TERMINAL in
        etc/default/grub under cl_chroot_path is 'console';
        /boot/grub/grub.cfg has a "terminal_output console" line.
        """
        if 'grub_nogfxmode' in readFile('/proc/cmdline'):
            return 'console'
        grub_default = path.join(self.Get('cl_chroot_path'),
                                 'etc/default/grub')
        configured = getValueFromConfig(grub_default, 'GRUB_TERMINAL')
        if configured == 'console':
            return 'console'
        grub_cfg_text = readFile('/boot/grub/grub.cfg')
        if re.search('^terminal_output\s*console', grub_cfg_text, re.M):
            return 'console'
        return 'gfxterm'

    def choice(self):
        """Return the accepted terminal types."""
        return ['gfxterm', 'console']

    def uncompatible(self):
        """
        Grub setting up unavailable for flash installation
        """
        is_flash = self.Get('os_install_root_type') == 'flash'
        if is_flash:
            return _("Grub configuration unavailable for Flash install")
        return ""
class PackageCheckVariable(ReadonlyVariable):
    """
    Base class for variables that check whether a package is installed
    """
    type = "bool"
    # subclasses set the package atom to look for
    package = None
    # True: look inside the image from image_variable;
    # False: look under the path from prefix_variable
    image = False
    image_variable = "cl_image"
    prefix_variable = "cl_chroot_path"

    def get(self):
        """
        Return "on" when the package is installed, otherwise "off".

        Any exception raised during the lookup also yields "off".
        """
        try:
            if not self.image:
                prefix = self.Get(self.prefix_variable)
                if isPkgInstalled(self.package, prefix=prefix):
                    return "on"
            else:
                image = self.Get(self.image_variable)
                if image:
                    with image:
                        distrPath = image.getDirectory()
                        if isPkgInstalled(self.package, prefix=distrPath):
                            return "on"
        except Exception:
            # best effort: a failed lookup is reported as "off"
            pass
        return "off"
class VariableOsInstallAlsaSet(PackageCheckVariable):
    """
    Whether media-sound/alsa-utils is installed
    """
    package = "media-sound/alsa-utils"
    image = True
class VariableOsInstallX11ServerSet(PackageCheckVariable):
    """
    Whether x11-base/xorg-server is installed
    """
    package = "x11-base/xorg-server"
    image = True
class FlashUncompatible(VariableInterface):
    """
    Mixin that marks update settings as unavailable for Flash installs
    """

    def uncompatible(self):
        """
        Return the reason for a Flash install, or "" when the setting can
        be used.
        """
        is_flash = self.Get('os_install_root_type') == 'flash'
        if is_flash:
            return _("Update configuration unavailable for Flash install")
        return ""
# The install variables below mirror the update module's variables when
# calculate.update can be imported; otherwise plain fallbacks with fixed
# default values are defined under the same names.
try:
    import calculate.update.variables.update as update

    class VariableClInstallAutocheckSet(FlashUncompatible,
                                        update.VariableClUpdateAutocheckSet):
        # value is taken from update.cl_update_autocheck_set
        def get(self):
            return self.Get('update.cl_update_autocheck_set')

    class VariableClInstallAutocheckInterval(FlashUncompatible,
                                             update.VariableClUpdateAutocheckInterval):
        # value is taken from update.cl_update_autocheck_interval
        def get(self):
            return self.Get('update.cl_update_autocheck_interval')

    class VariableClInstallCleanpkgSet(FlashUncompatible,
                                       update.VariableClUpdateCleanpkgSet):
        # value is taken from update.cl_update_cleanpkg_set
        def get(self):
            return self.Get('update.cl_update_cleanpkg_set')

    class VariableClInstallOtherSet(FlashUncompatible,
                                    update.VariableClUpdateOtherSet):
        # value is taken from update.cl_update_other_set
        def get(self):
            return self.Get('update.cl_update_other_set')
except ImportError:
    # the update module is not available: keep the name bound and define
    # the variables with static defaults
    update = None

    class VariableClInstallAutocheckSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallAutocheckInterval(FlashUncompatible, Variable):
        value = ""

    class VariableClInstallCleanpkgSet(FlashUncompatible, Variable):
        value = "off"

    class VariableClInstallOtherSet(FlashUncompatible, Variable):
        value = "off"