# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import sys
import re
from os import path, readlink
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
                                    READONLY, TableVariable, FieldValue,
                                    VariableInterface, HumanReadable)

from calculate.lib.cl_lang import setLocalTranslate, _
from calculate.lib.variables.system import RootType

setLocalTranslate('cl_install3', sys.modules[__name__])

from calculate.lib.utils.ip import (getInterfaces, getIp, getMask, getMac,
                                    cidrToMask, maskToCidr, getIpNet, isDhcpIp,
                                    checkIp, checkMask, isSlaveInterface,
                                    getOperState, getPlugged)
import calculate.lib.utils.device as device
from calculate.lib.utils.device import lspci
from calculate.lib.utils.files import (listDirectory, readLinesFile)
from calculate.lib.utils import ip
from calculate.lib.utils.portage import isPkgInstalled
from operator import itemgetter
from itertools import *
from ..distr import DistributiveError


class NetHelper(VariableInterface):
    """
    Mixin for network variables that are not used for Flash installation
    """
    # Subclasses describing routing set this to True to enable the
    # additional "all interfaces are DHCP/off" restriction below.
    routing = False

    def uncompatible(self):
        """
        Explain why network setup is unavailable.

        Returns a translated message when the variable cannot be used
        (Flash install, routing without any static interface, or enabled
        settings migration) and an empty string otherwise.
        """
        if self.Get('os_install_root_type') == 'flash':
            return \
                _("Network configuration is unavailable for Flash install")
        if self.routing:
            static_ifaces = self.Select('os_install_net_interfaces',
                                        where='os_install_net_status',
                                        _notin=('off', 'dhcp'), limit=1)
            if not static_ifaces:
                return _("Network routing configuration is not available "
                         "if all "
                         "interfaces are set to DHCP")
        if self.Get('cl_network_migrate_set') == 'on':
            return _("Network settings unavailable with use settings migration")
        return ""


class VariableClNetworkConfigureSet(Variable):
    """
    Whether the network should be configured by templates
    """
    type = "bool"
    opt = ["--network"]

    def init(self):
        self.label = _("Reconfigure network")
        self.help = _("reconfigure network")

    def get(self):
        """Return "off" when settings migration is enabled, else "on"."""
        return "off" if self.GetBool("cl_network_migrate_set") else "on"


class VariableClNetworkMigrateSet(Variable):
    """
    Use migration to transfer the network settings
    """
    type = "bool"
    element = "radio"

    def init(self):
        self.label = _("Network")
        self.help = _("use the network migration")

    def choice(self):
        return [("on", _("Migrate network settings")),
                ("off", _("Manually network configuration"))]

    def get(self):
        """
        Return "off" as soon as any manual network variable was set from
        the console; return "on" only when none of them was.
        """
        manual_vars = ("os_install_net_conf",
                       "os_install_net_data",
                       "os_install_net_fqdn",
                       "os_install_ntp",
                       "os_install_net_dns",
                       "os_install_net_dns_search",
                       "os_install_net_route_data")
        for manvar in manual_vars:
            if self.is_console_set(manvar):
                return "off"
        return "on"


class VariableOsInstallNtp(NetHelper, Variable):
    """
    NTP server for system
    """
    opt = ['--ntp']
    metavalue = "NTP"
    value = "ntp0.zenon.net"

    def init(self):
        self.label = _("NTP server")
        self.help = _("set the NTP server for the system")


class VariableOsInstallProxy(NetHelper, Variable):
    """
    Proxy for system
    """
    value = ""


class VariableOsInstallNetType(NetHelper, ReadonlyVariable):
    """
    Network device type: currently "eth" or "wlan"
    """
    type = "list"

    def _getType(self, iface):
        """Return "wlan" for a wireless interface, "eth" otherwise."""
        return "wlan" if ip.isWireless(iface) else "eth"

    def get(self):
        """Return the device type of every install interface, in order."""
        return [self._getType(iface)
                for iface in self.Get('os_install_net_interfaces')]


class VariableOsInstallNetInterfaces(NetHelper, ReadonlyVariable):
    """
    Net interface devices
    """
    type = "list"

    def init(self):
        self.label = _("Interface")

    def get(self):
        """Return the interface names reported by getInterfaces(), sorted."""
        return sorted(getInterfaces())


class VariableOsInstallNetInterfacesOrig(NetHelper, ReadonlyVariable):
    """
    Net interface devices orig name from udev (enp0s2)

    Deprecated
    """
    type = "list"

    def get(self):
        """Return the same list as os_install_net_interfaces."""
        return self.Get('os_install_net_interfaces')


class VariableOsNetInterfacesInfo(NetHelper, ReadonlyVariable):
    """
    Information about net interfaces
    """

    def get(self):
        """
        Return a comma-separated summary such as "eth0 (DHCP), eth1 (Off)".
        """
        # NOTE(review): the value is discarded; presumably read only to
        # register a dependency on os_net_interfaces - confirm.
        self.Get("os_net_interfaces")
        described = []
        # Show "DHCP" when the address is obtained via DHCP, otherwise the
        # static IP, or "Off" when the interface has no address.
        for interface, ipaddr, dhcp in zip(
                self.Get('os_install_net_interfaces'),
                self.Get('os_install_net_ip'),
                self.Get('os_install_net_dhcp_set')):
            if dhcp == "on":
                state = _("DHCP")
            else:
                state = ipaddr if ipaddr else _("Off")
            described.append("%s (%s)" % (interface, state))
        return ", ".join(described)


class VariableOsInstallNetData(NetHelper, TableVariable):
    """
    Hash for information about net
    """
    opt = ["--iface"]
    metavalue = "IFACE_SETTINGS"
    source = ["os_install_net_interfaces",
              "os_install_net_status",
              "os_install_net_mask",
              "os_install_net_name",
              "os_install_net_mac"]

    def init(self):
        def defaultInterface():
            """First detected interface, or a placeholder name if none."""
            ifaces = getInterfaces()
            return ifaces[0] if ifaces else "enp0s0"

        self.label = _("Addresses")
        self.help = _("Network interface, DHCP or IP address and network mask "
                      "(example: %s)") % (" --iface %s:192.168.1.1:24" %
                                          defaultInterface())

    def raiseReadonlyIndexError(self, fieldname="", variablename="", value=""):
        """
        Behavior on change readonly index

        Raises VariableError naming the interface that was not found.
        """
        raise VariableError(_("Network interface %s not found") % value)


class VariableOsInstallNetHostname(NetHelper, Variable):
    """
    Computer hostname
    """

    def get(self):
        """Return the part of the FQDN before the first dot."""
        return self.Get('os_install_net_fqdn').partition('.')[0]


class VariableOsInstallNetFqdn(NetHelper, Variable):
    """
    Full host name
    """
    opt = ['--hostname']
    metavalue = "HOSTNAME"

    def init(self):
        self.label = _("Hostname")
        self.help = _("set either the short or the full hostname")

    def set(self, value):
        """Append the install domain when a short hostname is given."""
        if "." in value:
            return value
        return "%s.%s" % (value, self.Get('os_install_net_domain'))

    def check(self, value):
        """Reject hostnames containing spaces or longer than 254 chars."""
        maxfqdn = 254
        if " " in value:
            raise VariableError(_("Wrong hostname"))
        if len(value) > maxfqdn:
            raise VariableError(
                _("The hostname length should be less than %d") % maxfqdn)

    def get(self):
        """
        Return "calculate.local" when stdout is /dev/console and the root
        device is NFS; otherwise return os_net_fqdn.
        """
        stdout_link = '/proc/self/fd/1'
        if path.exists(stdout_link) and \
                readlink(stdout_link) == '/dev/console' and \
                self.Get('os_root_dev') == '/dev/nfs':
            return "calculate.local"
        return self.Get('os_net_fqdn')


class VariableOsInstallNetDomain(NetHelper, Variable):
    """
    Domain on install system
    """

    def get(self):
        """Return the part of the FQDN after the first dot."""
        return self.Get('os_install_net_fqdn').partition('.')[2]


class VariableOsInstallNetAllow(NetHelper, Variable):
    """
    Allowed network
    """

    def get(self):
        """Allowed network"""
        return self.Get("os_net_allow")


class VariableOsInstallNetName(NetHelper, ReadonlyVariable):
    """
    Net device names
    """
    type = "list"

    def init(self):
        self.label = _("Name")

    def get(self):
        """
        Return "<vendor> <name>" for every interface, looked up in the
        lspci output by the PCI address found in the sysfs path.
        """
        # PCI address (e.g. 00:19.0) followed by exactly two trailing
        # path components in the resolved sysfs path.
        rePci = re.compile(r"(\d\d:\d\d\.\d)(?:/[^/]+){2}$")

        def getPci(interface):
            """Return the PCI address of the interface, or ""."""
            pathname = path.realpath(
                device.sysfs.syspath(device.sysfs.Path.ClassNet, interface))
            found = rePci.search(pathname)
            return found.group(1) if found else ""

        pciEthernet = lspci(shortInfo=True)
        # The fallback entry renders as "Unknown vendor"
        unknown = {'vendor': _("Unknown"), 'name': _("vendor")}
        names = []
        for iface in self.Get('os_install_net_interfaces'):
            info = pciEthernet.get(getPci(iface), unknown)
            names.append("{vendor} {name}".format(**info))
        return names


class VariableOsInstallNetMacType(NetHelper, ReadonlyVariable):
    """
    Net devices mac (Example: local/OUI)
    """
    type = "list"
    # Matches a MAC whose second hex digit is one of 2, 3, 6, 7, a, b, e, f
    reLocal = re.compile("^.[2367abef]:.*$", re.I)

    def init(self):
        self.label = _("Mac type")

    def _mactype(self, mac):
        """Return "" for an empty MAC, else "local" or "OUI"."""
        if not mac:
            return ""
        return "local" if self.reLocal.match(mac) else "OUI"

    def get(self):
        return [self._mactype(mac) for mac in self.Get('os_install_net_mac')]


class VariableOsInstallNetMac(NetHelper, ReadonlyVariable):
    """
    Net devices mac (Example: 01:02:03:04:05:06)
    """
    type = "list"

    def init(self):
        self.label = _("MAC")

    def get(self):
        """Return the lower-cased MAC of every install interface."""
        return [getMac(iface).lower()
                for iface in self.Get('os_install_net_interfaces')]


class VariableOsInstallNetStatus(NetHelper, Variable):
    """
    Net status (dhcp,ip,or off)
    """
    type = "choiceedit-list"

    def init(self):
        self.label = _("IP address")

    def get(self):
        return [self.getDefaultValue(iface)
                for iface in self.Get('os_install_net_interfaces')]

    def getDefaultValue(self, iface):
        """
        Detect the current status of an interface.

        Returns "off", "dhcp" or the interface IP address.
        """

        def statusValue(ipaddr, dhcp):
            if not getPlugged(iface):
                return 'off'
            if isSlaveInterface(iface):
                return 'off'
            if dhcp == "on":
                return "dhcp"
            if ipaddr:
                return ipaddr
            # no address: a downed interface is off, otherwise use DHCP
            if getOperState(iface) == 'down':
                return "off"
            return "dhcp"

        rootDevNfs = self.Get('os_root_dev') == '/dev/nfs'
        # An NFS root is always treated as DHCP-configured
        return statusValue(getIp(iface), "on" \
            if rootDevNfs or isDhcpIp(iface) else "off")

    def set(self, value):
        """
        Lower-case the values and replace "auto" with the detected status
        of the interface at the same position.
        """
        lowered = (x.lower() if x else x for x in value)
        ifaces = self.Get('os_install_net_interfaces')
        return [self.getDefaultValue(iface) if status == "auto" else status
                for status, iface in zip(lowered, ifaces)]

    def check(self, value):
        """Every status must be a known keyword or a valid IP address."""
        for status in value:
            if status not in (x[0] for x in self.choice()) and \
                    not checkIp(status):
                raise VariableError(_("Wrong IP address %s") % status)

    def choice(self):
        return (("dhcp", _("DHCP")),
                ("off", _("Disabled")),
                ("auto", _("Auto")))


class VariableOsInstallNetIp(NetHelper, ReadonlyVariable):
    """
    IP for all network interfaces
    """
    type = "list"

    def init(self):
        self.label = _("IP address")

    def get(self):
        """
        Return "" for disabled interfaces, the current address for DHCP
        interfaces and the configured status (a static IP) otherwise.
        """

        def address(iface, status):
            lowered = status.lower()
            if lowered == "off":
                return ""
            if lowered == "dhcp":
                return getIp(iface)
            return status

        return [address(iface, status) for iface, status
                in zip(self.Get('os_install_net_interfaces'),
                       self.Get('os_install_net_status'))]


class VariableOsInstallNetNetwork(NetHelper, ReadonlyVariable):
    """
    Network for ip (Example:192.168.0.0/16)
    """
    type = "list"

    def init(self):
        self.label = _("Network")

    def get(self):
        """Return the network of each interface, "" if IP or mask is empty."""
        return [getIpNet(ipaddr, mask) if ipaddr and mask else ""
                for ipaddr, mask
                in zip(self.Get('os_install_net_ip'),
                       self.Get('os_install_net_mask'))]


class VariableOsInstallNetCidr(NetHelper, ReadonlyVariable):
    """
    CIDR of interfaces
    """
    type = "list"

    def init(self):
        self.label = _("CIDR")

    def get(self):
        """
        Get CIDR of ip,net (Example: 24)
        """
        return [maskToCidr(mask) if mask else ''
                for mask in self.Get('os_install_net_mask')]


class VariableOsInstallNetMask(NetHelper, Variable):
    """
    Net mask of interfaces (Example:255.255.0.0)
    """
    type = "choiceedit-list"

    def init(self):
        self.label = _("Mask")

    def get(self):
        return [cidrToMask(getMask(iface))
                for iface in self.Get('os_install_net_interfaces')]

    def set(self, value):
        """
        Convert to mask CIDR value

        Numeric values in 0..32 are converted to dotted masks; anything
        else is passed through unchanged for check() to validate.
        """

        def convertCidrToMask(x):
            if x and x.isdigit() and int(x) in range(0, 33):
                return cidrToMask(int(x))
            return x

        return [convertCidrToMask(x) for x in value]

    def check(self, value):
        """
        Validate masks. An empty mask is accepted only for interfaces
        whose status is "off" or "dhcp".
        """
        statuses = self.Get('os_install_net_status')
        wrongMask = [mask for mask, status in zip(value, statuses)
                     if (mask or status not in ("off", "dhcp")) and
                     not checkMask(mask)]
        if wrongMask:
            raise VariableError(_("Wrong mask %s") % wrongMask[0])

    def choice(self):
        return ["255.255.255.255",
                "255.255.255.0",
                "255.255.0.0",
                "255.0.0.0",
                "0.0.0.0"]


class VariableOsInstallNetDhcpSet(NetHelper, Variable):
    """
    Describe ip was get by DHCP or manually
    """
    type = "boolauto-list"

    def init(self):
        self.label = _("DHCP")

    def get(self):
        return ["on" if status == "dhcp" else "off"
                for status in self.Get('os_install_net_status')]


class VariableOsInstallNetRouteData(NetHelper, TableVariable):
    """
    Route table data
    """
    opt = ["--route"]
    metavalue = "NETROUTE"
    source = ['os_install_net_route_network',
              'os_install_net_route_gw',
              'os_install_net_route_dev',
              'os_install_net_route_src']
    routing = True

    def humanReadable(self):
        return self.Get()

    def init(self):
        self.label = _("Routing")
        self.help = \
            _("add a routing rule (specified as "
              "NETWORK:GATEWAY[:DEV[:SOURCE]])")

    def get(self, hr=HumanReadable.No):
        """
        Routing hash

        Returns rows of [network, gateway, device, source] taken from the
        route table of the statically configured interfaces, skipping the
        interfaces' own networks; [[]] when there is nothing to report.
        """
        interfaces = self.Get('os_install_net_interfaces')
        interfaces_status = self.Get('os_install_net_status')
        interfaces_network = self.Get('os_install_net_network')
        # (interface, network) pairs of interfaces with a static address
        staticInterface = [(iface, network) for iface, status, network
                           in zip(interfaces, interfaces_status,
                                  interfaces_network)
                           if status not in ("off", "dhcp")]
        if staticInterface:
            staticInterface, skipNet = list(zip(*staticInterface))
            return [[x[0],
                     x[1].get('via', ''),
                     x[1].get('dev', ''),
                     x[1].get('src', '')]
                    for x in ip.getRouteTable(staticInterface)
                    if x[0] not in skipNet] or [[]]
        return [[]]

    def getHumanReadableAuto(self):
        return Variable.getHumanReadableAuto(self)

    def setValue(self, value, force=False):
        """
        Standard action for set value

        With exactly one interface present, the device column of every
        non-empty row is replaced by that interface.
        """
        interfaces = self.Get('os_install_net_interfaces')
        if len(interfaces) == 1:
            def wrapper(row):
                if not row:
                    return row
                return [row[0], row[1], interfaces[0], row[3]]
        else:
            def wrapper(row):
                return row
        self.value = self.set([wrapper(x) for x in value])
        self.wasSet = True
        self.invalid = False
        # run check
        if not force:
            self._check()


class VariableOsInstallNetRouteNetwork(NetHelper, FieldValue, Variable):
    """
    Net for route table record
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 0

    def init(self):
        self.label = _("Network")

    def choice(self):
        return ["default"]

    def check(self, value):
        """
        Reject malformed networks (other than "default") and networks
        that are listed more than once.
        """
        for wrongnet in filterfalse(ip.checkNet,
                                    filter("default".__ne__, value)):
            raise VariableError(_("Wrong network %s") % wrongnet)
        # Keep input order so the reported duplicate is deterministic
        dupNetwork = [x for x in value if value.count(x) > 1]
        if dupNetwork:
            raise VariableError(
                _("Network '%s' is used more than once") % dupNetwork[0])


class VariableOsInstallNetRouteGw(NetHelper, FieldValue, Variable):
    """
    Gateway for route table record
    """
    source_variable = "os_install_net_route_data"
    column = 1

    def init(self):
        self.label = _("Gateway")

    def check(self, value):
        """
        Search unreachable gateways.

        A gateway is accepted when it lies in an interface network or in
        the network of another route; the route's own destination network
        does not count. Raises VariableError for malformed or unreachable
        gateways.
        """
        route_nets = self.Get('os_install_net_route_network')
        iface_nets = self.Get('os_install_net_network')

        for wrongip in filterfalse(ip.checkIp, value):
            raise VariableError(_("Wrong gateway IP %s") % wrongip)

        def usable(nets):
            """Drop empty entries and the "default" pseudo-network."""
            return {x for x in nets if x and x != "default"}

        usable_route_nets = usable(route_nets)
        usable_iface_nets = usable(iface_nets)

        wrongGws = []
        for net, gw in zip(route_nets, value):
            if not gw:
                continue
            # Exclude the route's own network as a whole string (a set
            # built from the string itself would only hold characters),
            # but keep interface networks even if they are equal to it.
            reachable = (usable_route_nets - {net}) | usable_iface_nets
            if not ip.isIpInNet(gw, *reachable):
                wrongGws.append(gw)
        if wrongGws:
            raise VariableError(_("Gateways %s are unreachable") %
                                (",".join(wrongGws)))


class VariableOsInstallNetRouteDev(NetHelper, FieldValue, Variable):
    """
    Device for route table record
    """
    type = "choice-list"
    source_variable = "os_install_net_route_data"
    column = 2

    def init(self):
        self.label = _("Interface")

    def choice(self):
        return self.Get('os_install_net_interfaces')


class VariableOsInstallNetRouteSrc(NetHelper, FieldValue, Variable):
    """
    Source ip for route table record
    """
    type = "choiceedit-list"
    source_variable = "os_install_net_route_data"
    column = 3

    def init(self):
        self.label = _("Source IP")

    def choice(self):
        return [""] + self.Get('os_install_net_ip')

    def check(self, value):
        """
        Non-empty source addresses must be valid IPs and must belong to
        one of the install interfaces.
        """
        for wrongip in filterfalse(ip.checkIp, filter(None, value)):
            raise VariableError(_("Wrong source IP %s") % wrongip)
        ipAddrs = self.Get('os_install_net_ip')
        wrongIps = [x for x in value if x and x not in ipAddrs]
        if wrongIps:
            raise VariableError(
                _("Wrong IP address %s in the specified source IP") %
                (",".join(wrongIps)))


class VariableOsInstallNetRoute(NetHelper, ReadonlyVariable):
    """
    Data by route for conf.d/net
    """

    def performRouteData(self, performFunc):
        """
        Call performFunc(interface, network, routeMatrix) for every
        install interface and return the list of results.

        network is computed from the interface IP and CIDR, or "" when
        either is empty; routeMatrix holds (network, gateway, device,
        source) tuples of the route table.
        """
        routeMatrix = list(zip(self.Get('os_install_net_route_network'),
                               self.Get('os_install_net_route_gw'),
                               self.Get('os_install_net_route_dev'),
                               self.Get('os_install_net_route_src')))
        # ip and mask to ip/net
        iface_nets = [(iface, ip.getIpNet(ipaddr, cidr=cidr))
                      if ipaddr and cidr else (iface, "")
                      for iface, ipaddr, cidr
                      in zip(self.Get('os_install_net_interfaces'),
                             self.Get('os_install_net_ip'),
                             self.Get('os_install_net_cidr'))]
        return [performFunc(iface, net, routeMatrix)
                for iface, net in iface_nets]

    def get(self):
        """Route info for conf.d/net"""
        # With exactly one working interface every route is attributed to it
        defaultDev = 0
        workIfaces = self.Select('os_install_net_interfaces',
                                 where='os_install_net_status',
                                 _notin="off")
        if len(workIfaces) == 1:
            defaultDev = workIfaces[0]

        def getRouteForInterfaceConf(interface, net, routeMatrix):
            NET, GW, DEV, SRC = 0, 1, 2, 3
            seen_nets = set()
            route_strs = []
            for route in routeMatrix:
                # filter by interface and discard direct routes
                # example: for 192.168.1.5/24 discard 192.168.1.0/24 net
                if not ((interface == route[DEV] or defaultDev and
                         interface == defaultDev) and net != route[NET]):
                    continue
                # only the first route for each network is used
                if route[NET] in seen_nets:
                    continue
                seen_nets.add(route[NET])
                # build string for route from net,gateway,dev and src
                route_strs.append("{net}{gateway}{src}".format(
                    net=route[NET],
                    gateway=" via %s" % route[GW] if route[GW] else "",
                    src=" src %s" % route[SRC] if route[SRC] else ""))
            return "\n".join(route_strs)

        return self.performRouteData(getRouteForInterfaceConf)


class VariableOsInstallNetNmroute(VariableOsInstallNetRoute):
    """
    Data by route for NetworkManager
    """
    mode = READONLY

    def get(self):
        """Route info for system-connections of NetworkManager"""
        # With exactly one working interface every route is attributed to it
        defaultDev = 0
        workIfaces = self.Select('os_install_net_interfaces',
                                 where='os_install_net_status',
                                 _notin="off")
        if len(workIfaces) == 1:
            defaultDev = workIfaces[0]

        def getRouteForInterfaceNM(interface, net, routeMatrix):
            NET, GW, DEV = 0, 1, 2
            # First line: "<gateway>;" of the interface default route,
            # or an empty line when there is none
            defaultGw = ["%s;" % x[GW] for x in routeMatrix
                         if interface == x[DEV] and x[NET] == "default"]
            # filter by interface and discard direct routes
            # example: for 192.168.1.5/24 discard 192.168.1.0/24 net
            routes = [x for x in routeMatrix
                      if (interface == x[DEV] or defaultDev and
                          interface == defaultDev) and
                      net != x[NET] and x[NET] != "default"]
            # build string for route from net,gateway,dev and src
            route_strs = []
            for num, route in enumerate(routes, 1):
                addr, _sep, cidr = route[NET].partition('/')
                route_strs.append(
                    "routes{num}={ip};{cidr};{gateway};0;".format(
                        num=num, ip=addr, cidr=cidr,
                        gateway=route[GW] if route[GW] else "0.0.0.0"))
            return "{0}\n".format(defaultGw[0] if defaultGw else "") + \
                   "\n".join(route_strs)

        return self.performRouteData(getRouteForInterfaceNM)


class VariableOsInstallNetConfAvailable(NetHelper, Variable):
    """
    Available net configuration
    """
    type = "list"

    def get(self):
        """
        Return (id, label) pairs of the network managers usable in the
        image; only the OpenRC entry when the image cannot be inspected.
        """
        # (id, package required in the image or '' if none, label)
        mapNetConf = (('networkmanager', 'net-misc/networkmanager',
                       _("NetworkManager")),
                      ('openrc', '', _('OpenRC')))
        image = self.Get('cl_image')
        if image:
            with image:
                try:
                    distrPath = image.getDirectory()
                    return [itemgetter(0, 2)(x) for x in mapNetConf
                            if not x[1] or isPkgInstalled(x[1],
                                                          prefix=distrPath)]
                except DistributiveError:
                    # best effort: fall back to the last entry below
                    pass
        return sorted(map(itemgetter(0, 2), mapNetConf[-1:]),
                      key=itemgetter(1))


class VariableOsInstallNetConf(NetHelper, Variable):
    """
    Net setup (networkmanager or openrc)
    """
    type = "choice"
    opt = ["--netconf"]
    metavalue = "NETMANAGER"

    def init(self):
        self.label = _("Network manager")
        self.help = _("network manager")

    def get(self):
        """Net setup (networkmanager or openrc)"""
        services = (listDirectory('/etc/runlevels/boot') +
                    listDirectory('/etc/runlevels/default'))
        if [x for x in services if x.lower() == "networkmanager"] \
                or self.Get('os_root_type') == "livecd":
            nm = "networkmanager"
        else:
            nm = ""
        for val, comment in self.Get('os_install_net_conf_available'):
            # NetworkManager is not selected for a livecd install
            # running from an NFS root
            if nm == val and not (
                    self.Get('os_root_dev') == '/dev/nfs' and
                    self.Get('os_install_root_type') == "livecd"):
                return nm
        return "openrc"

    def choice(self):
        return self.Get('os_install_net_conf_available')


class VariableOsInstallNetDnsSearch(NetHelper, Variable):
    """
    Dns search
    """
    opt = ["--domain-search"]
    metavalue = "DOMAINS"

    def init(self):
        self.label = _("Search domains")
        self.help = _("search domains (comma-separated)")

    def isDNSByDHCP(self):
        """
        If first interface get ip by DHCP dns must be DHCP
        """
        statuses = self.Get('os_install_net_status')
        return bool(statuses) and statuses[0] == "dhcp"

    def set(self, value):
        """
        Normalize a list separated by ";", "," or spaces to single
        spaces. Empty items (e.g. from ", ") are dropped so the result
        never contains doubled spaces.
        """
        return " ".join(filter(None, re.split('[; ,]', value)))

    def get(self):
        """Get current name servers"""
        dnsSearch = " ".join(
            line.strip().partition("search")[2].strip()
            for line in readLinesFile('/etc/resolv.conf')
            if line.lstrip().startswith("search"))
        return "" if self.isDNSByDHCP() else dnsSearch

    def humanReadable(self):
        return self.Get() or (_("Get via DHCP")
                              if self.isDNSByDHCP()
                              else _("Not used"))


class VariableOsInstallNetDns(VariableOsInstallNetDnsSearch):
    """
    Dns servers
    """
    opt = ["--dns"]
    metavalue = "DNS"

    def init(self):
        self.label = _("Domain name server")
        self.help = _("domain name server (comma-separated)")

    def set(self, value):
        """
        Normalize a list separated by ";", "," or spaces to single
        spaces. Empty items (e.g. from ", ") are dropped so the result
        never contains doubled spaces.
        """
        return " ".join(filter(None, re.split('[; ,]', value)))

    def get(self):
        """
        Return the valid nameserver addresses from /etc/resolv.conf
        joined by spaces, or "" when DNS is obtained via DHCP.
        """
        candidates = (line.strip().partition("nameserver")[2].strip()
                      for line in readLinesFile('/etc/resolv.conf')
                      if line.lstrip().startswith("nameserver"))
        dnsIps = (x for x in candidates if ip.checkIp(x))
        return "" if self.isDNSByDHCP() else " ".join(dnsIps)

    def check(self, value):
        """Raise VariableError if a non-empty item is not an IP address."""
        reIp = re.compile(ip.IP_ADDR)
        # any() ignores empty items, so an empty value is accepted
        if any(filterfalse(reIp.match, value.split(' '))):
            raise VariableError(_("Wrong IP address for DNS"))

    def humanReadable(self):
        return self.Get() or (_("Get via DHCP")
                              if self.isDNSByDHCP()
                              else _("Not used"))


class VariableOsInstallNetSettings(NetHelper, Variable):
    """
    Net service configured
    """
    type = "choice"
    value = ""

    def choice(self):
        return [("", "")] + self.Get('os_install_net_conf_available')


class VariableOsInstallPxeIp(Variable):
    """
    IP address of the PXE server
    """
    type = "choice"
    opt = ["--ip"]
    metavalue = "IP"

    def init(self):
        self.label = _("PXE server IP")
        # passed through _() like every other help string in this module
        self.help = _("set IP address for PXE server")

    def get(self):
        """Return the first non-empty address of os_net_ip, or ""."""
        ips = self.Get('os_net_ip').split(',')
        return next(filter(None, ips), "")

    def choice(self):
        ips = self.Get('os_net_ip').split(',')
        return [x for x in ips if x]