You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/ip.py

507 lines
15 KiB

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from files import (process, checkUtils, readFile, listDirectory,
getRunCommands, getProgPath)
import sys
import os
import re
import struct
import fcntl
import socket
import math
import ctypes
from os import path
import select
import time
from calculate.lib.cl_lang import setLocalTranslate
# Identity fallback for the translation function "_" that wraps the
# user-visible messages in this module.
_ = lambda x: x
# Hand this module object to the project's translation setup under the
# 'cl_lib3' domain; presumably this rebinds "_" to the real translator --
# TODO confirm against calculate.lib.cl_lang.
setLocalTranslate('cl_lib3', sys.modules[__name__])
# Filesystem locations used to inspect network interfaces
SYSFS_NET_PATH = "/sys/class/net"
PROCFS_NET_PATH = "/proc/net/dev"

# ioctl request codes (values from linux/sockios.h), listed in numeric order
SIOCGIFFLAGS = 0x8913
SIOCSIFFLAGS = 0x8914
SIOCGIFADDR = 0x8915
SIOCSIFADDR = 0x8916
SIOCGIFNETMASK = 0x891B
SIOCSIFNETMASK = 0x891C
SIOCSIFHWADDR = 0x8924
SIOCGIFHWADDR = 0x8927
SIOCGIFINDEX = 0x8933
SIOCETHTOOL = 0x8946

# Interface flag bits; checkFlag() tests them against the SIOCGIFFLAGS reply
IFF_RUNNING = 1 << 6
IFF_MASTER = 1 << 10
IFF_SLAVE = 1 << 11
# Template for one decimal octet. The first %s is the lowest allowed single
# digit (0 or 1), the second %s is the last digit of the upper bound (25x),
# so IP_DIG % (1, 4) covers 1-254 and IP_DIG % (0, 5) covers 0-255.
IP_DIG = "[%s-9]|(?:1[0-9]|[1-9])[0-9]|2[0-4][0-9]|25[0-%s]"
# Prefix length 0-32. This is a bare alternation: wrap it in a group before
# attaching anchors or neighbouring tokens.
IP_NET_SUFFIX = "[0-9]|[12][0-9]|3[012]"
# Octet patterns for the ranges 1-254, 0-254 and 0-255
IP_DIGS = {'dig1_254': IP_DIG % (1, 4), 'dig0_254': IP_DIG % (0, 4),
           'dig0_255': IP_DIG % (0, 5), }
# Host address, e.g. 10.0.0.12 (first and last octet 1-254)
IP_ADDR = r"(%(dig1_254)s)\.(%(dig0_254)s)\.(%(dig0_254)s)\.(%(dig1_254)s)" % \
          IP_DIGS
# Netmask-shaped dotted quad (every octet 0-255)
IP_MASK = r"(%(dig0_255)s)\.(%(dig0_255)s)\.(%(dig0_255)s)\.(%(dig0_255)s)" % \
          IP_DIGS
# Network address, e.g. 10.0.0.0 (last octet may be 0)
IP_NET = r"(%(dig1_254)s)\.(%(dig0_254)s)\.(%(dig0_254)s)\.(%(dig0_254)s)" % \
         IP_DIGS
# Network with prefix length, e.g. 192.168.0.0/16
IP_ADDR_NET = "(%(ipaddr)s)/((%(ipnet)s))" % {'ipaddr': IP_NET,
                                              'ipnet': IP_NET_SUFFIX}
reIp = re.compile("^{0}$".format(IP_ADDR))
# Group the alternation so that "^" and "$" apply to every alternative;
# without the group "^a|b|c$" accepted strings such as "99" or "1abc".
reNetSuffix = re.compile("^(?:{0})$".format(IP_NET_SUFFIX))
reNet = re.compile("^{0}$".format(IP_ADDR_NET))
reMask = re.compile("^{0}$".format(IP_MASK))
def checkIp(ip):
    """Match ip against the host-address pattern reIp.

    Returns the match object when ip is a dotted quad accepted by reIp,
    otherwise None.
    """
    matched = reIp.match(ip)
    return matched
def checkNetSuffix(netSuffix):
    """Match netSuffix (a prefix length given as text) against reNetSuffix.

    Returns the match object on success, otherwise None.
    """
    matched = reNetSuffix.match(netSuffix)
    return matched
def checkNet(net):
    """Validate a network written as "a.b.c.d/prefix".

    Returns False when net does not match reNet. Otherwise returns True only
    if the address has no bits set outside the prefix mask, i.e. it really
    is the network address for that prefix.
    """
    if reNet.match(net) is None:
        return False
    address, _sep, suffix = net.partition('/')
    mask_num = strIpToIntIp(cidrToMask(int(suffix)))
    addr_num = strIpToIntIp(address)
    return addr_num & mask_num == addr_num
# Octet values made only of leading one-bits: "128", "192", ..., "255".
# Built as a real list so membership tests can be repeated on any Python
# version (a lazy map object would be exhausted by the first test).
maskDigs = [str(x) for x in
            (0b10000000, 0b11000000, 0b11100000, 0b11110000,
             0b11111000, 0b11111100, 0b11111110, 0b11111111)]


def checkMask(mask):
    """Check that mask is a valid dotted-quad netmask.

    A valid mask has exactly four octets forming a contiguous run of
    one-bits: zero or more "255" octets, then at most one partial octet
    (a value from maskDigs or "0"), then only "0" octets.

    Returns True for a valid mask, False otherwise (including empty input).
    """
    if not mask or mask.count('.') != 3:
        return False
    # Becomes True after the first octet that is not "255"; from then on
    # every remaining octet has to be "0" for the mask to stay contiguous.
    only_zero_left = False
    for dig in mask.split('.'):
        if only_zero_left:
            if dig != "0":
                return False
        elif dig != "255":
            if dig != "0" and dig not in maskDigs:
                return False
            only_zero_left = True
    return True
def getIpAndMask(interface="eth0"):
    """Read the address and mask of interface from /sbin/ifconfig output.

    Returns the two captured strings (address, mask) when the output
    contains "inet addr:... Mask:...", otherwise a pair of empty strings.
    """
    output = process('/sbin/ifconfig', interface).read()
    found = re.search(r"inet addr:(\S+)\s.*Mask:(\S+)", output, re.S)
    if found is None:
        return "", ""
    return found.groups()
def strIpToIntIp(addr):
    """Pack a dotted-quad string into one integer, first octet highest.

    Only the first four dot-separated fields are used; each one is parsed
    with int().
    """
    octets = addr.split('.')
    high = int(octets[0]) << 24
    upper = int(octets[1]) << 16
    lower = int(octets[2]) << 8
    low = int(octets[3])
    return high | upper | lower | low
def intIpToStrIp(addr):
    """Render an integer address as dotted-quad text.

    The three low octets are masked to 8 bits; the top field is whatever
    remains after shifting addr right by 24 bits.
    """
    octets = (addr >> 24,
              (addr >> 16) & 0xff,
              (addr >> 8) & 0xff,
              addr & 0xff)
    return "{0}.{1}.{2}.{3}".format(*octets)
def numMaskToCidr(netmask):
    """
    Convert an integer netmask to its prefix length (CIDR).

    The mask is inverted within 32 bits; the prefix length is 32 minus the
    number of bits needed to represent the inverted value. An all-ones mask
    gives 32, a zero mask gives 0.
    """
    neg_net = ~netmask & 0xFFFFFFFF
    if not neg_net:
        return 32
    # bit_length() is exact integer arithmetic, unlike a floating-point
    # logarithm which can misround near powers of two.
    return 32 - neg_net.bit_length()
def maskToCidr(mask):
    """Convert a dotted-quad netmask string to its prefix length."""
    return numMaskToCidr(strIpToIntIp(mask))
def cidrToMask(cidr):
    """Convert a prefix length to a dotted-quad netmask string."""
    # cidr one-bits, then shifted up past the host part of the address
    ones = 2 ** cidr - 1
    host_bits = 32 - cidr
    return intIpToStrIp(ones << host_bits)
def getIpNet(ip, mask=None, cidr=None):
    """Build "network/prefix" text for an address.

    The prefix comes from mask (dotted quad) when it is given, otherwise
    from cidr. The address is ANDed with the mask to obtain the network.
    """
    ip_num = strIpToIntIp(ip)
    if mask is None:
        prefix = int(cidr)
        mask = cidrToMask(prefix)
    else:
        prefix = maskToCidr(mask)
    network = intIpToStrIp(ip_num & strIpToIntIp(mask))
    return "{ip}/{net}".format(ip=network, net=prefix)
def isIpInNet(checkip, *ipnets):
    """Return the nets from ipnets that contain checkip.

    checkip is a dotted-quad string; every item of ipnets is written as
    "a.b.c.d/prefix". The result is a list of the matching items in their
    original order (empty when nothing matches), so it can be used directly
    in a boolean test.
    """
    if not ipnets:
        return []
    # Parse the checked address once instead of once per net.
    ip_num = strIpToIntIp(checkip)
    matched = []
    for ipnet in ipnets:
        net_ip, _sep, cidr = ipnet.partition('/')
        mask = strIpToIntIp(cidrToMask(int(cidr)))
        if ip_num & mask == strIpToIntIp(net_ip) & mask:
            matched.append(ipnet)
    return matched
def isDhcpIp(interface="eth0"):
    """Tell whether interface gets its address by DHCP (True) or not (False).

    True is returned when a running command line mentions both the
    interface and one of the known DHCP clients. Otherwise, when a
    "dhcpcd -q" command is running, the lease reported by "dhcpcd -U" is
    compared with the current address: True when the interface has no
    address or the first leased address equals it.
    """
    # dhclients (dhcpcd, dhclient (dhcp), udhcpc (busybox)
    commands = getRunCommands()
    dhcpProgs = ("dhcpcd", "dhclient", "udhcpc")
    # any() gives a real boolean on every Python version, whereas the
    # truthiness of filter() is only meaningful when it returns a list
    if any(interface in cmd and any(prog in cmd for prog in dhcpProgs)
           for cmd in commands):
        return True
    # if the dhcpcd daemon is running (arguments appear NUL-separated in the
    # command lines -- presumably read from /proc, confirm in getRunCommands)
    if any("dhcpcd\x00-q" in cmd for cmd in commands):
        curIp = getIp(interface)
        dhcpcd = getProgPath('/sbin/dhcpcd')
        leaseRe = re.compile('^ip_address=(.*)$')
        found = (leaseRe.search(line)
                 for line in process(dhcpcd, '-U', interface))
        leaseIp = [match.group(1) for match in found if match]
        if not curIp or (leaseIp and leaseIp[0] == curIp):
            return True
    return False
def getRouteTable(onlyIface=()):
    """Iterate over the routes printed by "ip route".

    onlyIface -- interface names; when not empty only the routes whose line
                 contains "dev <name>" for one of them are produced.

    Yields (network, params) pairs: network is the first word of the line
    and params is a dict built by pairing up the remaining words, e.g.
    {'via': value, 'dev': value}.
    """
    ipProg = checkUtils('/sbin/ip')
    routes = process(ipProg, "route")
    if onlyIface:
        # Escape the names and require the name to end at whitespace or end
        # of line, so that "eth0" does not also select "eth0.100" or "eth01".
        filterRe = re.compile("|".join(
            r"\bdev %s(?!\S)" % re.escape(iface) for iface in onlyIface))
        routes = [route for route in routes if filterRe.search(route)]
    for route in routes:
        network, _sep, rest = route.partition(" ")
        # split() already drops surrounding whitespace from every word
        routeParams = rest.split()
        # (network,{'via':value,'dev':value})
        if network:
            yield (network, dict(zip(routeParams[0::2], routeParams[1::2])))
def getInterfaces():
    """
    List interface names, sorted, keeping only the entries of SYSFS_NET_PATH
    that have a "device" entry underneath them
    """
    names = [name for name in listDirectory(SYSFS_NET_PATH)
             if path.exists(path.join(SYSFS_NET_PATH, name, "device"))]
    names.sort()
    return names
def getIp(iface):
    """
    Get the IPv4 address of the interface as dotted-quad text.

    Issues the SIOCGIFADDR ioctl on a temporary datagram socket; returns an
    empty string when the ioctl fails with IOError.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    request = struct.pack('16sH14s', iface, socket.AF_INET, '\x00' * 14)
    try:
        reply = fcntl.ioctl(sock, SIOCGIFADDR, request)
    except IOError:
        return ""
    else:
        # third unpacked field: the 4 packed address bytes
        packed_ip = struct.unpack('16sH2x4s8x', reply)[2]
        return socket.inet_ntoa(packed_ip)
    finally:
        sock.close()
def checkFlag(iface, flag):
    """
    Tell whether any bit of flag is set in the interface flags.

    The flags are read with the SIOCGIFFLAGS ioctl on a temporary datagram
    socket; False is returned when the ioctl fails with IOError.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    request = struct.pack('16sH14s', iface, socket.AF_INET, '\x00' * 14)
    try:
        reply = fcntl.ioctl(sock, SIOCGIFFLAGS, request)
    except IOError:
        return False
    else:
        # second unpacked field: the 16-bit flags word
        flags = struct.unpack('16sH2x4s8x', reply)[1]
        return bool(flags & flag)
    finally:
        sock.close()
def getPlugged(iface):
    """Tell whether the interface has the IFF_RUNNING flag set."""
    running = checkFlag(iface, IFF_RUNNING)
    return running
def isSlaveInterface(iface):
    """Tell whether the interface has the IFF_SLAVE flag set."""
    slave = checkFlag(iface, IFF_SLAVE)
    return slave
def isMasterInterface(iface):
    """Tell whether the interface has the IFF_MASTER flag set."""
    master = checkFlag(iface, IFF_MASTER)
    return master
def getMask(iface):
    """
    Get the netmask of the interface as a prefix length.

    The mask is read with the SIOCGIFNETMASK ioctl on a temporary datagram
    socket, converted from network to host byte order and passed to
    numMaskToCidr. Returns 0 when the ioctl fails with IOError.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    request = struct.pack('16sH14s', iface, socket.AF_INET, '\x00' * 14)
    try:
        reply = fcntl.ioctl(sock, SIOCGIFNETMASK, request)
    except IOError:
        return 0
    else:
        raw_mask = struct.unpack('16sH2xI8x', reply)[2]
        return numMaskToCidr(socket.ntohl(raw_mask))
    finally:
        sock.close()
def getMac(iface):
    """
    Get the hardware (MAC) address of the interface.

    Returns the address as six upper-case hex pairs joined with ":", or an
    empty string when the SIOCGIFHWADDR ioctl fails with IOError.
    """
    sockfd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The family field is filled with AF_UNIX exactly as before.
        # NOTE(review): the sibling getters use AF_INET here -- confirm
        # whether the difference is intentional.
        ifreq = struct.pack('16sH14s', iface, socket.AF_UNIX, '\x00' * 14)
        res = fcntl.ioctl(sockfd, SIOCGIFHWADDR, ifreq)
    except IOError:
        return ""
    finally:
        # Close on every path; previously the socket stayed open when the
        # ioctl failed.
        sockfd.close()
    address = struct.unpack('16sH14s', res)[2]
    # first 6 bytes of the payload are the address, the rest is skipped
    mac = struct.unpack('6B8x', address)
    return ":".join('%02X' % octet for octet in mac)
def getOperState(iface):
    """
    Get interface state: "down" when the operstate file content equals
    "down", otherwise "up"
    """
    # NOTE(review): the comparison is exact; if readFile returns the file
    # content with its trailing newline this never reports "down" -- confirm
    # what readFile returns.
    state = readFile("/sys/class/net/%s/operstate" % iface)
    return "down" if state == "down" else "up"
def isOpenPort(ip, port):
    """
    Test if an [ip:port] is open

    Tries a TCP connection to (ip, int(port)). Returns True when the
    connection is established, False on any error (including a port value
    that cannot be converted to an integer).
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((ip, int(port)))
        s.shutdown(socket.SHUT_RDWR)
        return True
    except Exception:
        return False
    finally:
        # release the descriptor on both the success and the failure path
        s.close()
class IPError(Exception):
    """Raised when an IP/network operation in this module fails"""
class Pinger(object):
    """Send one ICMP echo request over a raw socket and wait for the reply."""

    # ICMP parameters
    ICMP_ECHOREPLY = 0  # Echo reply (per RFC792)
    ICMP_ECHO = 8  # Echo request (per RFC792)
    ICMP_MAX_RECV = 2048  # Max size of incoming buffer

    @staticmethod
    def _error_text(error):
        """Return args[1] of a socket error when present, else the error."""
        return error.args[1] if len(error.args) > 1 else error

    def checksum(self, source_string):
        """
        Compute the 16-bit ones'-complement checksum of source_string
        (the functionality of in_cksum() from ping.c).

        The data is summed as big-endian 16-bit words; an odd trailing byte
        is padded with a zero byte. The returned integer is meant to be
        packed in network byte order ("!H"), and the result does not depend
        on the byte order of the host.
        """
        if not isinstance(source_string, (bytes, bytearray)):
            # text input: one character per byte, as ord() treated it
            source_string = source_string.encode("latin-1")
        # bytearray yields integers on every Python version and is a copy,
        # so padding it does not touch the caller's data
        data = bytearray(source_string)
        if len(data) % 2:
            data.append(0)
        total = 0
        for pos in range(0, len(data), 2):
            total += (data[pos] << 8) | data[pos + 1]
        # Fold the carries back into the low 16 bits until none are left
        while total >> 16:
            total = (total & 0xffff) + (total >> 16)
        return ~total & 0xffff  # Invert and truncate to 16 bits

    def ping(self, destIP, timeout, numDataBytes):
        """
        Send one echo request to destIP and wait for the answer.

        destIP       -- host name or address
        timeout      -- how long to wait for the reply, in ms
        numDataBytes -- payload size of the request

        Returns the tuple (dataSize, source address as dotted-quad text,
        TTL, delay in ms). Raises IPError when the raw socket cannot be
        created, when sending fails or when no reply arrives in time.
        """
        try:  # One could use UDP here, but it's obscure
            mySocket = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                     socket.getprotobyname("icmp"))
        except socket.error as e:
            # translate the template first, then fill in the reason
            raise IPError(_("failed. (socket error: '%s')") %
                          self._error_text(e))
        my_ID = os.getpid() & 0xFFFF
        try:
            sentTime = self.send_one_ping(mySocket, destIP, my_ID, 0,
                                          numDataBytes)
            if sentTime is None:
                return None
            recvTime, dataSize, iphSrcIP, icmpSeqNumber, iphTTL = \
                self.receive_one_ping(mySocket, my_ID, timeout)
        finally:
            # also reached when sending or receiving raises
            mySocket.close()
        if not recvTime:
            raise IPError(_("Request timed out"))
        delay = (recvTime - sentTime) * 1000
        return (dataSize, socket.inet_ntoa(struct.pack("!I", iphSrcIP)),
                iphTTL, delay)

    def send_one_ping(self, mySocket, destIP, myID, mySeqNumber, numDataBytes):
        """
        Send one ping to the given >destIP<.

        Returns the time.time() value taken just before sending. Raises
        IPError when the name cannot be resolved or sending fails.
        """
        try:
            destIP = socket.gethostbyname(destIP)
        except socket.gaierror as e:
            raise IPError(e.strerror)
        # Header is type (8), code (8), checksum (16), id (16), sequence (16)
        # Make a dummy header with a 0 checksum.
        header = struct.pack(
            "!BBHHH", self.ICMP_ECHO, 0, 0, myID, mySeqNumber
        )
        # Payload: numDataBytes consecutive byte values starting at 0x42,
        # wrapped into the 0-255 range. Going through bytearray gives real
        # bytes on every Python version.
        startVal = 0x42
        data = bytes(bytearray(
            i & 0xff for i in range(startVal, startVal + numDataBytes)))
        # Calculate the checksum on the data and the dummy header.
        myChecksum = self.checksum(header + data)
        # Now that we have the right checksum, we put that in. It's just easier
        # to make up a new header than to stuff it into the dummy.
        header = struct.pack(
            "!BBHHH", self.ICMP_ECHO, 0, myChecksum, myID, mySeqNumber
        )
        packet = header + data
        sendTime = time.time()
        try:
            # Port number is irrelevant for ICMP
            mySocket.sendto(packet, (destIP, 1))
        except socket.error as e:
            raise IPError("General failure (%s)" % self._error_text(e))
        return sendTime

    def receive_one_ping(self, mySocket, myID, timeout):
        """
        Receive the ping from the socket. Timeout = in ms

        Returns (time received, data size, source address as integer,
        ICMP sequence number, TTL) for the echo reply carrying myID, or
        (None, 0, 0, 0, 0) when the timeout expires first.
        """
        timeLeft = timeout / 1000.0
        while True:  # Loop while waiting for packet or timeout
            startedSelect = time.time()
            whatReady = select.select([mySocket], [], [], timeLeft)
            howLongInSelect = time.time() - startedSelect
            if not whatReady[0]:  # Timeout
                return None, 0, 0, 0, 0
            timeReceived = time.time()
            recPacket, addr = mySocket.recvfrom(self.ICMP_MAX_RECV)
            # The packet is read as a 20-byte IP header followed by the
            # 8-byte ICMP header.
            iphVersion, iphTypeOfSvc, iphLength, \
                iphID, iphFlags, iphTTL, iphProtocol, \
                iphChecksum, iphSrcIP, iphDestIP = struct.unpack(
                    "!BBHHHBBHII", recPacket[:20]
                )
            icmpType, icmpCode, icmpChecksum, \
                icmpPacketID, icmpSeqNumber = struct.unpack(
                    "!BBHHH", recPacket[20:28]
                )
            # Accept echo replies only: on the loopback interface the raw
            # socket also sees our own echo request with the same id.
            if icmpType == self.ICMP_ECHOREPLY and icmpPacketID == myID:
                dataSize = len(recPacket) - 28
                return timeReceived, dataSize, iphSrcIP, icmpSeqNumber, iphTTL
            timeLeft = timeLeft - howLongInSelect
            if timeLeft <= 0:
                return None, 0, 0, 0, 0
def check_port(target, port, timeout=5):
    """
    Check the port (port) on the host (target)
    :param target: host
    :param port: port
    :param timeout: timeout
    :return: True - the port is open, False - no such host, port closed,
        firewall
    """
    try:
        targetIP = socket.gethostbyname(target)
    except socket.gaierror:
        return False
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.settimeout(timeout)
        s.connect((targetIP, port))
    except (socket.error, socket.timeout):
        return False
    finally:
        # close on every path; a failed connect used to leave the socket open
        s.close()
    return True