You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/net.py

851 lines
27 KiB

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import re
from os import path, readlink
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
READONLY, TableVariable, FieldValue,
VariableInterface, HumanReadable)
from calculate.lib.cl_lang import setLocalTranslate, _
setLocalTranslate('cl_install3', sys.modules[__name__])
from calculate.lib.utils.ip import (getInterfaces, getIp, getMask, getMac,
cidrToMask, maskToCidr, getIpNet, isDhcpIp,
checkIp, checkMask, isSlaveInterface,
getOperState, getPlugged)
import calculate.lib.utils.device as device
from calculate.lib.utils.device import lspci
from calculate.lib.utils.files import (listDirectory, readLinesFile)
from calculate.lib.utils import ip
from calculate.lib.utils.portage import isPkgInstalled
from operator import itemgetter
from itertools import *
from calculate.install.distr import DistributiveError
class NetHelper(VariableInterface):
    """
    Mixin for network variables; they are not used for Flash installation.
    """
    # Classes that describe routing override this with True so that the
    # additional "no static interface" restriction below is applied.
    routing = False

    def uncompatible(self):
        """
        Return a message explaining why the variable is unavailable.

        An empty string is returned when the variable may be used.
        """
        root_type = self.Get('os_install_root_type')
        if root_type == 'flash':
            return _("Network configuration is unavailable for Flash install")
        if self.routing:
            # Look for at least one interface that is neither off nor DHCP.
            static_ifaces = self.Select('os_install_net_interfaces',
                                        where='os_install_net_status',
                                        _notin=('off', 'dhcp'), limit=1)
            if not static_ifaces:
                return _("Network routing configuration is not available "
                         "if all "
                         "interfaces are set to DHCP")
        return ""
class VariableOsInstallNtp(NetHelper, Variable):
    """
    NTP server of the system being installed
    """
    value = "ntp0.zenon.net"
    metavalue = "NTP"
    opt = ['--ntp']

    def init(self):
        """Assign the translated label and help text."""
        self.help = _("set the NTP server for the system")
        self.label = _("NTP server")
class VariableOsInstallProxy(NetHelper, Variable):
    """
    Proxy setting of the system being installed (empty by default)
    """
    value = ""
class VariableOsInstallNetInterfaces(NetHelper, ReadonlyVariable):
    """
    List of network interface devices
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("Interface")

    def get(self):
        """Return the names reported by getInterfaces() in sorted order."""
        names = list(getInterfaces())
        names.sort()
        return names
class VariableOsInstallNetInterfacesOrig(NetHelper, ReadonlyVariable):
    """
    Original udev names of the network interfaces (e.g. enp0s2)

    Deprecated: the value simply mirrors os_install_net_interfaces.
    """
    type = "list"

    def get(self):
        """Return the current value of os_install_net_interfaces."""
        interfaces = self.Get('os_install_net_interfaces')
        return interfaces
class VariableOsNetInterfacesInfo(NetHelper, ReadonlyVariable):
    """
    Human-readable summary of the network interfaces
    """

    def get(self):
        """
        Return a string such as "eth0 (DHCP), eth1 (10.0.0.2), eth2 (Off)".
        """
        # The result is discarded here.
        # NOTE(review): presumably read to register a dependency - confirm.
        self.Get("os_net_interfaces")
        # Show DHCP when the address is obtained via DHCP, otherwise the
        # static IP address, or "Off" when there is none.
        described = []
        for iface, address, dhcp in zip(
                self.Get('os_install_net_interfaces'),
                self.Get('os_install_net_ip'),
                self.Get('os_install_net_dhcp_set')):
            if dhcp == "on":
                state = _("DHCP")
            elif address:
                state = address
            else:
                state = _("Off")
            described.append("%s (%s)" % (iface, state))
        return ", ".join(described)
class VariableOsInstallNetData(NetHelper, TableVariable):
    """
    Table with the settings of every network interface
    """
    opt = ["--iface"]
    metavalue = "IFACE_SETTINGS"
    source = ["os_install_net_interfaces",
              "os_install_net_status",
              "os_install_net_mask",
              "os_install_net_name",
              "os_install_net_mac"]

    def init(self):
        """Assign the label and a help text with a usage example."""
        self.label = _("Addresses")
        # self.help = _("IP address with network (example:%s)")%"192.168.1.1/24"
        help_template = _("Network interface, DHCP or IP address and "
                          "network mask "
                          "(example: %s)")
        # Use the first detected interface in the example, or a
        # placeholder name when none is detected.
        detected = getInterfaces()
        if detected:
            sample_iface = detected[0]
        else:
            sample_iface = "enp0s0"
        self.help = help_template % (" --iface %s:192.168.1.1:24" %
                                     sample_iface)

    def raiseReadonlyIndexError(self, fieldname="", variablename="",
                                value=""):
        """
        Report an attempt to change the read-only index column.

        Always raises VariableError naming *value* as a missing interface.
        """
        message = _("Network interface %s not found") % value
        raise VariableError(message)
class VariableOsInstallNetHostname(NetHelper, Variable):
    """
    Short computer hostname
    """

    def get(self):
        """Return the part of os_install_net_fqdn before the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        pieces = fqdn.partition('.')
        return pieces[0]
class VariableOsInstallNetFqdn(NetHelper, Variable):
    """
    Fully qualified host name
    """
    opt = ['--hostname']
    metavalue = "HOSTNAME"

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("Hostname")
        self.help = _("set either the short or the full hostname")

    def set(self, value):
        """
        Append the current domain when *value* contains no dot.
        """
        if "." not in value:
            domain = self.Get('os_install_net_domain')
            return "%s.%s" % (value, domain)
        return value

    def check(self, value):
        """
        Raise VariableError for names with spaces or longer than 254 chars.
        """
        maxfqdn = 254
        has_space = " " in value
        if has_space:
            raise VariableError(_("Wrong hostname"))
        too_long = len(value) > maxfqdn
        if too_long:
            raise VariableError(
                _("The hostname length should be less than %d") % maxfqdn)

    def get(self):
        """
        Return "calculate.local" when stdout is /dev/console and the root
        device is /dev/nfs; otherwise return os_net_fqdn.
        """
        stdout_link = '/proc/self/fd/1'
        if path.exists(stdout_link):
            if readlink(stdout_link) == '/dev/console':
                if self.Get('os_root_dev') == '/dev/nfs':
                    return "calculate.local"
        return self.Get('os_net_fqdn')
class VariableOsInstallNetDomain(NetHelper, Variable):
    """
    Domain of the system being installed
    """

    def get(self):
        """Return the part of os_install_net_fqdn after the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        pieces = fqdn.partition('.')
        return pieces[2]
class VariableOsInstallNetAllow(NetHelper, Variable):
    """
    Allowed network
    """

    def get(self):
        """Return the current value of os_net_allow."""
        allowed = self.Get("os_net_allow")
        return allowed
class VariableOsInstallNetName(NetHelper, ReadonlyVariable):
    """
    Device names ("vendor name") of the network interfaces
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("Name")

    def get(self):
        """
        Return one "{vendor} {name}" string per interface.

        The name is looked up in the lspci() data by the PCI address taken
        from the resolved sysfs path of the interface; interfaces without
        a match are reported as "Unknown vendor" (translated).
        """
        pci_pattern = re.compile(r"(\d\d:\d\d\.\d)(?:/[^/]+){2}$")

        def pci_address(interface):
            """Return the PCI address found in the sysfs path, or ""."""
            sys_path = device.sysfs.syspath(device.sysfs.Path.ClassNet,
                                            interface)
            found = pci_pattern.search(path.realpath(sys_path))
            if not found:
                return ""
            return found.group(1)

        pci_devices = lspci(shortInfo=True)
        names = []
        for interface in self.Get('os_install_net_interfaces'):
            info = pci_devices.get(pci_address(interface),
                                   {'vendor': _("Unknown"),
                                    'name': _("vendor")})
            names.append("{vendor} {name}".format(**info))
        return names
class VariableOsInstallNetMac(NetHelper, ReadonlyVariable):
    """
    MAC addresses of the network devices (example: 01:02:03:04:05:06)
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("MAC")

    def get(self):
        """Return the lower-cased getMac() result for every interface."""
        return [getMac(iface).lower()
                for iface in self.Get('os_install_net_interfaces')]
class VariableOsInstallNetStatus(NetHelper, Variable):
    """
    Status of every interface: "dhcp", "off" or a static IP address
    """
    type = "choiceedit-list"

    def init(self):
        """Assign the translated label."""
        self.label = _("IP address")

    def get(self):
        """Return the detected default status of every interface."""
        return [self.getDefaultValue(iface)
                for iface in self.Get('os_install_net_interfaces')]

    def getDefaultValue(self, iface):
        """
        Detect the status of *iface* from the running system.

        Returns "off" for unplugged or slave interfaces, "dhcp" when the
        root device is /dev/nfs or isDhcpIp() reports DHCP, otherwise the
        current IP address; without an address the result is "off" for an
        interface whose operstate is "down" and "dhcp" otherwise.
        """
        rootDevNfs = self.Get('os_root_dev') == '/dev/nfs'
        current_ip = getIp(iface)
        via_dhcp = rootDevNfs or isDhcpIp(iface)
        if not getPlugged(iface) or isSlaveInterface(iface):
            return 'off'
        if via_dhcp:
            return "dhcp"
        if current_ip:
            return current_ip
        if getOperState(iface) == 'down':
            return "off"
        return "dhcp"

    def set(self, value):
        """
        Lower-case the statuses and replace "auto" by the detected default.
        """
        lowered = [item.lower() if item else item for item in value]
        ifaces = self.Get('os_install_net_interfaces')
        resolved = []
        for status, iface in zip(lowered, ifaces):
            if status == "auto":
                resolved.append(self.getDefaultValue(iface))
            else:
                resolved.append(status)
        return resolved

    def check(self, value):
        """
        Raise VariableError for a status that is neither one of the
        choice() keys nor accepted by checkIp().
        """
        keywords = [entry[0] for entry in self.choice()]
        for status in value:
            if status in keywords:
                continue
            if not checkIp(status):
                raise VariableError(_("Wrong IP address %s") % status)

    def choice(self):
        """Return the predefined (value, label) pairs."""
        return (("dhcp", _("DHCP")),
                ("off", _("Disabled")),
                ("auto", _("Auto")))
class VariableOsInstallNetIp(NetHelper, ReadonlyVariable):
    """
    IP addresses of all network interfaces
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("IP address")

    def get(self):
        """
        Derive the address of every interface from its status.

        "off" gives an empty string, "dhcp" gives the current getIp()
        value, any other status is returned unchanged.
        """
        addresses = []
        for iface, status in zip(self.Get('os_install_net_interfaces'),
                                 self.Get('os_install_net_status')):
            mode = status.lower()
            if mode == "off":
                addresses.append("")
            elif mode == "dhcp":
                addresses.append(getIp(iface))
            else:
                addresses.append(status)
        return addresses

    # def check(self,value):
    #     dhcps = self.Get('os_install_net_dhcp_set')
    #     wrongIp = filter(lambda x:x[0] and not checkIp(x[0]),
    #                      zip(value,dhcps))
    #     if wrongIp:
    #         if wrongIp[0][0]:
    #             raise VariableError(_("Wrong IP address %s")%wrongIp[0][0])
class VariableOsInstallNetNetwork(NetHelper, ReadonlyVariable):
    """
    Network of every interface address (example: 192.168.0.0/16)
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("Network")

    def get(self):
        """
        Return getIpNet(ip, mask) per interface, or "" when either the
        address or the mask is empty.
        """
        pairs = zip(self.Get('os_install_net_ip'),
                    self.Get('os_install_net_mask'))
        return [getIpNet(address, mask) if address and mask else ""
                for address, mask in pairs]
class VariableOsInstallNetCidr(NetHelper, ReadonlyVariable):
    """
    CIDR prefix of every interface
    """
    type = "list"

    def init(self):
        """Assign the translated label."""
        self.label = _("CIDR")

    def get(self):
        """
        Convert every mask to its CIDR form (example: 24).

        Empty masks are converted to an empty string.
        """
        prefixes = []
        for mask in self.Get('os_install_net_mask'):
            if mask:
                prefixes.append(maskToCidr(mask))
            else:
                prefixes.append('')
        return prefixes
class VariableOsInstallNetMask(NetHelper, Variable):
    """
    Network mask of every interface (example: 255.255.0.0)
    """
    type = "choiceedit-list"

    def init(self):
        """Assign the translated label."""
        self.label = _("Mask")

    def get(self):
        """Return cidrToMask(getMask(iface)) for every interface."""
        return [cidrToMask(getMask(iface))
                for iface in self.Get('os_install_net_interfaces')]

    def set(self, value):
        """
        Convert entries given as a CIDR number (0..32) to a dotted mask.

        All other entries are passed through unchanged.
        """
        converted = []
        for entry in value:
            if entry and entry.isdigit() and 0 <= int(entry) <= 32:
                converted.append(cidrToMask(int(entry)))
            else:
                converted.append(entry)
        return converted

    def check(self, value):
        """
        Raise VariableError for the first mask rejected by checkMask().

        An empty mask is tolerated only for interfaces whose status is
        "off" or "dhcp".
        """
        statuses = self.Get('os_install_net_status')
        rejected = [mask for mask, status in zip(value, statuses)
                    if (mask or status not in ("off", "dhcp")) and
                    not checkMask(mask)]
        if rejected:
            raise VariableError(_("Wrong mask %s") % rejected[0])

    def choice(self):
        """Return the predefined masks offered to the user."""
        return ["255.255.255.255",
                "255.255.255.0",
                "255.255.0.0",
                "255.0.0.0",
                "0.0.0.0"]
class VariableOsInstallNetDhcpSet(NetHelper, Variable):
    """
    Tells whether every interface address is obtained by DHCP or manually
    """
    type = "boolauto-list"

    def init(self):
        """Assign the translated label."""
        self.label = _("DHCP")

    def get(self):
        """Return "on" for interfaces with status "dhcp", else "off"."""
        flags = []
        for status in self.Get('os_install_net_status'):
            flags.append("on" if status == "dhcp" else "off")
        return flags
class VariableOsInstallNetRouteData(NetHelper, TableVariable):
    """
    Route table data
    """
    opt = ["--route"]
    metavalue = "NETROUTE"
    source = ['os_install_net_route_network',
              'os_install_net_route_gw',
              'os_install_net_route_dev',
              'os_install_net_route_src']
    # Enables the routing-specific restriction in NetHelper.uncompatible().
    routing = True

    def humanReadable(self):
        """Return the plain value of the variable."""
        return self.Get()

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("Routing")
        self.help = \
            _("add a routing rule (specified as "
              "NETWORK:GATEWAY[:DEV[:SOURCE]])")

    def get(self, hr=HumanReadable.No):
        """
        Build the default routing table.

        Only interfaces with a static address (status neither "off" nor
        "dhcp") are considered.  Rows are [network, via, dev, src] taken
        from ip.getRouteTable(); routes to the interfaces' own networks
        are skipped.  [[]] is returned when there is nothing to report.
        """
        interfaces = self.Get('os_install_net_interfaces')
        interfaces_status = self.Get('os_install_net_status')
        interfaces_network = self.Get('os_install_net_network')
        static_pairs = [
            (iface, network)
            for iface, status, network in zip(interfaces, interfaces_status,
                                              interfaces_network)
            if status not in ("off", "dhcp")]
        if not static_pairs:
            return [[]]
        static_ifaces, skip_nets = zip(*static_pairs)
        rows = [[route[0],
                 route[1].get('via', ''),
                 route[1].get('dev', ''),
                 route[1].get('src', '')]
                for route in ip.getRouteTable(static_ifaces)
                if route[0] not in skip_nets]
        return rows or [[]]

    def getHumanReadableAuto(self):
        """Use the Variable implementation of getHumanReadableAuto()."""
        return Variable.getHumanReadableAuto(self)

    def setValue(self, value, force=False):
        """
        Standard action for setting the value.

        When the system has exactly one interface, rows given without a
        device get that interface filled in.  Empty rows (such as the
        [[]] placeholder) and rows that already name a device are kept
        untouched; previously an empty row raised IndexError here.

        The value is validated with _check() unless *force* is true.
        """
        interfaces = self.Get('os_install_net_interfaces')
        if len(interfaces) == 1:
            only_iface = interfaces[0]

            def wrapper(row):
                # Only complete rows (network, gw, dev, src) with a blank
                # device are rewritten.
                if row and len(row) > 3 and not row[2]:
                    return [row[0], row[1], only_iface, row[3]]
                return row
        else:
            def wrapper(row):
                return row
        self.value = self.set([wrapper(row) for row in value])
        self.wasSet = True
        self.invalid = False
        # run check
        if not force:
            self._check()
class VariableOsInstallNetRouteNetwork(NetHelper, FieldValue, Variable):
    """
    Network of a route table record
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 0

    def init(self):
        """Assign the translated label."""
        self.label = _("Network")

    def choice(self):
        """Return the predefined choices (only "default")."""
        return ["default"]  # +self.Get('os_install_net_network')

    def check(self, value):
        """
        Validate the route networks.

        Raises VariableError for the first entry that is neither "default"
        nor accepted by ip.checkNet(), and for the first network that is
        listed more than once.
        """
        # Compare with != rather than calling "default".__ne__ directly:
        # the bare dunder returns NotImplemented (truthy) for non-str
        # operands such as unicode, which made u"default" fail validation.
        for network in value:
            if network != "default" and not ip.checkNet(network):
                raise VariableError(_("Wrong network %s") % network)
        # Report the first repeated network in input order.
        seen = set()
        for network in value:
            if network in seen:
                raise VariableError(
                    _("Network '%s' is used more than once") % network)
            seen.add(network)
class VariableOsInstallNetRouteGw(NetHelper, FieldValue, Variable):
    """
    Gateway of a route table record
    """
    source_variable = "os_install_net_route_data"
    column = 1

    def init(self):
        """Assign the translated label."""
        self.label = _("Gateway")

    def check(self, value):
        """
        Validate the gateways of the route table.

        Raises VariableError for the first gateway rejected by
        ip.checkIp(), and for gateways that are reachable neither through
        an interface network nor through the network of another route.
        """
        route_nets = self.Get('os_install_net_route_network')
        routed = set(net for net in route_nets
                     if net and net != "default")
        attached = set(net for net in self.Get('os_install_net_network')
                       if net and net != "default")
        for gateway in value:
            if not ip.checkIp(gateway):
                raise VariableError(_("Wrong gateway IP %s") % gateway)
        #############################
        # search unreachable gateways
        #############################
        unreachable = []
        for network, gateway in zip(route_nets, value):
            if not gateway:
                continue
            # A gateway must not be reachable only through the route it
            # belongs to, so that route's own network is excluded.  The
            # original used set(network), which splits the string into
            # characters and therefore excluded nothing.  Networks
            # attached directly to an interface always stay reachable.
            reachable = attached | (routed - set([network]))
            if not ip.isIpInNet(gateway, *reachable):
                unreachable.append(gateway)
        if unreachable:
            raise VariableError(_("Gateways %s are unreachable") %
                                (",".join(unreachable)))
class VariableOsInstallNetRouteDev(NetHelper, FieldValue, Variable):
    """
    Device of a route table record
    """
    type = "choice-list"
    source_variable = "os_install_net_route_data"
    column = 2

    def init(self):
        """Assign the translated label."""
        self.label = _("Interface")

    def choice(self):
        """Offer the known network interfaces as choices."""
        interfaces = self.Get('os_install_net_interfaces')
        return interfaces
class VariableOsInstallNetRouteSrc(NetHelper, FieldValue, Variable):
    """
    Source IP of a route table record
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 3

    def init(self):
        """Assign the translated label."""
        self.label = _("Source IP")

    def choice(self):
        """Offer an empty entry followed by the interface addresses."""
        return [""] + self.Get('os_install_net_ip')

    def check(self, value):
        """
        Validate the non-empty source addresses.

        Raises VariableError for the first one rejected by ip.checkIp()
        and for addresses that are not among os_install_net_ip.
        """
        for candidate in value:
            if candidate and not ip.checkIp(candidate):
                raise VariableError(_("Wrong source IP %s") % candidate)
        known = self.Get('os_install_net_ip')
        foreign = [address for address in value
                   if address and address not in known]
        if foreign:
            raise VariableError(
                _("Wrong IP address %s in the specified source IP") %
                (",".join(foreign)))
class VariableOsInstallNetRoute(NetHelper, ReadonlyVariable):
    """
    Route data for conf.d/net
    """

    def performRouteData(self, performFunc):
        """
        Call performFunc(interface, net, routeMatrix) for every interface.

        *net* is ip.getIpNet() of the interface address and CIDR, or ""
        when either is empty; *routeMatrix* holds the
        (network, gw, dev, src) rows of the route table.  The list of
        results is returned.
        """
        routeMatrix = list(zip(self.Get('os_install_net_route_network'),
                               self.Get('os_install_net_route_gw'),
                               self.Get('os_install_net_route_dev'),
                               self.Get('os_install_net_route_src')))
        iface_rows = zip(self.Get('os_install_net_interfaces'),
                         self.Get('os_install_net_ip'),
                         self.Get('os_install_net_cidr'))
        results = []
        for dev, address, cidr in iface_rows:
            # union ip and mask to ip/net
            if address and cidr:
                own_net = ip.getIpNet(address, cidr=cidr)
            else:
                own_net = ""
            results.append(performFunc(dev, own_net, routeMatrix))
        return results

    def get(self):
        """Route info for conf.d/net"""
        workIfaces = self.Select('os_install_net_interfaces',
                                 where='os_install_net_status',
                                 _notin="off")
        # With exactly one interface that is not off, every route is
        # attributed to it regardless of the route's device.
        defaultDev = workIfaces[0] if len(workIfaces) == 1 else 0

        def getRouteForInterfaceConf(interface, net, routeMatrix):
            """Return the newline-joined route lines of one interface."""
            lines = []
            for route_net, gateway, dev, src in routeMatrix:
                belongs = (interface == dev or
                           defaultDev and interface == defaultDev)
                # discard direct routes
                # example: for 192.168.1.5/24 discard 192.168.1.0/24 net
                if not belongs or net == route_net:
                    continue
                via_part = " via %s" % gateway if gateway else ""
                src_part = " src %s" % src if src else ""
                lines.append("{net}{gateway}{src}".format(
                    net=route_net, gateway=via_part, src=src_part))
            return "\n".join(lines)

        return self.performRouteData(getRouteForInterfaceConf)
class VariableOsInstallNetNmroute(VariableOsInstallNetRoute):
    """
    Route data for NetworkManager
    """
    mode = READONLY

    def get(self):
        """Route info for system-connections of NetworkManager"""
        workIfaces = self.Select('os_install_net_interfaces',
                                 where='os_install_net_status',
                                 _notin="off")
        # With exactly one interface that is not off, every non-default
        # route is attributed to it regardless of the route's device.
        defaultDev = workIfaces[0] if len(workIfaces) == 1 else 0

        def getRouteForInterfaceNM(interface, net, routeMatrix):
            """
            Return "<default gw>;\\n" (or just "\\n" when there is none)
            followed by the newline-joined "routesN=..." lines.
            """
            default_gws = ["%s;" % gateway
                           for route_net, gateway, dev, src in routeMatrix
                           if interface == dev and route_net == "default"]
            if default_gws:
                header = "{0}\n".format(default_gws[0])
            else:
                header = "{0}\n".format("")
            lines = []
            for route_net, gateway, dev, src in routeMatrix:
                belongs = (interface == dev or
                           defaultDev and interface == defaultDev)
                # discard direct routes and the default route
                # example: for 192.168.1.5/24 discard 192.168.1.0/24 net
                if not belongs or net == route_net or \
                        route_net == "default":
                    continue
                net_parts = route_net.partition('/')
                # Route keys are numbered from 1 in order of appearance.
                lines.append(
                    "routes{num}={ip};{cidr};{gateway};0;".format(
                        num=len(lines) + 1,
                        ip=net_parts[0],
                        cidr=net_parts[2],
                        gateway=gateway if gateway else "0.0.0.0"))
            return header + "\n".join(lines)

        return self.performRouteData(getRouteForInterfaceNM)
class VariableOsInstallNetConfAvailable(NetHelper, Variable):
    """
    Available network configuration services
    """
    type = "list"

    def get(self):
        """
        Return (name, label) pairs of the usable network services.

        A service with a package name is listed only when isPkgInstalled()
        finds it in the directory of the cl_image distribution.  Without
        an image, or on DistributiveError, only the last entry (OpenRC)
        is returned.
        """
        mapNetConf = (('networkmanager', 'net-misc/networkmanager',
                       _("NetworkManager")),
                      ('openrc', '', _('OpenRC')))
        image = self.Get('cl_image')
        if image:
            with image:
                try:
                    distrPath = image.getDirectory()
                    return [(name, label)
                            for name, package, label in mapNetConf
                            if not package or
                            isPkgInstalled(package, prefix=distrPath)]
                except DistributiveError:
                    # Fall back to the default list below.
                    pass
        fallback = [(name, label)
                    for name, package, label in mapNetConf[-1:]]
        return sorted(fallback, key=itemgetter(1))
class VariableOsInstallNetConf(NetHelper, Variable):
    """
    Network setup service (networkmanager or openrc)
    """
    type = "choice"
    opt = ["--netconf"]
    metavalue = "NETMANAGER"

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("Network manager")
        self.help = _("network manager")

    def get(self):
        """
        Net setup (networkmanager or openrc)

        "networkmanager" is preferred when it is listed in the boot or
        default runlevel or the running system is a livecd, provided it
        is among the available services and the case "root on /dev/nfs
        and installing a livecd" does not apply.  Otherwise "openrc".
        """
        services = (listDirectory('/etc/runlevels/boot') +
                    listDirectory('/etc/runlevels/default'))
        in_runlevels = any(name.lower() == "networkmanager"
                           for name in services)
        if in_runlevels or self.Get('os_root_type') == "livecd":
            nm = "networkmanager"
        else:
            nm = ""
        for val, comment in self.Get('os_install_net_conf_available'):
            if nm != val:
                continue
            nfs_livecd = (self.Get('os_root_dev') == '/dev/nfs' and
                          self.Get('os_install_root_type') == "livecd")
            if not nfs_livecd:
                return nm
        return "openrc"

    def choice(self):
        """Offer the available network services as choices."""
        available = self.Get('os_install_net_conf_available')
        return available
class VariableOsInstallNetDnsSearch(NetHelper, Variable):
    """
    DNS search domains
    """
    opt = ["--domain-search"]
    metavalue = "DOMAINS"

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("Search domains")
        self.help = _("search domains (comma-separated)")

    def isDNSByDHCP(self):
        """
        Tell whether DNS settings come from DHCP.

        This is the case when the first interface has the status "dhcp".
        """
        statuses = self.Get('os_install_net_status')
        return bool(statuses) and statuses[0] == "dhcp"

    def set(self, value):
        """
        Normalise the list to single-space separated entries.

        Entries may be separated by any run of ";", "," and spaces.  Runs
        are treated as one separator, so that "a, b" becomes "a b" rather
        than "a  b" with an empty entry in between.
        """
        entries = [entry for entry in re.split('[; ,]+', value) if entry]
        return " ".join(entries)

    def get(self):
        """
        Return the search domains found in /etc/resolv.conf.

        An empty string is returned when DNS settings come from DHCP.
        """
        found = []
        for line in readLinesFile('/etc/resolv.conf'):
            if line.lstrip().startswith("search"):
                found.append(line.strip().partition("search")[2].strip())
        dnsSearch = " ".join(found)
        return "" if self.isDNSByDHCP() else dnsSearch

    def humanReadable(self):
        """Return the value, or a note explaining why it is empty."""
        current = self.Get()
        if current:
            return current
        if self.isDNSByDHCP():
            return _("Get via DHCP")
        return _("Not used")
class VariableOsInstallNetDns(VariableOsInstallNetDnsSearch):
    """
    DNS servers
    """
    opt = ["--dns"]
    metavalue = "DNS"

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("Domain name server")
        self.help = _("domain name server (comma-separated)")

    def set(self, value):
        """
        Normalise the list to single-space separated addresses.

        Addresses may be separated by any run of ";", "," and spaces.
        Runs are treated as one separator, so that "1.1.1.1, 8.8.8.8" no
        longer produces an empty entry and a doubled space.
        """
        entries = [entry for entry in re.split('[; ,]+', value) if entry]
        return " ".join(entries)

    def get(self):
        """
        Return the name servers found in /etc/resolv.conf.

        Only values accepted by ip.checkIp() are kept.  An empty string
        is returned when DNS settings come from DHCP.
        """
        candidates = [
            line.strip().partition("nameserver")[2].strip()
            for line in readLinesFile('/etc/resolv.conf')
            if line.lstrip().startswith("nameserver")]
        dnsIps = [address for address in candidates if ip.checkIp(address)]
        return "" if self.isDNSByDHCP() else " ".join(dnsIps)

    def check(self, value):
        """
        Raise VariableError when a non-empty entry does not match
        ip.IP_ADDR.
        """
        reIp = re.compile(ip.IP_ADDR)
        for entry in value.split(' '):
            # Empty entries are ignored, exactly as before.
            if entry and not reIp.match(entry):
                raise VariableError(_("Wrong IP address for DNS"))

    def humanReadable(self):
        """Return the value, or a note explaining why it is empty."""
        current = self.Get()
        if current:
            return current
        if self.isDNSByDHCP():
            return _("Get via DHCP")
        return _("Not used")
class VariableOsInstallNetSettings(NetHelper, Variable):
    """
    Configured network service (empty by default)
    """
    value = ""
    type = "choice"

    def choice(self):
        """Offer an empty entry followed by the available services."""
        available = self.Get('os_install_net_conf_available')
        return [("", "")] + available
class VariableOsInstallPxeIp(Variable):
    """
    IP address of the PXE server
    """
    type = "choice"
    opt = ["--ip"]
    metavalue = "IP"

    def init(self):
        """Assign the translated label and help text."""
        self.label = _("PXE server IP")
        # Wrapped in _() like every other help text in this module so
        # that it can be translated.
        self.help = _("set IP address for PXE server")

    def get(self):
        """Return the first non-empty interface address, or ""."""
        for ipaddr in self.Get('os_install_net_ip'):
            if ipaddr:
                return ipaddr
        return ""

    def choice(self):
        """Offer the non-empty interface addresses as choices."""
        return [ipaddr for ipaddr in self.Get('os_install_net_ip')
                if ipaddr]