# File: calculate-utils-2.2-install/pym/cl_fill_install.py (673 lines, 28 KiB)
#-*- coding: utf-8 -*-
# Copyright 2010 Mir Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import cl_overriding
from cl_datavars import glob_attr
from os.path import join as pathjoin, exists as pathexists
from os import readlink,listdir,access,R_OK
from cl_utils import isMount
from cl_distr import DistributiveRepository
from cl_fill import clLocale
class fillVars(object, glob_attr):
    # Directories that are skipped when mount points and bind mounts
    # are carried over to the installed system.
    nonTransferedDirs = [
        "/", "/bin", "/dev", "/etc",
        "/lib", "/lib32", "/lib64",
        "/opt", "/proc", "/sbin",
        "/sys", "/usr", "/var",
    ]
def get_os_net_interfaces_info(self):
    """Describe every known network interface as "name (state)".

    The state is "DHCP" when the dhclient lease file mentions the
    interface or a dhcpcd info/lease file exists for it, the address
    found in the ifconfig output when there is one, and "Off"
    otherwise.  The entries are joined with ", ".
    """
    leasesPath = "/var/lib/dhcp/dhclient.leases"
    # The dhclient lease file is the same for every interface: read it
    # once and close it instead of reopening it on each iteration.
    leasesText = None
    if access(leasesPath, R_OK):
        with open(leasesPath) as leasesFile:
            leasesText = leasesFile.read()
    reAddr = re.compile(r'addr:([0-9\.]+).+')
    interfacesInfo = []
    for interface in self.Get("os_net_interfaces"):
        usesDhcp = leasesText is not None and interface in leasesText
        if not usesDhcp:
            usesDhcp = (
                pathexists("/var/lib/dhcpcd/dhcpcd-%s.info" % interface) or
                pathexists("/var/lib/dhcpcd/dhcpcd-%s.lease" % interface))
        if usesDhcp:
            interfacesInfo.append((interface, "DHCP"))
            continue
        # Interface without DHCP: look for its address.  A failed
        # command (False) is handled like empty output.
        output = self._runos("/sbin/ifconfig %s" % interface) or []
        state = "Off"
        for line in output:
            found = reAddr.search(line)
            if found:
                state = found.group(1)
                break
        interfacesInfo.append((interface, state))
    return ", ".join(["%s (%s)" % info for info in interfacesInfo])
def get_os_device_hash(self):
    """Describe the block devices listed in /sys/block.

    Returns a dict keyed by device name.  Each value holds 'map' (index
    of the device in sorted name order) and 'type': "hdd", or for
    devices referenced by a usb-* link in /dev/disk/by-id "flash" when
    the removable flag is "1" and "usb-hdd" otherwise.  Devices whose
    name contains "sr", "fd", "ram" or "loop" are left out.
    """
    diskIdPath = '/dev/disk/by-id'
    # The by-id directory may be missing; then no device counts as USB.
    usbdevices = set()
    if os.path.isdir(diskIdPath):
        for linkName in listdir(diskIdPath):
            if linkName.startswith('usb-'):
                target = readlink(pathjoin(diskIdPath, linkName))
                usbdevices.add(target.rpartition('/')[2])
    reWrongDevice = re.compile("|".join(['sr', 'fd', 'ram', 'loop']))
    devices = sorted([name for name in listdir('/sys/block')
                      if not reWrongDevice.search(name)])
    device_hash = {}
    for mapnum, device in enumerate(devices):
        devtype = "hdd"
        if device in usbdevices:
            devtype = "usb-hdd"
            removablePath = '/sys/block/%s/removable' % device
            if os.access(removablePath, R_OK):
                # close the sysfs file right after reading the flag
                with open(removablePath, 'r') as removableFile:
                    if removableFile.read().strip() == "1":
                        devtype = "flash"
        device_hash[device] = {'map': mapnum, 'type': devtype}
    return device_hash
def get_os_disk_hash(self):
    """Collect per-partition information from parted, blkid and fstab.

    Returns a dict keyed by partition device path.  Each value may hold:
      'grub'    - "disk,partition" numbers, only for /dev/sdXN names
      'part'    - fifth column of the parted partition line
      'size'    - fourth column of the parted partition line
      'name', 'uuid', 'format' - LABEL, UUID and TYPE printed by blkid
      'options' - fstab mount options with "rw" removed
    An error is printed and cl_overriding.exit(1) is called when parted
    or blkid cannot be executed.
    """
    reSdaPart = re.compile(r"^/dev/sd([a-z])(\d+)$")
    # KEY="value" pairs; the value may contain spaces (e.g. labels)
    reBlkidAttr = re.compile(r'(\w+)="([^"]*)"')
    # partitions are the /sys/block/<device> entries named after the device
    disks = []
    for device in self.Get('os_device_hash').keys():
        for entry in listdir('/sys/block/%s' % device):
            if device in entry:
                disks.append("/dev/%s" % entry)
    disk_hash = {}
    # fill grub info
    for dev in disks:
        disk_hash[dev] = {}
        match = reSdaPart.match(dev)
        if match:
            letter, number = match.groups()
            disk_hash[dev]['grub'] = "%d,%d" % (ord(letter) - ord('a'),
                                                int(number) - 1)
    # parse the parted lines starting with "Disk" or with a number
    execStr = '/usr/sbin/parted -l'
    res = self._runos(execStr, env={"LANG": "C"}, ret_list=True)
    if res is False:
        cl_overriding.printERROR("Cann't execute '%s'" % execStr)
        cl_overriding.exit(1)
    curDevice = None
    for line in res:
        parts = line.split()
        if not parts:
            continue
        if parts[0] == "Disk":
            # Only "Disk /dev/xxx: size" starts a device description;
            # lines such as "Disk Flags:" must not replace the device.
            if len(parts) > 1 and parts[1].startswith("/"):
                curDevice = parts[1][:-1]
            continue
        # a leading number marks a partition description; skip rows
        # that are too short or appear before any device line
        if parts[0].isdigit() and curDevice and len(parts) > 4:
            # partition name is device name + partition number
            partition = curDevice + parts[0]
            entry = disk_hash.setdefault(partition, {})
            entry['part'] = parts[4]
            entry['size'] = parts[3]
    # fill format, name and uuid
    execStr = '/sbin/blkid'
    res = self._runos(execStr, ret_list=True)
    if res is False:
        cl_overriding.printERROR("Cann't execute '%s'" % execStr)
        cl_overriding.exit(1)
    # map attribute name of blkid to disk_hash
    blkid_hash = {'LABEL': 'name',
                  'UUID': 'uuid',
                  'TYPE': 'format'}
    knownDisks = set(disks)
    for line in res:
        devField, _sep, attrs = line.strip().partition(' ')
        # the first field is the device followed by a colon
        dev = devField[:-1]
        if dev not in knownDisks:
            continue
        for key, value in reBlkidAttr.findall(attrs):
            if key in blkid_hash:
                disk_hash[dev][blkid_hash[key]] = value
    # mount options from fstab (later entries override earlier ones)
    if access("/etc/fstab", R_OK):
        with open("/etc/fstab") as fstab:
            for line in fstab:
                fields = line.split()
                if len(fields) < 4 or fields[0].startswith('#'):
                    continue
                if fields[0] in disk_hash:
                    options = [opt for opt in fields[3].split(',')
                               if opt != "rw"]
                    disk_hash[fields[0]]['options'] = ",".join(options)
    return disk_hash
def get_os_disk_dev(self):
    """Sorted list of the partition devices known to os_disk_hash."""
    partitions = list(self.Get('os_disk_hash').keys())
    partitions.sort()
    return partitions
def getAttributeFromHash(self, var, attr):
    """Return attribute *attr* of every entry of the dict variable *var*.

    The values follow the sorted order of the keys; an entry without
    the attribute contributes an empty string.
    """
    table = self.Get(var)
    values = []
    for key in sorted(table.keys()):
        entry = table[key]
        if attr in entry:
            values.append(entry[attr])
        else:
            values.append("")
    return values
def get_os_disk_uuid(self):
    """UUID of every partition device ('uuid' of os_disk_hash)."""
    uuids = self.getAttributeFromHash('os_disk_hash', 'uuid')
    return uuids
def get_os_disk_options(self):
    """Mount options of every partition device ('options' of os_disk_hash)."""
    options = self.getAttributeFromHash('os_disk_hash', 'options')
    return options
def get_os_install_disk_options(self):
    """Mount options for the installed system; same as os_disk_options."""
    currentOptions = self.Get('os_disk_options')
    return currentOptions
def isFstabMount(self, path, fstabPath="/etc/fstab"):
    """Look up the fstab counterpart of a device or mount point.

    path      -- device, mount point, or the literal "swap"
    fstabPath -- fstab file to read (defaults to /etc/fstab)

    Entries whose filesystem type is "swap" are treated as mounted on
    "swap"; entries mounted on "none" are ignored.  For the last entry
    whose device or mount point equals *path* the other of the two
    values is returned.  An empty string is returned when nothing
    matches or the file is not readable.
    """
    # "swap" is a keyword, not a path: abspath() would turn it into
    # "<cwd>/swap", which never matches any entry.
    if path == "swap":
        target = "swap"
    else:
        target = os.path.abspath(path)
    if not access(fstabPath, R_OK):
        return ""
    found = []
    with open(fstabPath) as fstab:
        for line in fstab:
            fields = line.split()
            # skip blank lines, comments and rows without a fs type
            if len(fields) < 3 or fields[0].startswith('#'):
                continue
            device, mountPoint, fsType = fields[:3]
            if fsType == "swap":
                mountPoint = "swap"
            if mountPoint != "none" and target in (device, mountPoint):
                found = [device, mountPoint]
    others = [item for item in found if item != target]
    if others:
        return others[0]
    return ""
def get_os_install_disk_mount(self):
    """Mount point of every partition device for the installed system.

    The install root device maps to "/".  When fstab has no swap entry,
    a partition whose format contains "swap" maps to "swap".  Any other
    partition gets its fstab counterpart, or "" when that counterpart is
    one of nonTransferedDirs.  The result follows sorted device order.
    """
    rootdev = self.Get('os_install_root_dev')
    disk_hash = self.Get('os_disk_hash')
    fstabHasSwap = self.isFstabMount('swap')
    mountPoints = []
    for disk in sorted(self.Get('os_disk_hash').keys()):
        if disk == rootdev:
            mountPoints.append("/")
            continue
        info = disk_hash[disk]
        if not fstabHasSwap and "format" in info and \
                "swap" in info['format']:
            mountPoints.append("swap")
            continue
        mountPoint = self.isFstabMount(disk)
        if mountPoint in self.nonTransferedDirs:
            mountPoint = ""
        mountPoints.append(mountPoint)
    return mountPoints
def get_os_disk_mount(self):
    """fstab mount point of every partition device of the current system.

    Devices without an fstab counterpart yield an empty string; the
    result follows sorted device order.
    """
    disk_hash = self.Get('os_disk_hash')
    mountPoints = []
    for disk in sorted(self.Get('os_disk_hash').keys()):
        mountPoints.append(self.isFstabMount(disk) or "")
    return mountPoints
def get_os_disk_format(self):
    """Filesystem of every partition device ('format' of os_disk_hash)."""
    formats = self.getAttributeFromHash('os_disk_hash', 'format')
    return formats
def get_os_install_disk_format(self):
    """Filesystems for the installed system; same as os_disk_format."""
    currentFormats = self.Get('os_disk_format')
    return currentFormats
def get_os_disk_grub(self):
    """Grub id of every partition device ('grub' of os_disk_hash)."""
    grubIds = self.getAttributeFromHash('os_disk_hash', 'grub')
    return grubIds
def get_os_disk_part(self):
    """Partition type of every device ('part' of os_disk_hash)."""
    partTypes = self.getAttributeFromHash('os_disk_hash', 'part')
    return partTypes
def get_os_install_disk_perform_format(self):
    """'needformat' attribute of every partition device in os_disk_hash."""
    needFormat = self.getAttributeFromHash('os_disk_hash', 'needformat')
    return needFormat
def get_os_disk_size(self):
    """Size of every partition device ('size' of os_disk_hash)."""
    sizes = self.getAttributeFromHash('os_disk_hash', 'size')
    return sizes
def get_os_disk_name(self):
    """Label of every partition device ('name' of os_disk_hash)."""
    labels = self.getAttributeFromHash('os_disk_hash', 'name')
    return labels
def get_os_device_dev(self):
    """Sorted list of the device names known to os_device_hash."""
    devices = list(self.Get('os_device_hash').keys())
    devices.sort()
    return devices
def get_os_device_type(self):
    """Type of every device ('type' of os_device_hash)."""
    deviceTypes = self.getAttributeFromHash('os_device_hash', 'type')
    return deviceTypes
def get_os_device_map(self):
    """Grub map number of every device ('map' of os_device_hash)."""
    mapNumbers = self.getAttributeFromHash('os_device_hash', 'map')
    return mapNumbers
def get_os_install_grub_devicemap_info(self):
    """Content of the grub device.map file.

    One "(hdN) /dev/<device>" line per device, skipping devices of
    type "flash".
    """
    lines = []
    for mapnum, device, devtype in zip(self.Get('os_device_map'),
                                       self.Get('os_device_dev'),
                                       self.Get('os_device_type')):
        if devtype != "flash":
            lines.append("(hd%s) /dev/%s" % (mapnum, device))
    return "\n".join(lines)
def get_os_install_fstab_mount_info(self):
    """fstab lines for the mounted partitions and the bind mounts.

    Partitions with an empty or "swap" mount point are skipped and the
    rest is ordered by mount point.  The first line ends with "0 2",
    the following ones with "0 0"; bind lines come last.
    """
    rows = zip(self.Get('os_disk_dev'),
               self.Get('os_install_disk_mount'),
               self.Get('os_install_disk_format'),
               self.Get('os_install_disk_options'))
    mounted = [row for row in rows if row[1] != "" and row[1] != "swap"]
    mounted.sort(key=lambda row: row[1])
    # slicing keeps this correct when there are no mounted partitions
    rootLine = "\n".join(["%s\t%s\t%s\t%s\t0 2" % (row[0], row[1],
                                                     row[2], row[3])
                          for row in mounted[:1]])
    otherLines = "\n".join(["%s\t%s\t%s\t%s\t0 0" % (row[0], row[1],
                                                       row[2], row[3])
                            for row in mounted[1:]])
    bindLines = "\n".join(["%s\t%s\tnone\tbind\t0 0" % (source, target)
                           for source, target in
                           zip(self.Get('os_install_bind_dir'),
                               self.Get('os_install_bind_mountpoint'))])
    sections = [text for text in (rootLine, otherLines, bindLines) if text]
    return "\n".join(sections)
#def get_os_net_config_info(self):
#"""Content of /etc/conf.d/net file"""
#def getNetGateways():
#"""Get the network gateways"""
#execStr = 'route -n'
#res = self._runos(execStr,env={"LANG":"C"},ret_list=True)
#if res is False:
#cl_overriding.printERROR("Cann't execute '%s'"%execStr)
#cl_overriding.exit(1)
#flagData = False
## retData = {'eth0':'10.0.0.1' ...}
#retData = {}
#for line in res:
#if flagData:
#routeData = map(lambda y: y.strip(),
#filter(lambda x: x, line.split(" ")))
#if len(routeData) == 8:
#if routeData[3] == "UG" and routeData[7]!="lo":
#retData[routeData[7]] = routeData[1]
#if line.startswith('Destination'):
#flagData = True
#return retData
#strIpAllow = self.Get('os_net_ip')
#strNetAllow = self.Get('os_net_allow')
#strInterfInfo = self.Get('os_net_interfaces_info')
#systemName = self.Get('os_linux_shortname')
#ret = ""
#retList = []
#gatewaysData = getNetGateways()
#if strIpAllow and strNetAllow:
#listDevInfo = map(lambda x: x.split(" "),strInterfInfo.split(","))
#listDevInfo = map(lambda x: (x[0], x[1][1:-1]),listDevInfo)
#listDevInfo = filter(lambda x: x[1]!="Off" ,listDevInfo)
#listIpNet = map(lambda x: (x[0],"%s/%s"%(x[1],x[2])),
#zip(listDevInfo,strIpAllow.split(","),
#map(lambda x: x.partition("/")[2],
#strNetAllow.split(","))))
#flagNotPlug = not systemName in ("CLS")
#for info, ipNet in listIpNet:
#dev, data = info
#if data=="DHCP":
#retList.append('config_%s=( "dhcp" )'%dev)
#if flagNotPlug:
#retList.append('modules_%s=( "!plug" )'%dev)
#else:
#retList.append('config_%s=( %s )'%(dev,ipNet))
#if gatewaysData and dev in gatewaysData:
#retList.append('routes_%s=( "default gw %s" )'\
#%(dev,gatewaysData[dev]))
#if flagNotPlug:
#retList.append('modules_%s=( "!plug" )'%dev)
#return "\n".join(retList)
def get_os_install_fstab_swap_info(self):
    """fstab lines for every partition whose install mount point is "swap"."""
    swapLines = []
    for device, mountPoint in zip(self.Get('os_disk_dev'),
                                  self.Get('os_install_disk_mount')):
        if mountPoint == "swap":
            swapLines.append("%s\tnone\tswap\tsw\t0 0" % device)
    return "\n".join(swapLines)
def get_os_install_linux_system(self):
    """Kind of the installed system: "server", "desktop" or ""."""
    systemByShortName = {
        'CDS': 'server',
        'CLD': 'desktop',
        'CLDG': 'desktop',
        'CLDX': 'desktop',
        'CLS': 'desktop',
        'CSS': 'server',
    }
    shortName = self.Get('os_install_linux_shortname')
    # unknown short names give an empty string
    return systemByShortName.get(shortName, "")
def get_os_install_kernel_scheduler(self):
    """I/O scheduler selected for the device of the install root.

    The first partition mounted as "/" is matched against the names in
    os_device_dev, and the bracketed entry of that device's
    /sys/block/<device>/queue/scheduler file is returned.  "cfq" is
    returned whenever one of these steps yields nothing.
    """
    rootPartitions = [dev for dev, mountPoint in
                      zip(self.Get('os_disk_dev'),
                          self.Get('os_install_disk_mount'))
                      if mountPoint == '/']
    if rootPartitions:
        rootPartition = rootPartitions[0]
        # the device name is a substring of its partition path
        rootDevices = [dev for dev in self.Get('os_device_dev')
                       if dev in rootPartition]
        if rootDevices:
            pathScheduler = '/sys/block/%s/queue/scheduler' % rootDevices[0]
            if access(pathScheduler, R_OK):
                # read and close the file before parsing
                with open(pathScheduler) as schedulerFile:
                    content = schedulerFile.read()
                res = re.search(r"\[([^\]]+)\]", content, re.S)
                if res:
                    return res.group(1)
    return "cfq"
def get_os_install_kernel_attr(self):
    """Extra kernel attributes; currently always an empty string."""
    # TODO: on usb-hdd install must be "delay=5"
    kernelAttr = ""
    return kernelAttr
def get_os_install_kernel_resume(self):
    """Kernel "resume=<device>" parameter for the first swap partition.

    Returns an empty string when no partition is mounted as "swap".
    """
    swapDevices = [dev for dev, mountPoint in
                   zip(self.Get('os_disk_dev'),
                       self.Get('os_install_disk_mount'))
                   if mountPoint == "swap"]
    if not swapDevices:
        return ""
    return "resume=%s" % swapDevices[0]
def get_os_install_arch_machine(self):
    """Machine architecture for the install; same as os_arch_machine."""
    machine = self.Get('os_arch_machine')
    return machine
def get_cl_image(self):
    """Best matching image from the distributive repository, or "".

    The repository is opened at cl_image_path and queried with the
    install architecture and the lower-cased install short name.
    """
    repository = DistributiveRepository(self.Get('cl_image_path'))
    arch = self.Get('os_install_arch_machine')
    shortname = self.Get('os_install_linux_shortname').lower()
    image = repository.getBestDistributive(march=arch, shortname=shortname)
    if image:
        return image
    return ""
def get_os_install_linux_shortname(self):
    """Short name of the installed system; same as os_linux_shortname."""
    shortName = self.Get('os_linux_shortname')
    return shortName
def get_os_install_linux_ver(self):
    """Version of the system being installed.

    Taken from the 'ver' group of the image name when it matches
    DistributiveRepository.reDistName.  Otherwise the running system's
    version is used if the install and running short names are equal,
    and an empty string if they differ.
    """
    found = DistributiveRepository.reDistName.search(self.Get('cl_image'))
    if found:
        return found.groupdict()['ver']
    installName = self.Get('os_install_linux_shortname')
    if installName == self.Get('os_linux_shortname'):
        return self.Get('os_linux_ver')
    return ""
def get_cl_pkgdir_path(self):
    """Package directory: .../packages/<shortname>/<version>/<arch>."""
    components = (self.Get('os_linux_shortname'),
                  self.Get('os_linux_ver'),
                  self.Get('os_install_arch_machine'))
    return "/var/calculate/remote/packages/%s/%s/%s" % components
def get_os_install_linux_subname(self):
    """Desktop flavour (KDE, XFCE, GNOME) of the installed system, or ""."""
    shortName = self.Get("os_install_linux_shortname")
    if not shortName:
        return ""
    subNames = {"CLD": "KDE", "CLDX": "XFCE", "CLDG": "GNOME"}
    return subNames.get(shortName, "")
def get_os_install_linux_name(self):
    """Full name of the installed system; "Linux" for unknown short names."""
    shortName = self.Get("os_install_linux_shortname")
    if not shortName:
        return "Linux"
    fullNames = {
        "CLD": "Calculate Linux Desktop",
        "CLDX": "Calculate Linux Desktop",
        "CLDG": "Calculate Linux Desktop",
        "CDS": "Calculate Directory Server",
        "CLS": "Calculate Linux Scratch",
        "CSS": "Calculate Scratch Server",
        "Gentoo": "Gentoo",
    }
    return fullNames.get(shortName, "Linux")
def get_os_bind_hash(self, fstabPath="/etc/fstab"):
    """Map bound directory to mount point for the bind entries of fstab.

    fstabPath -- fstab file to read (defaults to /etc/fstab)

    An entry counts as a bind mount when its options field contains
    "bind".  Comments, blank lines and rows with fewer than four fields
    are skipped; an unreadable file gives an empty dict.
    """
    bindHash = {}
    if not access(fstabPath, R_OK):
        return bindHash
    with open(fstabPath) as fstab:
        for line in fstab:
            # fields: device/directory, mount point, fs type, options
            fields = line.split()
            if len(fields) < 4 or fields[0].startswith('#'):
                continue
            if "bind" in fields[3]:
                bindHash[fields[0]] = fields[1]
    return bindHash
def get_os_bind_dir(self):
    """Sorted list of the bound directories (keys of os_bind_hash)."""
    directories = list(self.Get('os_bind_hash').keys())
    directories.sort()
    return directories
def get_os_bind_mountpoint(self):
    """Mount points of the bound directories, in sorted directory order."""
    bindhash = self.Get('os_bind_hash')
    mountPoints = []
    for directory in sorted(bindhash.keys()):
        mountPoints.append(bindhash[directory])
    return mountPoints
def get_os_install_bind_dir(self):
    """Bound directories to carry over to the installed system.

    A bind pair is dropped when either its directory or its mount point
    is listed in nonTransferedDirs.
    """
    skipped = self.nonTransferedDirs
    directories = []
    for directory, mountPoint in zip(self.Get('os_bind_dir'),
                                     self.Get('os_bind_mountpoint')):
        if directory in skipped or mountPoint in skipped:
            continue
        directories.append(directory)
    return directories
def get_os_install_bind_mountpoint(self):
    """Bind mount points to carry over to the installed system.

    A bind pair is dropped when either its directory or its mount point
    is listed in nonTransferedDirs.
    """
    skipped = self.nonTransferedDirs
    mountPoints = []
    for directory, mountPoint in zip(self.Get('os_bind_dir'),
                                     self.Get('os_bind_mountpoint')):
        if directory in skipped or mountPoint in skipped:
            continue
        mountPoints.append(mountPoint)
    return mountPoints
def get_os_install_root_dev(self):
    """Root device for the install; same as os_root_dev."""
    rootDevice = self.Get('os_root_dev')
    return rootDevice
def get_os_install_clock_timezone(self):
    """Timezone name for the installed system.

    Order of preference: the value from the "calculate" kernel
    parameter when it names an existing zoneinfo entry, then the zone
    that the /etc/localtime symlink points to, then "UTC".
    """
    zoneinfodir = "/usr/share/zoneinfo/"
    localtimefile = "/etc/localtime"
    # try get timezone from kernel calculate param
    timezone = self.getValueFromCmdLine("calculate", 2)
    if timezone and \
            os.path.exists(os.path.join(zoneinfodir, timezone)):
        return timezone
    # get timezone from localtime symlink; the file may also be a
    # regular file, on which readlink would raise OSError
    if os.path.islink(localtimefile):
        # resolve a relative link target against the link's directory
        target = os.path.normpath(
            os.path.join(os.path.dirname(localtimefile),
                         os.readlink(localtimefile)))
        if target.startswith(zoneinfodir) and \
                len(target) > len(zoneinfodir):
            return target[len(zoneinfodir):]
    return "UTC"
def get_os_install_clock_type(self):
    """Hardware clock type: "UTC" or "local".

    The "clock" value is read from /etc/conf.d/clock and then
    /etc/conf.d/hwclock; the first recognised value wins and "local"
    is the default.
    """
    for configFile in ('/etc/conf.d/clock', '/etc/conf.d/hwclock'):
        clock = self.getValueFromConfig(configFile, "clock")
        if not clock:
            continue
        upperClock = clock.upper()
        if upperClock == 'UTC':
            return upperClock
        lowerClock = clock.lower()
        if lowerClock == 'local':
            return lowerClock
    return "local"
def get_os_install_x11_height(self):
    """Vertical resolution for the install; same as os_x11_height."""
    height = self.Get('os_x11_height')
    return height
def get_os_install_x11_width(self):
    """Horizontal resolution for the install; same as os_x11_width."""
    width = self.Get('os_x11_width')
    return width
def get_os_install_x11_resolution(self):
    """Xorg resolution as "<width>x<height>"."""
    width = self.Get('os_x11_width')
    height = self.Get('os_x11_height')
    return "%sx%s" % (width, height)
def get_os_install_x11_video_drv(self):
    """Xorg video driver for the install; same as os_x11_video_drv."""
    driver = self.Get('os_x11_video_drv')
    return driver
def get_os_install_x11_composite(self):
    """Composite state for Xorg.

    The state found by get_composite_from_xorgconf() wins; otherwise
    the default is "on" for the nvidia and intel drivers and "off" for
    every other driver.
    """
    driver = self.Get('os_x11_video_drv')
    if driver in ["nvidia", "intel"]:
        fallback = "on"
    else:
        fallback = "off"
    configured = self.get_composite_from_xorgconf()
    if configured:
        return configured
    return fallback
def get_os_install_linguas(self):
    """LINGUAS value for the installed system.

    Taken from the last LINGUAS= line of /etc/make.conf, else from the
    output of "emerge --info", else a built-in default list.
    """
    def get_linguas(lines):
        """Value of the last LINGUAS= line in *lines*, or ""."""
        value = ""
        # a failed command yields False instead of a list of lines
        for line in lines or []:
            if line.startswith("LINGUAS="):
                value = line.strip().rpartition('=')[-1].strip('"\'')
        return value
    makeconf = '/etc/make.conf'
    infocommand = 'emerge --info'
    linguas = ""
    # make.conf may be missing; then fall through to emerge --info
    if access(makeconf, R_OK):
        with open(makeconf, 'r') as makeconfFile:
            linguas = get_linguas(makeconfFile)
    return linguas or \
        get_linguas(self._runos(infocommand)) or \
        "bg en de es fr it pl pt_BR ru uk"
def get_os_install_locale_consolefont(self):
    """Console font defined by clLocale for os_install_locale_lang."""
    lang = self.Get('os_install_locale_lang')
    return clLocale().getFieldByLang("consolefont", lang)
def get_os_install_locale_keymap(self):
    """Keymap of the locale (used for /etc/conf.d/keymaps).

    Order of preference: the keymap of the language given in the
    "calculate" boot parameter, the KEYMAP value of
    /etc/conf.d/keymaps, the keymap of os_install_locale_lang.
    """
    locale = clLocale()
    keymapConfd = '/etc/conf.d/keymaps'
    # language passed on the kernel command line
    bootLang = self.getValueFromCmdLine("calculate", 1)
    if locale.isLangExists(bootLang):
        return locale.getFieldByLang('keymap', bootLang)
    # keymap configured on the current system
    configured = self.getValueFromConfig(keymapConfd, 'KEYMAP')
    if configured:
        return configured
    # keymap implied by the install language
    installLang = self.Get("os_install_locale_lang")
    return locale.getFieldByLang("keymap", installLang)
def get_os_install_locale_dumpkeys(self):
    """dumpkeys charset matching the install keymap.

    Looked up by keymap when clLocale knows the keymap, otherwise by
    os_install_locale_lang.
    """
    locale = clLocale()
    knownKeymaps = locale.getFields('keymap')
    if self.Get('os_install_locale_keymap') not in knownKeymaps:
        return locale.getFieldByLang("dumpkeys_charset",
                                     self.Get('os_install_locale_lang'))
    return locale.getFieldByKeymap("dumpkeys_charset",
                                   self.Get('os_install_locale_keymap'))
def get_os_install_locale_locale(self):
    """Locale for the install (example: ru_RU.UTF-8); same as os_locale_locale."""
    currentLocale = self.Get('os_locale_locale')
    return currentLocale
def get_os_install_locale_lang(self):
    """Lang (example: ru_RU) that clLocale maps to os_install_locale_locale."""
    installLocale = self.Get('os_install_locale_locale')
    return clLocale().getLangByField("locale", installLocale)
def get_os_install_locale_language(self):
    """Language (example: ru) defined by clLocale for os_install_locale_lang."""
    lang = self.Get('os_install_locale_lang')
    return clLocale().getFieldByLang("language", lang)
def get_os_install_locale_xkb(self):
    """xkb layouts (example: en,ru) defined by clLocale for os_install_locale_lang."""
    lang = self.Get('os_install_locale_lang')
    return clLocale().getFieldByLang("xkblayout", lang)
def get_os_install_locale_xkbname(self):
    """Names of the keyboard layouts used by X.

    This is the part of os_install_locale_xkb before the first "(",
    or an empty string when no layout is set.
    """
    layouts = self.Get("os_install_locale_xkb")
    if not layouts:
        return ""
    return layouts.partition("(")[0]
def get_os_install_net_hostname(self):
    """Computer hostname for the install; same as os_net_hostname."""
    hostname = self.Get("os_net_hostname")
    return hostname
def get_os_install_net_allow(self):
    """Allowed network for the install; same as os_net_allow."""
    allowed = self.Get("os_net_allow")
    return allowed
def get_os_install_net_ip(self):
    """IP for all network interfaces for the install; same as os_net_ip."""
    addresses = self.Get("os_net_ip")
    return addresses
def get_os_install_net_interfaces(self):
    """Network interfaces for the install; same as os_net_interfaces."""
    interfaces = self.Get("os_net_interfaces")
    return interfaces
def get_os_install_net_domain(self):
    """Network domain for the install; same as os_net_domain."""
    domain = self.Get("os_net_domain")
    return domain
def get_os_install_root_type(self):
    """Type of the device that holds the install root.

    The type of the first device whose name occurs in
    os_install_root_dev is returned; os_root_type is the fallback when
    no device matches.
    """
    rootdev = self.Get('os_install_root_dev')
    for device, devtype in zip(self.Get('os_device_dev'),
                               self.Get('os_device_type')):
        if device in rootdev:
            return devtype
    return self.Get('os_root_type')