# -*- coding: utf-8 -*-
import sys
import re
from os import path, readlink
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
READONLY, TableVariable, FieldValue,
VariableInterface, HumanReadable)
from calculate.lib.cl_lang import setLocalTranslate, _
setLocalTranslate('cl_install3', sys.modules[__name__])
from calculate.lib.utils.ip import (getInterfaces, getIp, getMask, getMac,
cidrToMask, maskToCidr, getIpNet, isDhcpIp,
checkIp, checkMask,
getOperState, getPlugged)
from calculate.lib.utils.device import lspci
from calculate.lib.utils.files import (listDirectory, readLinesFile)
from calculate.lib.utils import ip
from calculate.lib.utils.portage import isPkgInstalled
from operator import itemgetter
from itertools import *
from calculate.install.distr import DistributiveError
class NetHelper(VariableInterface):
    """
    Mixin for network variables, which are not used for flash installation.
    """
    # Read by uncompatible(): when True, the extra "no static interface"
    # check for routing configuration is applied as well.
    routing = False

    def uncompatible(self):
        """
        Tell whether network setup is unavailable for this installation.

        Returns a translated explanation when the variable cannot be used,
        or an empty string when it can.
        """
        if self.Get('os_install_root_type') == 'flash':
            return _(
                "Network configuration is unavailable for Flash install")
        # NOTE(review): the Select() arguments are reproduced exactly as they
        # appear in the recovered source - confirm no keyword was lost.
        if self.routing and not self.Select('os_install_net_interfaces',
                                            _notin=('off', 'dhcp'), limit=1):
            return _("Network routing configuration is not available if all "
                     "interfaces are set to DHCP")
        return ""
class VariableOsInstallNtp(NetHelper, Variable):
    """
    NTP server for system
    """
    opt = ['--ntp']
    metavalue = "NTP"
    value = ""

    def init(self):
        """Set the translated label and help text of the variable."""
        self.label = _("NTP server")
        # NOTE(review): the target of this assignment was lost in the
        # recovered source (a chained assignment to a call is invalid);
        # `self.help` is assumed - confirm against the upstream file.
        self.help = _("set the NTP server for the system")
class VariableOsInstallProxy(NetHelper, Variable):
    """
    Proxy for system
    """
    # Empty by default.
    value = ""
class VariableOsInstallNetInterfaces(NetHelper, ReadonlyVariable):
    """
    Net interface devices
    """
    type = "list"

    def init(self):
        """Set the translated label of the variable."""
        self.label = _("Interface")

    def get(self):
        """Return the interfaces reported by getInterfaces(), sorted."""
        return sorted(getInterfaces())
class VariableOsInstallNetInterfacesOrig(NetHelper, ReadonlyVariable):
    """
    Net interface devices orig name from udev (enp0s2)
    """
    type = "list"

    def get(self):
        """Return the value of 'os_install_net_interfaces' unchanged."""
        return self.Get('os_install_net_interfaces')
class VariableOsNetInterfacesInfo(NetHelper, ReadonlyVariable):
Information about net interfaces
9 years ago
def get(self):
9 years ago
listInterfacesInfo = []
# Was the address obtained via DHCP? If not, show the IP address or "off"
9 years ago
for interface, ipaddr, dhcp in zip(
if dhcp == "on":
listInterfacesInfo.append((interface, _("DHCP")))
ipaddr if ipaddr else _("Off")))
return ", ".join(map(lambda x: "%s (%s)" % (x[0], x[1]),
9 years ago
class VariableOsInstallNetData(NetHelper, TableVariable):
Hash for information about net
opt = ["--iface"]
metavalue = "IFACE_SETTINGS"
source = ["os_install_net_interfaces",
def init(self):
def defaultInterface():
ifaces = getInterfaces()
if ifaces:
return ifaces[0]
return "enp0s0"
self.label = _("Addresses")
9 years ago
# = _("IP address with network (example:%s)")%"" = _("Network interface, DHCP or IP address and network mask "
9 years ago
"(example: %s)") % (" --iface %s:" %
9 years ago
def raiseReadonlyIndexError(self, fieldname="", variablename="",
Behavior on change readonly index
9 years ago
raise VariableError(_("Network interface %s not found") % value)
9 years ago
class VariableOsInstallNetHostname(NetHelper, Variable):
    """
    Computer hostname
    """

    def get(self):
        """Return the part of 'os_install_net_fqdn' before the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        return fqdn.partition('.')[0]
9 years ago
class VariableOsInstallNetFqdn(NetHelper, Variable):
    """
    Full host name
    """
    opt = ['--hostname']
    metavalue = "HOSTNAME"

    def init(self):
        """Set the translated label and help text of the variable."""
        self.label = _("Hostname")
        # NOTE(review): the assignment target was lost in the recovered
        # source; `self.help` is assumed - confirm against upstream.
        self.help = _("set either the short or the full hostname")

    def set(self, value):
        """
        Normalise a user-supplied host name to a full one.

        A value that already contains a dot is kept as is; otherwise the
        value of 'os_install_net_domain' is appended after a dot.
        """
        if "." in value:
            return value
        return "%s.%s" % (value, self.Get('os_install_net_domain'))

    def check(self, value):
        """Raise VariableError when the name is longer than 254 characters."""
        maxfqdn = 254
        if len(value) > maxfqdn:
            raise VariableError(
                _("The hostname length should be less than %d") % maxfqdn)

    def get(self):
        """
        Return the default full host name.

        "calculate.local" is used when stdout is linked to /dev/console and
        'os_root_dev' is '/dev/nfs'; otherwise 'os_net_fqdn' is returned.
        """
        stdout_link = '/proc/self/fd/1'
        on_console = (path.exists(stdout_link) and
                      readlink(stdout_link) == '/dev/console')
        if on_console and self.Get('os_root_dev') == '/dev/nfs':
            return "calculate.local"
        return self.Get('os_net_fqdn')
class VariableOsInstallNetDomain(NetHelper, Variable):
    """
    Domain on install system
    """

    def get(self):
        """Return the part of 'os_install_net_fqdn' after the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        return fqdn.partition('.')[2]
9 years ago
class VariableOsInstallNetAllow(NetHelper, Variable):
    """
    Allowed network
    """

    def get(self):
        """Allowed network (taken from 'os_net_allow')."""
        return self.Get("os_net_allow")
class VariableOsInstallNetName(NetHelper, ReadonlyVariable):
Net device names
type = "list"
def init(self):
self.label = _("Name")
def get(self):
rePci = re.compile(r"(\d\d:\d\d\.\d)(?:/[^/]+){2}$")
9 years ago
def getPci(interface):
pathname = path.realpath(path.join('/sys/class/net',
9 years ago
pci =
if pci:
return ""
9 years ago
pciEthernet = lspci(shortInfo=True)
9 years ago
return map(lambda x: "{vendor} {name}".format(**x),
map(lambda x: pciEthernet.get(getPci(x),
{'vendor': _("Unknown"),
'name': _("vendor")}),
class VariableOsInstallNetMac(NetHelper, ReadonlyVariable):
Net devices mac (Example: 01:02:03:04:05:06)
type = "list"
def init(self):
self.label = _("MAC")
def get(self):
return map(lambda x: getMac(x).lower(),
9 years ago
class VariableOsInstallNetStatus(NetHelper, Variable):
Net status (dhcp,ip,or off)
type = "choiceedit-list"
def init(self):
self.label = _("IP address")
def get(self):
return map(self.getDefaultValue,
9 years ago
def getDefaultValue(self, iface):
    """
    Detect the current status of one interface.

    Returns 'off', 'dhcp' or the interface's IP address string.
    """
    def statusValue(ipaddr, dhcp):
        # An unplugged interface is always reported as disabled.
        if not getPlugged(iface):
            return 'off'
        if dhcp == "on":
            return "dhcp"
        if ipaddr:
            return ipaddr
        # No address and no DHCP: disabled when the link is down,
        # otherwise fall back to DHCP.
        if getOperState(iface) == 'down':
            return "off"
        return "dhcp"

    rootDevNfs = self.Get('os_root_dev') == '/dev/nfs'
    ipaddr = getIp(iface)
    # An NFS root is treated like a DHCP-configured interface.
    dhcp = "on" if rootDevNfs or isDhcpIp(iface) else "off"
    return statusValue(ipaddr, dhcp)
9 years ago
def set(self, value):
    """
    Normalise the list of interface statuses.

    Non-empty entries are lower-cased; an "auto" entry is replaced with
    the detected status of the interface at the same position in
    'os_install_net_interfaces'.
    """
    statuses = [x.lower() if x else x for x in value]
    ifaces = self.Get('os_install_net_interfaces')
    return [self.getDefaultValue(iface) if status == "auto" else status
            for status, iface in zip(statuses, ifaces)]
def check(self, value):
    """
    Validate the statuses.

    Raises VariableError for an entry that is neither one of the choice
    keys nor accepted by checkIp().
    """
    # The choice keys do not depend on the entry, so compute them once.
    allowed = [item[0] for item in self.choice()]
    for status in value:
        if status not in allowed and not checkIp(status):
            raise VariableError(_("Wrong IP address %s") % status)
def choice(self):
    """Return the (value, translated label) pairs offered to the user."""
    return (("dhcp", _("DHCP")),
            ("off", _("Disabled")),
            ("auto", _("Auto")))
class VariableOsInstallNetIp(NetHelper, ReadonlyVariable):
IP for all network interfaces
type = "list"
def init(self):
self.label = _("IP address")
def get(self):
9 years ago
return map(lambda x: "" if x[1].lower() == "off" else
getIp(x[0]) if x[1].lower() == "dhcp" else x[1],
# def check(self,value):
# dhcps = self.Get('os_install_net_dhcp_set')
# wrongIp = filter(lambda x:x[0] and not checkIp(x[0]),
# zip(value,dhcps))
# if wrongIp:
# if wrongIp[0][0]:
# raise VariableError(_("Wrong IP address %s")%wrongIp[0][0])
class VariableOsInstallNetNetwork(NetHelper, ReadonlyVariable):
Network for ip (Example:
type = "list"
def init(self):
self.label = _("Network")
def get(self):
9 years ago
return map(lambda x: getIpNet(x[0], x[1]) if x[0] and x[1] else "",
9 years ago
class VariableOsInstallNetCidr(NetHelper, ReadonlyVariable):
CIDR of interfaces
type = "list"
def init(self):
self.label = _("CIDR")
def get(self):
Get CIDR of ip,net (Example: 24)
9 years ago
return map(lambda x: maskToCidr(x) if x else '',
9 years ago
class VariableOsInstallNetMask(NetHelper, Variable):
Net mask of interfaces (Example:
type = "choiceedit-list"
def init(self):
self.label = _("Mask")
def get(self):
9 years ago
return map(lambda x: cidrToMask(getMask(x)),
9 years ago
def set(self, value):
    """
    Convert CIDR values to masks.

    Every entry consisting of digits in the range 0..32 is passed through
    cidrToMask(); any other entry is returned unchanged.
    """
    def convertCidrToMask(x):
        if x and x.isdigit() and 0 <= int(x) <= 32:
            return cidrToMask(int(x))
        return x

    return [convertCidrToMask(x) for x in value]
def check(self, value):
    """
    Validate the masks against the interface statuses.

    A mask is checked when it is non-empty or when the interface status is
    neither "off" nor "dhcp". Raises VariableError for the first mask
    rejected by checkMask().
    """
    statuses = self.Get('os_install_net_status')
    for mask, status in zip(value, statuses):
        required = mask or status not in ("off", "dhcp")
        if required and not checkMask(mask):
            raise VariableError(_("Wrong mask %s") % mask)
def choice(self):
return ["",
class VariableOsInstallNetDhcpSet(NetHelper, Variable):
Describes whether the IP was obtained by DHCP or set manually
type = "boolauto-list"
def init(self):
self.label = _("DHCP")
def get(self):
9 years ago
return map(lambda x: "on" if x == "dhcp" else "off",
9 years ago
class VariableOsInstallNetRouteData(NetHelper, TableVariable):
Route table data
opt = ["--route"]
metavalue = "NETROUTE"
source = ['os_install_net_route_network',
routing = True
def humanReadable(self):
return self.Get()
def init(self):
self.label = _("Routing") = \
_("add a routing rule (specified as "
9 years ago
def get(self, hr=HumanReadable.No):
    """
    Routing hash

    Builds rows of [network, via, dev, src] from ip.getRouteTable() for
    the interfaces whose status is neither "off" nor "dhcp", skipping
    routes to those interfaces' own networks. Returns [[]] when there is
    nothing to report.
    """
    interfaces = self.Get('os_install_net_interfaces')
    interfaces_status = self.Get('os_install_net_status')
    interfaces_network = self.Get('os_install_net_network')
    # (interface, network) pairs of the statically configured interfaces
    static = [(iface, net)
              for iface, status, net in zip(interfaces, interfaces_status,
                                            interfaces_network)
              if status not in ("off", "dhcp")]
    if not static:
        return [[]]
    staticInterface, skipNet = zip(*static)
    routes = [[route[0],
               route[1].get('via', ''),
               route[1].get('dev', ''),
               route[1].get('src', '')]
              for route in ip.getRouteTable(staticInterface)
              if route[0] not in skipNet]
    return routes or [[]]
def getHumanReadableAuto(self):
return Variable.getHumanReadableAuto(self)
def setValue(self, value, force=False):
Standard action for set value
self.value = self.set(value)
9 years ago
self.wasSet = True
self.invalid = False
# run check
if not force:
9 years ago
class VariableOsInstallNetRouteNetwork(NetHelper, FieldValue, Variable):
Net for route table record
type = "choiceedit-list"
source_variable = "os_install_net_route_data"
column = 0
def init(self):
self.label = _("Network")
def choice(self):
9 years ago
return ["default"] # +self.Get('os_install_net_network')
9 years ago
def check(self, value):
# detect duplicate network
for wrongnet in ifilterfalse(ip.checkNet,
9 years ago
raise VariableError(_("Wrong network %s") % wrongnet)
dupNetwork = list(set(filter(lambda x: value.count(x) > 1,
if dupNetwork:
raise VariableError(
9 years ago
_("Network '%s' is used more than once") % dupNetwork[0])
9 years ago
class VariableOsInstallNetRouteGw(NetHelper, FieldValue, Variable):
Gateway for route table record
source_variable = "os_install_net_route_data"
column = 1
def init(self):
self.label = _("Gateway")
9 years ago
def check(self, value):
# search unreachable gateways
9 years ago
NET, GW = 0, 1
netsGw = zip(self.Get('os_install_net_route_network'),
9 years ago
nets = filter(lambda x: x and x != "default",
for wrongip in ifilterfalse(ip.checkIp, value):
raise VariableError(_("Wrong gateway IP %s") % wrongip)
wrongGws = map(lambda x: x[GW],
filter(lambda x: not ip.isIpInNet(x[GW],
*(set(nets) - set(
filter(lambda x: x[GW],
if wrongGws:
9 years ago
raise VariableError(_("Gateways %s are unreachable") %
9 years ago
class VariableOsInstallNetRouteDev(NetHelper, FieldValue, Variable):
    """
    Device for route table record
    """
    type = "choice-list"
    # Column 2 of the 'os_install_net_route_data' table.
    source_variable = "os_install_net_route_data"
    column = 2

    def init(self):
        """Set the translated label of the variable."""
        self.label = _("Interface")

    def choice(self):
        """Offer the interfaces from 'os_install_net_interfaces'."""
        return self.Get('os_install_net_interfaces')
class VariableOsInstallNetRouteSrc(NetHelper, FieldValue, Variable):
Source ip for route table record
type = "choiceedit-list"
source_variable = "os_install_net_route_data"
column = 3
def init(self):
self.label = _("Source IP")
def choice(self):
9 years ago
return [""] + self.Get('os_install_net_ip')
def check(self, value):
for wrongip in ifilterfalse(ip.checkIp,
9 years ago
ifilter(None, value)):
raise VariableError(_("Wrong source IP %s") % wrongip)
ipAddrs = self.Get('os_install_net_ip')
wrongIps = filter(lambda x: x and not x in ipAddrs,
9 years ago
if wrongIps:
raise VariableError(
9 years ago
_("Wrong IP address %s in the specified source IP") %
class VariableOsInstallNetRoute(NetHelper, ReadonlyVariable):
Data by route for conf.d/net
9 years ago
def performRouteData(self, performFunc):
routeMatrix = zip(self.Get('os_install_net_route_network'),
9 years ago
DEV, IP, CIDR, NET = 0, 1, 2, 1
return map(lambda x: performFunc(x[DEV], x[NET], routeMatrix),
# union ip and mask to ip/net
map(lambda x: (x[DEV], ip.getIpNet(x[IP], cidr=x[CIDR])) \
if x[IP] and x[CIDR] else (x[DEV], ""),
# filter(lambda x:x[IP] and x[CIDR],
def get(self):
"""Route info for conf.d/net"""
12 years ago
defaultDev = 0
workIfaces = self.Select('os_install_net_interfaces',
9 years ago
12 years ago
if len(workIfaces) == 1:
defaultDev = workIfaces[0]
9 years ago
def getRouteForInterfaceConf(interface, net, routeMatrix):
NET, GW, DEV, SRC = 0, 1, 2, 3
return "\n".join(
9 years ago
# build string for route from net,gateway,dev and src
map(lambda x: "{net}{gateway}{src}".format(
gateway=" via %s" % x[GW] if x[GW] else "",
src=" src %s" % x[SRC] if x[SRC] else ""),
# filter by interface and discard direct routes
# example: for discard net
filter(lambda x: (interface == x[DEV] or defaultDev and
interface == defaultDev) \
and net != x[NET], routeMatrix)))
return self.performRouteData(getRouteForInterfaceConf)
class VariableOsInstallNetNmroute(VariableOsInstallNetRoute):
Data by route for NetworkManager
def get(self):
"""Route info for system-connections of NetworkManager"""
12 years ago
defaultDev = 0
workIfaces = self.Select('os_install_net_interfaces',
9 years ago
12 years ago
if len(workIfaces) == 1:
defaultDev = workIfaces[0]
9 years ago
def getRouteForInterfaceNM(interface, net, routeMatrix):
NET, GW, DEV, SRC = 0, 1, 2, 3
defaultGw = map(lambda x: "%s;" % x[GW],
filter(lambda x: interface == x[DEV] and \
x[NET] == "default",
return "{0}\n".format(defaultGw[0] if defaultGw else "") + \
9 years ago
# build string for route from net,gateway,dev and src
x: "routes{num}={ip};{cidr};{gateway};0;".format(
num=x[0] + 1,
gateway=x[1][GW] if x[1][GW] else ""),
# filter by interface and discard direct routes
# example: for discard net
filter(lambda x: (interface == x[
DEV] or defaultDev and
interface == defaultDev) and net !=
NET] and \
x[NET] != "default",
return self.performRouteData(getRouteForInterfaceNM)
class VariableOsInstallNetConfAvailable(NetHelper, Variable):
Available net configuration
type = "list"
def get(self):
9 years ago
mapNetConf = (('networkmanager', 'net-misc/networkmanager',
('openrc', '', _('OpenRC')))
image = self.Get('cl_image')
if image:
with image as distr:
distrPath = image.getDirectory()
9 years ago
return map(itemgetter(0, 2),
filter(lambda x: not x[1] or isPkgInstalled(x[1],
except DistributiveError as e:
9 years ago
return sorted(map(itemgetter(0, 2), mapNetConf[-1:]), key=itemgetter(1))
9 years ago
class VariableOsInstallNetConf(NetHelper, Variable):
    """
    Net setup (networkmanager or openrc)
    """
    type = "choice"
    opt = ["--netconf"]
    metavalue = "NETMANAGER"

    def init(self):
        """Set the translated label and help text of the variable."""
        self.label = _("Network manager")
        # NOTE(review): the assignment target was lost in the recovered
        # source; `self.help` is assumed - confirm against upstream.
        self.help = _("network manager")

    def get(self):
        """Net setup (networkmanager or openrc)"""
        services = (listDirectory('/etc/runlevels/boot') +
                    listDirectory('/etc/runlevels/default'))
        # any() instead of testing a filter() result: a filter object is
        # always truthy on Python 3, a list is not.
        if any(x.lower() == "networkmanager" for x in services) \
                or self.Get('os_root_type') == "livecd":
            nm = "networkmanager"
        else:
            # NOTE(review): the `else:` line is missing from the recovered
            # source; without it `nm` would always be reset to "".
            nm = ""
        for val, comment in self.Get('os_install_net_conf_available'):
            # NOTE(review): the head of the second operand was cut in the
            # recovered source; `self.Get(` is assumed - confirm upstream.
            if nm == val and not (
                    self.Get('os_root_dev') == '/dev/nfs' and
                    self.Get('os_install_root_type') == "livecd"):
                return nm
        return "openrc"

    def choice(self):
        """Offer the entries of 'os_install_net_conf_available'."""
        return self.Get('os_install_net_conf_available')
class VariableOsInstallNetDnsSearch(NetHelper, Variable):
Dns search
opt = ["--domain-search"]
metavalue = "DOMAINS"
def init(self):
self.label = _("Search domains")
12 years ago = _("search domains (comma-separated)")
def isDNSByDHCP(self):
    """
    If the first interface gets its IP by DHCP, DNS must come from DHCP too.

    Returns True when the first entry of 'os_install_net_status' is
    "dhcp", False otherwise (including when the list is empty).
    """
    statuses = self.Get('os_install_net_status')
    return bool(statuses) and statuses[0] == "dhcp"
def set(self, value):
    """
    Normalise a list of search domains to a space-separated string.

    Entries may be separated by ';', ',' or spaces. Empty tokens produced
    by adjacent separators (for example "a, b") are dropped, so the result
    never contains doubled spaces.
    """
    tokens = [token for token in re.split('[; ,]', value) if token]
    return " ".join(tokens)
def get(self):
"""Get current name servers"""
dnsSearch = " ".join(
9 years ago
map(lambda x: x.strip().partition("search")[2].strip(),
filter(lambda x: x.lstrip().startswith("search"),
return "" if self.isDNSByDHCP() else dnsSearch
def humanReadable(self):
    """
    Return the value for display.

    When the value is empty, a translated placeholder is shown instead:
    "Get via DHCP" if isDNSByDHCP() is true, otherwise "Not used".
    """
    current = self.Get()
    if current:
        return current
    if self.isDNSByDHCP():
        return _("Get via DHCP")
    return _("Not used")
class VariableOsInstallNetDns(VariableOsInstallNetDnsSearch):
Dns servers
opt = ["--dns"]
metavalue = "DNS"
def init(self):
12 years ago
self.label = _("Domain name server") = _("domain name server (comma-separated)")
9 years ago
def set(self, value):
    """
    Normalise a list of name servers to a space-separated string.

    Entries may be separated by ';', ',' or spaces. Empty tokens produced
    by adjacent separators (for example "8.8.8.8, 8.8.4.4") are dropped,
    so the result never contains doubled spaces.
    """
    tokens = [token for token in re.split('[; ,]', value) if token]
    return " ".join(tokens)
def get(self):
dnsIps = filter(ip.checkIp,
9 years ago
map(lambda x: x.strip().partition("nameserver")[
lambda x: x.lstrip().startswith("nameserver"),
12 years ago
return "" if self.isDNSByDHCP() else " ".join(dnsIps)
9 years ago
def check(self, value):
    """
    Validate the space-separated name servers.

    Raises VariableError when a non-empty entry does not match the
    ip.IP_ADDR pattern. Empty entries are ignored.
    """
    reIp = re.compile(ip.IP_ADDR)
    if any(token and not reIp.match(token) for token in value.split(' ')):
        raise VariableError(_("Wrong IP address for DNS"))
def humanReadable(self):
    """
    Return the value for display.

    When the value is empty, a translated placeholder is shown instead:
    "Get via DHCP" if isDNSByDHCP() is true, otherwise "Not used".
    """
    current = self.Get()
    if current:
        return current
    if self.isDNSByDHCP():
        return _("Get via DHCP")
    return _("Not used")
class VariableOsInstallNetSettings(NetHelper, Variable):
    """
    Net service configured
    """
    type = "choice"
    value = ""

    def choice(self):
        """Offer an empty entry plus 'os_install_net_conf_available'."""
        return [("", "")] + self.Get('os_install_net_conf_available')
class VariableOsInstallPxeIp(Variable):
    """
    IP address of the PXE server
    """
    type = "choice"
    opt = ["--ip"]
    metavalue = "IP"

    def init(self):
        """Set the translated label and help text of the variable."""
        self.label = _("PXE server IP")
        # NOTE(review): the assignment target was lost in the recovered
        # source; `self.help` is assumed - confirm against upstream.
        # The text is passed through _() like every other help string in
        # this file, so that it can be translated.
        self.help = _("set IP address for PXE server")

    def get(self):
        """Return the first non-empty 'os_install_net_ip' entry, or ""."""
        addresses = self.Get('os_install_net_ip')
        return next((ipaddr for ipaddr in addresses if ipaddr), "")

    def choice(self):
        """Offer every non-empty entry of 'os_install_net_ip'."""
        return [ipaddr for ipaddr in self.Get('os_install_net_ip') if ipaddr]