You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/net.py

785 lines
25 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2013 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path,readlink
from calculate.lib.datavars import Variable,VariableError,ReadonlyVariable, \
READONLY, TableVariable,FieldValue
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_install3',sys.modules[__name__])
from calculate.lib.utils.ip import (getInterfaces,getIp,getMask,getMac,
cidrToMask,maskToCidr,getIpNet,isDhcpIp,checkIp,checkMask,
getOperState,getPlugged)
from calculate.lib.utils.device import lspci
from calculate.lib.utils.files import listDirectory,readLinesFile,process,\
readFile
from calculate.lib.utils import ip
from calculate.lib.utils.portage import isPkgInstalled
from operator import itemgetter
from itertools import *
from functools import partial
import hashlib
from calculate.install.distr import DistributiveError
class NetHelper:
"""
Network variables not using for flash installation
"""
routing = False
def uncompatible(self):
"""
Network setting up unavailable for flash installation
"""
if self.Get('os_install_root_type') == 'flash':
return \
_("Network configuration is unavailable for Flash install")
if self.routing and not self.Select('os_install_net_interfaces',
where='os_install_net_status',
_notin=('off','dhcp'),limit=1):
return _("Network routing configuration is not available if all "
"interfaces are set to DHCP")
return ""
class VariableOsInstallNtp(NetHelper,Variable):
"""
NTP server for system
"""
opt = ['--ntp']
metavalue = "NTP"
value = "ntp0.zenon.net"
def init(self):
self.label = _("NTP server")
self.help = _("set the NTP server for the system")
class VariableOsInstallProxy(NetHelper,Variable):
"""
Proxy for system
"""
value = ""
class VariableOsInstallNetInterfaces(NetHelper,ReadonlyVariable):
"""
Net interface devices
"""
type = "list"
def init(self):
self.label = _("Interface")
def get(self):
return sorted(getInterfaces())
class VariableOsInstallNetInterfacesOrig(NetHelper,ReadonlyVariable):
"""
Net interface devices orig name from udev (enp0s2)
Depricated
"""
type = "list"
def get(self):
return self.Get('os_install_net_interfaces')
class VariableOsNetInterfacesInfo(NetHelper,ReadonlyVariable):
"""
Inforamation about net interfaces
"""
def get(self):
netInterfaces=self.Get("os_net_interfaces")
listInterfacesInfo = []
# Получена ли сеть по DHCP если нет to ip или off
for interface,ipaddr,dhcp in zip(self.Get('os_install_net_interfaces'),
self.Get('os_install_net_ip'),
self.Get('os_install_net_dhcp_set')):
if dhcp == "on":
listInterfacesInfo.append((interface, _("DHCP")))
else:
listInterfacesInfo.append((interface,
ipaddr if ipaddr else _("Off")))
return ", ".join(map(lambda x:"%s (%s)"%(x[0],x[1]),
listInterfacesInfo))
class VariableOsInstallNetData(NetHelper,TableVariable):
"""
Hash for information about net
"""
opt = ["--iface"]
metavalue = "IFACE_SETTINGS"
source = ["os_install_net_interfaces",
"os_install_net_status",
"os_install_net_mask",
"os_install_net_name",
"os_install_net_mac"]
def init(self):
def defaultInterface():
ifaces = getInterfaces()
if ifaces:
return ifaces[0]
else:
return "enp0s0"
self.label = _("Addresses")
# self.help = _("IP address with network (example:%s)")%"192.168.1.1/24"
self.help = _("Network interface, DHCP or IP address and network mask "
"(example: %s)")%(" --iface %s:192.168.1.1:24"%
defaultInterface())
def raiseReadonlyIndexError(self,fieldname="",variablename="",
value=""):
"""
Behavior on change readonly index
"""
raise VariableError(_("Network interface %s not found")%value)
class VariableOsInstallNetHostname(NetHelper,Variable):
"""
Computer hostname
"""
def get(self):
return self.Get('os_install_net_fqdn').partition('.')[0]
class VariableOsInstallNetFqdn(NetHelper,Variable):
"""
Full host name
"""
opt = ['--hostname']
metavalue = "HOSTNAME"
def init(self):
self.label = _("Hostname")
self.help = _("set either the short or the full hostname")
def set(self,value):
if "." in value:
return value
else:
return "%s.%s"%(value,self.Get('os_install_net_domain'))
def check(self,value):
maxfqdn = 254
if len(value) > maxfqdn:
raise VariableError(
_("The hostname length should be less than %d")%maxfqdn)
def get(self):
if path.exists('/proc/self/fd/1') and \
readlink('/proc/self/fd/1') == '/dev/console' and \
self.Get('os_root_dev') == '/dev/nfs':
return "calculate.local"
return self.Get('os_net_fqdn')
class VariableOsInstallNetDomain(NetHelper,Variable):
"""
Domain on install system
"""
def get(self):
return self.Get('os_install_net_fqdn').partition('.')[2]
class VariableOsInstallNetAllow(NetHelper,Variable):
"""
Allowed network
"""
def get(self):
"""Allowed network"""
return self.Get("os_net_allow")
class VariableOsInstallNetName(NetHelper,ReadonlyVariable):
"""
Net device names
"""
type = "list"
def init(self):
self.label = _("Name")
def get(self):
rePci = re.compile(r"(\d\d:\d\d\.\d)(?:/[^/]+){2}$")
def getPci(interface):
pathname = path.realpath(path.join('/sys/class/net',
interface))
pci = rePci.search(pathname)
if pci:
return pci.group(1)
else:
return ""
pciEthernet = lspci(shortInfo=True)
return map(lambda x:"{vendor} {name}".format(**x),
map(lambda x:pciEthernet.get(getPci(x),
{'vendor':_("Unknown"),
'name':_("vendor")}),
self.Get('os_install_net_interfaces')))
class VariableOsInstallNetMac(NetHelper,ReadonlyVariable):
"""
Net devices mac (Example: 01:02:03:04:05:06)
"""
type = "list"
def init(self):
self.label = _("MAC")
def get(self):
return map(lambda x:getMac(x).lower(),
self.Get('os_install_net_interfaces'))
class VariableOsInstallNetStatus(NetHelper,Variable):
"""
Net status (dhcp,ip,or off)
"""
type = "choiceedit-list"
def init(self):
self.label = _("IP address")
def get(self):
return map(self.getDefaultValue,
self.Get('os_install_net_interfaces'))
def getDefaultValue(self,iface):
def statusValue(ipaddr,dhcp):
if not getPlugged(iface):
return 'off'
if dhcp == "on":
return "dhcp"
elif ipaddr:
return ipaddr
else:
if getOperState(iface) == 'down':
return "off"
else:
return "dhcp"
rootDevNfs = self.Get('os_root_dev') == '/dev/nfs'
return statusValue(getIp(iface),"on" \
if rootDevNfs or isDhcpIp(iface) else "off")
def set(self,value):
value = map(lambda x:x.lower() if x else x,value)
ifaces = self.Get('os_install_net_interfaces')
return map(lambda x:self.getDefaultValue(x[1]) \
if x[0] == "auto" else x[0],
zip(value,ifaces))
def check(self,value):
for status in value:
if status not in map(lambda x:x[0],self.choice()) and \
not checkIp(status):
raise VariableError(_("Wrong IP address %s")%status)
def choice(self):
return (("dhcp",_("DHCP")),
("off", _("Disabled")),
("auto", _("Auto")))
class VariableOsInstallNetIp(NetHelper,ReadonlyVariable):
"""
IP for all network interfaces
"""
type = "list"
def init(self):
self.label = _("IP address")
def get(self):
return map(lambda x:"" if x[1].lower() == "off" else
getIp(x[0]) if x[1].lower() == "dhcp" else x[1],
zip(self.Get('os_install_net_interfaces'),
self.Get('os_install_net_status')))
#def check(self,value):
# dhcps = self.Get('os_install_net_dhcp_set')
# wrongIp = filter(lambda x:x[0] and not checkIp(x[0]),
# zip(value,dhcps))
# if wrongIp:
# if wrongIp[0][0]:
# raise VariableError(_("Wrong IP address %s")%wrongIp[0][0])
class VariableOsInstallNetNetwork(NetHelper,ReadonlyVariable):
"""
Network for ip (Example:192.168.0.0/16)
"""
type = "list"
def init(self):
self.label = _("Network")
def get(self):
return map(lambda x:getIpNet(x[0],x[1]) if x[0] and x[1] else "",
zip(self.Get('os_install_net_ip'),
self.Get('os_install_net_mask')))
class VariableOsInstallNetCidr(NetHelper,ReadonlyVariable):
"""
CIDR of interfaces
"""
type = "list"
def init(self):
self.label = _("CIDR")
def get(self):
"""
Get CIDR of ip,net (Example: 24)
"""
return map(lambda x:maskToCidr(x) if x else '',
self.Get('os_install_net_mask'))
class VariableOsInstallNetMask(NetHelper,Variable):
"""
Net mask of interfaces (Example:255.255.0.0)
"""
type = "choiceedit-list"
def init(self):
self.label = _("Mask")
def get(self):
return map(lambda x:cidrToMask(getMask(x)),
self.Get('os_install_net_interfaces'))
def set(self,value):
"""
Convert to mask CIDR value
"""
def convertCidrToMask(x):
if x and x.isdigit() and int(x) in range(0,33):
return cidrToMask(int(x))
else:
return x
res = map(convertCidrToMask,value)
return res
def check(self,value):
dhcps = self.Get('os_install_net_status')
wrongMask = filter(lambda x:(x[0] or not x[1] in ("off","dhcp")) and \
not checkMask(x[0]),
zip(value,dhcps))
if wrongMask:
raise VariableError(_("Wrong mask %s")%wrongMask[0][0])
def choice(self):
return ["255.255.255.255",
"255.255.255.0",
"255.255.0.0",
"255.0.0.0",
"0.0.0.0"]
class VariableOsInstallNetDhcpSet(NetHelper,Variable):
"""
Describe ip was get by DHCP or manualy
"""
type = "boolauto-list"
def init(self):
self.label = _("DHCP")
def get(self):
return map(lambda x:"on" if x == "dhcp" else "off",
self.Get('os_install_net_status'))
class VariableOsInstallNetRouteData(NetHelper,TableVariable):
"""
Route table data
"""
opt = ["--route"]
metavalue = "NETROUTE"
source = ['os_install_net_route_network',
'os_install_net_route_gw',
'os_install_net_route_dev',
'os_install_net_route_src']
routing = True
def humanReadable(self):
return self.Get()
def init(self):
self.label = _("Routing")
self.help = \
_("add a routing rule (specified as "
"NETWORK:[GATEWAY][:DEV[:SOURCE]])")
def get(self,hr=False):
"""Routing hash"""
interfaces = self.Get('os_install_net_interfaces')
interfaces_status = self.Get('os_install_net_status')
interfaces_network = self.Get('os_install_net_network')
staticInterface = \
map(itemgetter(0,2),
filter(lambda x:not x[1] in ("off","dhcp"),
zip(interfaces,interfaces_status,interfaces_network)))
route_data = []
if staticInterface:
staticInterface,skipNet = zip(*staticInterface)
return map(lambda x:[x[0],
x[1].get('via',''),
x[1].get('dev',''),
x[1].get('src','')],
ifilter(lambda x:not x[0] in skipNet,
ip.getRouteTable(staticInterface))) or [[]]
return [[]]
def getHumanReadableAuto(self):
return Variable.getHumanReadableAuto(self)
def setValue(self,value,force=False):
"""
Standard action for set value
"""
self.value = self.set(value)
self.wasSet = True
self.invalid = False
# run check
if not force:
self._check()
class VariableOsInstallNetRouteNetwork(FieldValue,NetHelper,Variable):
"""
Net for route table record
"""
type = "choiceedit-list"
source_variable = "os_install_net_route_data"
column = 0
def init(self):
self.label = _("Network")
def choice(self):
return ["default"] #+self.Get('os_install_net_network')
def check(self,value):
##########################
# detect duplicate network
##########################
for wrongnet in ifilterfalse(ip.checkNet,
ifilter("default".__ne__,
value)):
raise VariableError(_("Wrong network %s")%wrongnet)
dupNetwork = list(set(filter(lambda x:value.count(x)>1,
value)))
if dupNetwork:
raise VariableError(
_("Network '%s' is used more than once")%dupNetwork[0])
class VariableOsInstallNetRouteGw(FieldValue,NetHelper,Variable):
"""
Gateway for route table record
"""
source_variable = "os_install_net_route_data"
column = 1
def init(self):
self.label = _("Gateway")
def check(self,value):
#############################
# search unreachable gateways
#############################
NET,GW = 0,1
netsGw = zip(self.Get('os_install_net_route_network'),
value)
nets = filter(lambda x:x and x != "default",
chain(self.Get('os_install_net_route_network'),
self.Get('os_install_net_network')))
for wrongip in ifilterfalse(ip.checkIp,value):
raise VariableError(_("Wrong gateway IP %s")%wrongip)
wrongGws = map(lambda x:x[GW],
filter(lambda x:not ip.isIpInNet(x[GW],
*(set(nets) - set(x[NET]))),
filter(lambda x:x[GW],
netsGw)))
if wrongGws:
raise VariableError(_("Gateways %s are unreachable")%
(",".join(wrongGws)))
class VariableOsInstallNetRouteDev(FieldValue,NetHelper,Variable):
"""
Device for route table record
"""
type = "choice-list"
source_variable = "os_install_net_route_data"
column = 2
def init(self):
self.label = _("Interface")
def choice(self):
return self.Get('os_install_net_interfaces')
class VariableOsInstallNetRouteSrc(FieldValue,NetHelper,Variable):
"""
Source ip for route table record
"""
type = "choiceedit-list"
source_variable = "os_install_net_route_data"
column = 3
def init(self):
self.label = _("Source IP")
def choice(self):
return [""]+self.Get('os_install_net_ip')
def check(self,value):
for wrongip in ifilterfalse(ip.checkIp,
ifilter(None,value)):
raise VariableError(_("Wrong source IP %s")%wrongip)
ipAddrs = self.Get('os_install_net_ip')
wrongIps = filter(lambda x: x and not x in ipAddrs,
value)
if wrongIps:
raise VariableError(
_("Wrong IP address %s in the specified source IP")%
(",".join(wrongIps)))
class VariableOsInstallNetRoute(NetHelper,ReadonlyVariable):
"""
Data by route for conf.d/net
"""
def performRouteData(self,performFunc):
routeMatrix = zip(self.Get('os_install_net_route_network'),
self.Get('os_install_net_route_gw'),
self.Get('os_install_net_route_dev'),
self.Get('os_install_net_route_src'))
DEV,IP,CIDR,NET = 0,1,2,1
return map(lambda x:performFunc(x[DEV],x[NET],routeMatrix),
# union ip and mask to ip/net
map(lambda x:(x[DEV],ip.getIpNet(x[IP],cidr=x[CIDR])) \
if x[IP] and x[CIDR] else (x[DEV],""),
#filter(lambda x:x[IP] and x[CIDR],
zip(self.Get('os_install_net_interfaces'),
self.Get('os_install_net_ip'),
self.Get('os_install_net_cidr'))))
def get(self):
"""Route info for conf.d/net"""
defaultDev = 0
workIfaces = self.Select('os_install_net_interfaces',
where='os_install_net_status',
_notin="off")
if len(workIfaces) == 1:
defaultDev = workIfaces[0]
def getRouteForInterfaceConf(interface,net,routeMatrix):
NET,GW,DEV,SRC = 0,1,2,3
return "\n".join(
# build string for route from net,gateway,dev and src
map(lambda x:"{net}{gateway}{src}".format(
net=x[NET],
gateway=" via %s"%x[GW] if x[GW] else "",
src=" src %s"%x[SRC] if x[SRC] else ""),
# filter by interface and discard direct routes
# example: for 192.168.1.5/24 discard 192.168.1.0/24 net
filter(lambda x:(interface==x[DEV] or defaultDev and \
interface==defaultDev) \
and net!=x[NET],routeMatrix)))
return self.performRouteData(getRouteForInterfaceConf)
class VariableOsInstallNetNmroute(VariableOsInstallNetRoute):
"""
Data by route for NetworkManager
"""
mode = READONLY
def get(self):
"""Route info for system-connections of NetworkManager"""
defaultDev = 0
workIfaces = self.Select('os_install_net_interfaces',
where='os_install_net_status',
_notin="off")
if len(workIfaces) == 1:
defaultDev = workIfaces[0]
def getRouteForInterfaceNM(interface,net,routeMatrix):
NET,GW,DEV,SRC = 0,1,2,3
defaultGw = map(lambda x:"%s;"%x[GW],
filter(lambda x:interface==x[DEV] and \
x[NET]=="default",
routeMatrix))
return "{0}\n".format(defaultGw[0] if defaultGw else "") + \
"\n".join(
# build string for route from net,gateway,dev and src
map(lambda x:"routes{num}={ip};{cidr};{gateway};0;".format(
num=x[0]+1,
ip=x[1][NET].partition('/')[0],
cidr=x[1][NET].partition('/')[2],
gateway=x[1][GW] if x[1][GW] else "0.0.0.0"),
# filter by interface and discard direct routes
# example: for 192.168.1.5/24 discard 192.168.1.0/24 net
enumerate(
filter(lambda x:(interface==x[DEV] or defaultDev and \
interface==defaultDev) and net!=x[NET] and \
x[NET]!="default",routeMatrix))))
return self.performRouteData(getRouteForInterfaceNM)
class VariableOsInstallNetConfAvailable(NetHelper,Variable):
"""
Available net configuration
"""
type = "list"
def get(self):
mapNetConf = (('networkmanager','net-misc/networkmanager',
_("NetworkManager")),
('openrc','',_('OpenRC')))
image = self.Get('cl_image')
if image:
with image as distr:
try:
distrPath = image.getDirectory()
return map(itemgetter(0,2),
filter(lambda x:not x[1] or isPkgInstalled(x[1],
prefix=distrPath),
mapNetConf))
except DistributiveError as e:
pass
return sorted(map(itemgetter(0,2),mapNetConf[-1:]),key=itemgetter(1))
class VariableOsInstallNetConf(NetHelper,Variable):
"""
Net setup (networkmanager or openrc)
"""
type = "choice"
opt = ["--netconf"]
metavalue = "NETMANAGER"
def init(self):
self.label = _("Network manager")
self.help = _("network manager")
def get(self):
"""Net setup (networkmanager or openrc)"""
if filter(lambda x:x.lower() == ("networkmanager"),
listDirectory('/etc/runlevels/boot')+
listDirectory('/etc/runlevels/default')) \
or self.Get('os_root_type') == "livecd":
nm = "networkmanager"
else:
nm = ""
for val,comment in self.Get('os_install_net_conf_available'):
if nm == val and not (self.Get('os_root_dev') == '/dev/nfs' and \
self.Get('os_install_root_type') == "livecd"):
return nm
else:
return "openrc"
def choice(self):
return self.Get('os_install_net_conf_available')
class VariableOsInstallNetDnsSearch(NetHelper,Variable):
"""
Dns search
"""
opt = ["--domain-search"]
metavalue = "DOMAINS"
def init(self):
self.label = _("Search domains")
self.help = _("search domains (comma-separated)")
def isDNSByDHCP(self):
"""
If first interface get ip by DHCP dns must be DHCP
"""
statuses = self.Get('os_install_net_status')
if statuses:
if statuses[0] == "dhcp":
return True
return False
def set(self,value):
return " ".join(re.split('[; ,]',value))
def get(self):
"""Get current name servers"""
dnsSearch = " ".join(
map(lambda x:x.strip().partition("search")[2].strip(),
filter(lambda x:x.lstrip().startswith("search"),
readLinesFile('/etc/resolv.conf'))))
return "" if self.isDNSByDHCP() else dnsSearch
def humanReadable(self):
return self.Get() or (_("Get via DHCP")
if self.isDNSByDHCP()
else _("Not used"))
class VariableOsInstallNetDns(VariableOsInstallNetDnsSearch):
"""
Dns servers
"""
opt = ["--dns"]
metavalue = "DNS"
def init(self):
self.label = _("Domain name server")
self.help = _("domain name server (comma-separated)")
def set(self,value):
return " ".join(re.split('[; ,]',value))
def get(self):
dnsIps = filter(ip.checkIp,
map(lambda x:x.strip().partition("nameserver")[2].strip(),
filter(lambda x:x.lstrip().startswith("nameserver"),
readLinesFile('/etc/resolv.conf'))))
return "" if self.isDNSByDHCP() else " ".join(dnsIps)
def check(self,value):
reIp = re.compile(ip.IP_ADDR)
if any(ifilterfalse(reIp.match,value.split(' '))):
raise VariableError(_("Wrong IP address for DNS"))
def humanReadable(self):
return self.Get() or (_("Get via DHCP")
if self.isDNSByDHCP()
else _("Not used"))
class VariableOsInstallNetSettings(NetHelper,Variable):
"""
Net service configured
"""
type = "choice"
value = ""
def choice(self):
return [("","")]+self.Get('os_install_net_conf_available')
class VariableOsInstallPxeIp(Variable):
"""
IP адрес PXE сервера
"""
type = "choice"
opt = ["--ip"]
metavalue = "IP"
def init(self):
self.label = _("PXE server IP")
self.help = "set IP address for PXE server"
def get(self):
for ipaddr in ifilter(None, self.Get('os_install_net_ip')):
return ipaddr
else:
return ""
def choice(self):
return filter(None, self.Get('os_install_net_ip'))