You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/install/variables/net.py

617 lines
19 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path
from calculate.lib.datavars import Variable,VariableError,ReadonlyVariable, \
READONLY
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_install3',sys.modules[__name__])
from calculate.lib.utils.ip import (getInterfaces,getIp,getMask,getMac,
cidrToMask,maskToCidr,getIpNet,isDhcpIp,checkIp,checkMask)
from calculate.lib.utils.device import lspci
from calculate.lib.utils.files import listDirectory,readLinesFile
from calculate.lib.utils import ip
from calculate.lib.utils.portage import isPkgInstalled
from operator import itemgetter
from itertools import *
class FieldValue:
    """
    Table column variable

    Mixin that exposes one column of a table-like source variable.
    Subclasses set ``source_variable`` to the name of the variable that
    holds the rows and ``column`` to the index of the column to expose.
    """
    type = "list"
    # name of the variable whose value is a sequence of rows
    source_variable = ""
    # index of the column extracted from every row
    column = 0

    def get(self):
        """
        Return the configured column of the source table.

        Returns an empty list when the source holds no non-empty row,
        otherwise a tuple with one item per row.
        """
        rows = self.Get(self.source_variable)
        if not any(rows):
            return []
        # zip(*rows) transposes rows into columns; it is materialised with
        # list() because a zip object cannot be indexed on Python 3
        return list(zip(*rows))[self.column]
class NetVariable(Variable):
    """
    Base class for network variables, which are not used for a flash
    installation
    """
    def uncompatible(self):
        """
        Return the reason why the variable is unavailable.

        The result is a translated message when 'os_install_root_type'
        is 'flash' and an empty string otherwise.
        """
        on_flash = self.Get('os_install_root_type') == 'flash'
        if not on_flash:
            return ""
        return _("Network configuration unavailable for flash installation")
class VariableOsInstallNtp(NetVariable):
    """
    NTP server used by the installed system
    """
    value = "ntp0.zenon.net"
    metavalue = "NTP"
    opt = ['--ntp']

    def init(self):
        """Set the translated label and help text."""
        self.label = _("NTP server")
        self.help = _("set the ntp server for the system")
class VariableOsInstallProxy(NetVariable):
    """
    Proxy used by the installed system (empty by default)
    """
    value = ""
class VariableOsInstallNetInterfaces(NetVariable):
    """
    Network interface devices
    """
    mode = READONLY
    type = "list"

    def init(self):
        """Set the translated label."""
        self.label = _("Interface")

    def get(self):
        """Return whatever getInterfaces() reports."""
        interfaces = getInterfaces()
        return interfaces
class VariableOsNetInterfacesInfo(NetVariable):
    """
    Information about network interfaces
    """
    def get(self):
        """
        Return a comma separated summary such as "eth0 (DHCP), eth1 (Off)".

        Each interface is followed by "DHCP" when its DHCP flag is "on",
        otherwise by its IP address, or by "Off" when it has no address.
        """
        # The result is not used; the call is kept because reading the
        # variable may matter to the framework - TODO confirm
        self.Get("os_net_interfaces")
        rows = zip(self.Get('os_install_net_interfaces'),
                   self.Get('os_install_net_ip'),
                   self.Get('os_install_net_dhcp_set'))
        summary = []
        # Was the address obtained via DHCP? If not, show the IP or "Off"
        for iface, addr, dhcp in rows:
            if dhcp == "on":
                state = _("DHCP")
            elif addr:
                state = addr
            else:
                state = _("Off")
            summary.append("%s (%s)" % (iface, state))
        return ", ".join(summary)
class VariableOsInstallNetData(NetVariable):
    """
    Table with the per-interface network settings
    """
    type = "table"
    metavalue = "IFACE:IP:MASK"
    opt = ["--ip"]
    # variables that provide the columns of the table
    source = [
        "os_install_net_interfaces",
        "os_install_net_dhcp_set",
        "os_install_net_ip",
        "os_install_net_mask",
        "os_install_net_name",
        "os_install_net_mac",
    ]

    def init(self):
        """Set the translated label and help text."""
        example = "192.168.1.1/24"
        self.label = _("Addresses")
        self.help = _("ip address with net (example:%s)") % example
class VariableOsInstallNetHostname(NetVariable):
    """
    Short host name of the computer
    """
    def get(self):
        """Return the part of 'os_install_net_fqdn' before the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        short, sep, rest = fqdn.partition('.')
        return short
class VariableOsInstallNetFqdn(NetVariable):
    """
    Fully qualified host name
    """
    opt = ['--hostname']

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Hostname")
        self.help = _("set the short hostname or full hostname")

    def set(self, value):
        """
        Normalise a user supplied host name.

        A value containing a dot is kept as is; otherwise the value of
        'os_install_net_domain' is appended after a dot.
        """
        if "." not in value:
            return "%s.%s" % (value, self.Get('os_install_net_domain'))
        return value

    def check(self, value):
        """Raise VariableError when the name is longer than 254 characters."""
        limit = 254
        if len(value) <= limit:
            return
        raise VariableError(
            _("Hostname length should be less that %d") % limit)

    def get(self):
        """Default to the value of 'os_net_fqdn'."""
        return self.Get('os_net_fqdn')
class VariableOsInstallNetDomain(NetVariable):
    """
    Domain of the installed system
    """
    def get(self):
        """Return the part of 'os_install_net_fqdn' after the first dot."""
        fqdn = self.Get('os_install_net_fqdn')
        host, sep, domain = fqdn.partition('.')
        return domain
class VariableOsInstallNetAllow(NetVariable):
    """
    Allowed network
    """
    def get(self):
        """Mirror the value of 'os_net_allow'."""
        allowed = self.Get("os_net_allow")
        return allowed
class VariableOsInstallNetName(NetVariable):
    """
    Network device names
    """
    type = "list"
    mode = READONLY

    def init(self):
        """Set the translated label."""
        self.label = _("Name")

    def get(self):
        """
        Return a "<vendor> <name>" string for every interface.

        The PCI address is taken from the resolved /sys/class/net/<iface>
        path and looked up in the lspci(shortInfo=True) result; interfaces
        without a match get the translated "Unknown vendor" text.
        """
        # PCI address followed by exactly two more path components
        pci_pattern = re.compile(r"(\d\d:\d\d\.\d)(?:/[^/]+){2}$")

        def pci_address(iface):
            """Return the PCI address of the interface or ""."""
            sys_path = path.realpath(path.join('/sys/class/net', iface))
            found = pci_pattern.search(sys_path)
            return found.group(1) if found else ""

        devices = lspci(shortInfo=True)
        names = []
        for iface in self.Get('os_install_net_interfaces'):
            info = devices.get(pci_address(iface),
                               {'vendor': _("Unknown"),
                                'name': _("vendor")})
            names.append("{vendor} {name}".format(**info))
        return names
class VariableOsInstallNetMac(NetVariable):
    """
    MAC addresses of the network devices (Example: 01:02:03:04:05:06)
    """
    type = "list"
    mode = READONLY

    def init(self):
        """Set the translated label."""
        self.label = _("MAC")

    def get(self):
        """Return getMac() of every interface, in interface order."""
        return [getMac(iface)
                for iface in self.Get('os_install_net_interfaces')]
class VariableOsInstallNetIp(NetVariable):
    """
    IP addresses of all network interfaces
    """
    type = "list"

    def init(self):
        """Set the translated label."""
        self.label = _("IP address")

    def get(self):
        """Return getIp() of every interface, in interface order."""
        return [getIp(iface)
                for iface in self.Get('os_install_net_interfaces')]

    def check(self, value):
        """Raise VariableError naming the first item rejected by checkIp."""
        rejected = [addr for addr in value if not checkIp(addr)]
        if rejected:
            raise VariableError(_("Wrong IP address %s") % rejected[0])
class VariableOsInstallNetNetwork(NetVariable):
    """
    Network of every IP address (Example: 192.168.0.0/16)
    """
    type = "list"
    mode = READONLY

    def init(self):
        """Set the translated label."""
        self.label = _("Network")

    def get(self):
        """
        Pair every IP with its mask and return getIpNet() of the pair;
        an interface without an IP yields an empty string.
        """
        pairs = zip(self.Get('os_install_net_ip'),
                    self.Get('os_install_net_mask'))
        return [getIpNet(addr, mask) if addr else ""
                for addr, mask in pairs]
class VariableOsInstallNetCidr(NetVariable):
    """
    CIDR of the interfaces
    """
    type = "list"
    mode = READONLY

    def init(self):
        """Set the translated label."""
        self.label = _("CIDR")

    def get(self):
        """
        Return maskToCidr() of every interface mask (Example: 24)
        """
        return [maskToCidr(mask)
                for mask in self.Get('os_install_net_mask')]
class VariableOsInstallNetMask(NetVariable):
    """
    Net masks of the interfaces (Example: 255.255.0.0)
    """
    type = "choiceedit-list"

    def init(self):
        """Set the translated label."""
        self.label = _("Mask")

    def get(self):
        """Return cidrToMask(getMask(iface)) for every interface."""
        return [cidrToMask(getMask(iface))
                for iface in self.Get('os_install_net_interfaces')]

    def set(self, value):
        """
        Convert CIDR items to masks.

        Items made only of digits with a value from 0 to 32 are passed
        through cidrToMask(); every other item is kept unchanged.
        """
        converted = []
        for item in value:
            if item and item.isdigit() and 0 <= int(item) <= 32:
                converted.append(cidrToMask(int(item)))
            else:
                converted.append(item)
        return converted

    def check(self, value):
        """Raise VariableError naming the first item rejected by checkMask."""
        rejected = [mask for mask in value if not checkMask(mask)]
        if rejected:
            raise VariableError(_("Wrong mask %s") % rejected[0])

    def choice(self):
        """Return the masks offered as ready-made choices."""
        return [
            "255.255.255.255",
            "255.255.255.0",
            "255.255.0.0",
            "255.0.0.0",
            "0.0.0.0",
        ]
class VariableOsInstallNetDhcpSet(NetVariable):
    """
    Tells whether an address is obtained by DHCP or set manually
    """
    type = "boolauto-list"

    def init(self):
        """Set the translated label."""
        self.label = _("DHCP")

    def get(self):
        """Return "on"/"off" per interface according to isDhcpIp()."""
        return ["on" if isDhcpIp(iface) else "off"
                for iface in self.Get('os_install_net_interfaces')]

    def set(self, value):
        """
        Resolve "auto" items to "on"/"off" with isDhcpIp() of the matching
        interface; every other item is kept unchanged.
        """
        resolved = []
        for iface, flag in zip(self.Get('os_install_net_interfaces'),
                               value):
            if flag == "auto":
                flag = "on" if isDhcpIp(iface) else "off"
            resolved.append(flag)
        return resolved
class VariableOsInstallNetRouteData(NetVariable):
    """
    Route table data
    """
    type = "table"
    opt = ["--route"]
    metavalue = "NETWORK:[GATEWAY][:DEV[:SOURCE]]"
    # variables that provide the columns of the table
    source = [
        'os_install_net_route_network',
        'os_install_net_route_gw',
        'os_install_net_route_dev',
        'os_install_net_route_src',
    ]

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Routing")
        self.help = _("add a routing rule")

    def get(self):
        """
        Build the rows [network, via, dev, src] of the route table.

        Only interfaces whose DHCP flag is "off" are passed to
        ip.getRouteTable(), and routes to the networks of those interfaces
        are left out. A single empty row is returned when nothing remains.
        """
        interfaces = self.Get('os_install_net_interfaces')
        dhcp_flags = self.Get('os_install_net_dhcp_set')
        networks = self.Get('os_install_net_network')
        static = [(iface, net)
                  for iface, dhcp, net in zip(interfaces, dhcp_flags,
                                              networks)
                  if dhcp == "off"]
        if not static:
            return [[]]
        static_ifaces, own_nets = zip(*static)
        rows = []
        for entry in ip.getRouteTable(static_ifaces):
            if entry[0] in own_nets:
                continue
            attrs = entry[1]
            rows.append([entry[0],
                         attrs.get('via', ''),
                         attrs.get('dev', ''),
                         attrs.get('src', '')])
        return rows or [[]]
class VariableOsInstallNetRouteNetwork(FieldValue, NetVariable):
    """
    Network column of the route table
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 0

    def init(self):
        """Set the translated label."""
        self.label = _("Network")

    def choice(self):
        """Offer "default" followed by the interface networks."""
        return ["default"] + self.Get('os_install_net_network')

    def check(self, value):
        """Raise VariableError when a network is listed more than once."""
        # collect every network that occurs at least twice
        repeated = list(set(net for net in value if value.count(net) > 1))
        if repeated:
            raise VariableError(
                _("Network '%s' is used twice") % repeated[0])
class VariableOsInstallNetRouteGw(FieldValue, NetVariable):
    """
    Gateway column of the route table
    """
    source_variable = "os_install_net_route_data"
    column = 1

    def init(self):
        """Set the translated label."""
        self.label = _("Gateway")

    def check(self, value):
        """
        Search for unreachable gateways.

        Every non-empty gateway is tested with ip.isIpInNet() against the
        non-empty, non-"default" networks of
        'os_install_net_route_network', excluding the network of the
        gateway's own route. Raises VariableError listing the gateways
        that fail the test.
        """
        route_nets = self.Get('os_install_net_route_network')
        known_nets = set(net for net in route_nets
                         if net and net != "default")
        unreachable = []
        for net, gateway in zip(route_nets, value):
            if not gateway:
                continue
            # set([net]) and not set(net): building a set from the string
            # itself would yield its characters, so the route's own
            # network would never be excluded
            candidates = known_nets - set([net])
            if not ip.isIpInNet(gateway, *candidates):
                unreachable.append(gateway)
        if unreachable:
            raise VariableError(_("Gateways %s is unreachable") %
                                (",".join(unreachable)))
class VariableOsInstallNetRouteDev(FieldValue, NetVariable):
    """
    Device column of the route table
    """
    type = "choice-list"
    source_variable = "os_install_net_route_data"
    column = 2

    def init(self):
        """Set the translated label."""
        self.label = _("Interface")

    def choice(self):
        """Offer the values of 'os_install_net_interfaces'."""
        interfaces = self.Get('os_install_net_interfaces')
        return interfaces
class VariableOsInstallNetRouteSrc(FieldValue, NetVariable):
    """
    Source IP column of the route table
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 3

    def init(self):
        """Set the translated label."""
        self.label = _("Source")

    def choice(self):
        """Offer an empty source followed by the interface addresses."""
        return [""] + self.Get('os_install_net_ip')

    def check(self, value):
        """
        Raise VariableError listing every non-empty source address that
        is not one of the 'os_install_net_ip' addresses.
        """
        known = self.Get('os_install_net_ip')
        foreign = [src for src in value if src and src not in known]
        if foreign:
            raise VariableError(_("Wrong ip addresse %s in source IP") %
                                (",".join(foreign)))
class VariableOsInstallNetRoute(NetVariable):
    """
    Route data for conf.d/net
    """
    mode = READONLY

    def performRouteData(self, performFunc):
        """
        Call performFunc(interface, network, routeMatrix) for every
        interface that has both an IP and a CIDR and return the results.

        routeMatrix is the zip of the four route table columns (network,
        gateway, device, source); network is ip.getIpNet() of the
        interface address and CIDR.
        """
        routeMatrix = zip(self.Get('os_install_net_route_network'),
                          self.Get('os_install_net_route_gw'),
                          self.Get('os_install_net_route_dev'),
                          self.Get('os_install_net_route_src'))
        configured = zip(self.Get('os_install_net_interfaces'),
                         self.Get('os_install_net_ip'),
                         self.Get('os_install_net_cidr'))
        # union ip and cidr to ip/net for the interfaces having both
        iface_nets = [(iface, ip.getIpNet(addr, cidr=cidr))
                      for iface, addr, cidr in configured
                      if addr and cidr]
        return [performFunc(iface, net, routeMatrix)
                for iface, net in iface_nets]

    def get(self):
        """Route info for conf.d/net"""
        def route_lines(interface, net, routeMatrix):
            """Return the routes of one interface, one per line."""
            lines = []
            for r_net, r_gw, r_dev, r_src in routeMatrix:
                # keep the routes of this interface and discard direct
                # ones, e.g. for 192.168.1.5/24 discard 192.168.1.0/24
                if r_dev != interface or r_net == net:
                    continue
                lines.append("{net}{gateway}{src}".format(
                    net=r_net,
                    gateway=" via %s" % r_gw if r_gw else "",
                    src=" src %s" % r_src if r_src else ""))
            return "\n".join(lines)

        return self.performRouteData(route_lines)
class VariableOsInstallNetNmroute(VariableOsInstallNetRoute):
    """
    Route data for NetworkManager
    """
    mode = READONLY

    def get(self):
        """Route info for system-connections of NetworkManager"""
        def nm_routes(interface, net, routeMatrix):
            """
            Return the first default gateway of the interface followed by
            ';' on the first line, then one numbered 'routesN=' line per
            remaining non-direct route.
            """
            NET, GW, DEV = 0, 1, 2
            own = [row for row in routeMatrix if row[DEV] == interface]
            default_gws = ["%s;" % row[GW]
                           for row in own if row[NET] == "default"]
            # discard direct routes, e.g. for 192.168.1.5/24 discard the
            # 192.168.1.0/24 net, and the default route handled above
            extra = [row for row in own
                     if row[NET] != net and row[NET] != "default"]
            head = "{0}\n".format(default_gws[0] if default_gws else "")
            body = []
            for index, row in enumerate(extra):
                address, sep, prefix = row[NET].partition('/')
                body.append(
                    "routes{num}={ip};{cidr};{gateway};0;".format(
                        num=index + 1,
                        ip=address,
                        cidr=prefix,
                        gateway=row[GW] if row[GW] else "0.0.0.0"))
            return head + "\n".join(body)

        return self.performRouteData(nm_routes)
class VariableOsInstallNetConfAvailable(NetVariable):
    """
    Available network configuration back ends
    """
    type = "list"

    def get(self):
        """
        Return (id, title) pairs of the usable back ends.

        With an image in 'cl_image', a back end is listed when it needs no
        package or isPkgInstalled() finds its package under the image
        directory. Without an image only the last entry (OpenRC) is
        returned.
        """
        # (id, required package or '', title)
        backends = (('networkmanager', 'net-misc/networkmanager',
                     _("NetworkManager")),
                    ('openrc', '', _('OpenRC')))
        image = self.Get('cl_image')
        if not image:
            return [(ident, title)
                    for ident, package, title in backends[-1:]]
        with image as distr:
            root = image.getDirectory()
            return [(ident, title)
                    for ident, package, title in backends
                    if not package or isPkgInstalled(package, prefix=root)]
class VariableOsInstallNetConf(NetVariable):
    """
    Network setup (networkmanager or openrc)
    """
    type = "choice"
    opt = ["--netconf"]
    metavalue = "NETMANAGER"

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Network manager")
        self.help = _("network manager")

    def get(self):
        """
        Choose the default back end.

        "networkmanager" is preferred when an entry of that name exists in
        the boot or default runlevel directory, or when
        'os_install_root_type' is "livecd"; it is returned only if it is
        among 'os_install_net_conf_available'. Otherwise the result is
        "openrc".
        """
        services = (listDirectory('/etc/runlevels/boot') +
                    listDirectory('/etc/runlevels/default'))
        in_runlevels = any(name.lower() == "networkmanager"
                           for name in services)
        if in_runlevels or self.Get('os_install_root_type') == "livecd":
            preferred = "networkmanager"
        else:
            preferred = ""
        available = self.Get('os_install_net_conf_available')
        if any(preferred == ident for ident, comment in available):
            return preferred
        return "openrc"

    def choice(self):
        """Offer the values of 'os_install_net_conf_available'."""
        return self.Get('os_install_net_conf_available')
class VariableOsInstallNetDnsSearch(NetVariable):
    """
    DNS search domains
    """
    def isDNSByDHCP(self):
        """
        If the first interface gets its IP by DHCP, DNS must come from DHCP.

        Returns True when the first item of 'os_install_net_dhcp_set' is
        "on", False otherwise (including an empty list).
        """
        dhcps = self.Get('os_install_net_dhcp_set')
        return bool(dhcps) and dhcps[0] == "on"

    def get(self):
        """
        Get the current search domains.

        Collects the text after the "search" keyword from every matching
        line of /etc/resolv.conf and joins it with spaces. Returns an
        empty string when DNS is provided by DHCP.
        """
        domains = " ".join(
            [line.strip().partition("search")[2].strip()
             for line in readLinesFile('/etc/resolv.conf')
             if line.lstrip().startswith("search")])
        # The collected value is already a string; joining it again with
        # "," would put a comma between every character
        return "" if self.isDNSByDHCP() else domains
class VariableOsInstallNetDns(VariableOsInstallNetDnsSearch):
    """
    DNS servers
    """
    def get(self):
        """
        Get the current name servers.

        Takes the text after the "nameserver" keyword from every matching
        line of /etc/resolv.conf, keeps the items accepted by
        ip.checkIp() and joins them with commas. Returns an empty string
        when DNS is provided by DHCP.
        """
        candidates = [line.strip().partition("nameserver")[2].strip()
                      for line in readLinesFile('/etc/resolv.conf')
                      if line.lstrip().startswith("nameserver")]
        servers = [addr for addr in candidates if ip.checkIp(addr)]
        return "" if self.isDNSByDHCP() else ",".join(servers)

    def check(self, value):
        """
        Raise VariableError when a comma separated item is not an IP.

        Empty items are accepted so that the empty default passes.
        Matching uses re.match with ip.IP_ADDR, so it is anchored at the
        start of the item only unless the pattern anchors the end itself.
        """
        reIp = re.compile(ip.IP_ADDR)
        # An item is wrong when it does NOT match the pattern; the check
        # used to be inverted and rejected valid addresses instead
        wrong = [addr for addr in value.split(',')
                 if addr and not reIp.match(addr)]
        if wrong:
            raise VariableError(_("Wrong DNS IP addresses"))
class VariableOsInstallNetSettings(NetVariable):
    """
    Configured network service (empty by default)
    """
    value = ""