#!/usr/bin/env python3
# Copyright (c) 2014-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Linux network utilities.

Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
"""

import sys
import socket
import struct
import array
import os
from binascii import unhexlify

# Connection state codes as they appear in the state column parsed by
# netstat(). Only STATE_LISTEN is used; the others are kept for reference.
# STATE_ESTABLISHED = '01'
# STATE_SYN_SENT = '02'
# STATE_SYN_RECV = '03'
# STATE_FIN_WAIT1 = '04'
# STATE_FIN_WAIT2 = '05'
# STATE_TIME_WAIT = '06'
# STATE_CLOSE = '07'
# STATE_CLOSE_WAIT = '08'
# STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
# STATE_CLOSING = '0B'


def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Reads every link in /proc/<pid>/fd and returns, as ints, the inode
    numbers of those whose target starts with 'socket:'.
    '''
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        try:
            target = os.readlink(os.path.join(base, item))
        except FileNotFoundError:
            # The descriptor was closed between listdir() and readlink().
            continue
        if target.startswith('socket:'):
            # Drop the 8-character prefix ('socket:[') and the final ']'.
            inodes.append(int(target[8:-1]))
    return inodes


def _remove_empty(array):
    '''Return the items of array that are not the empty string.'''
    return [x for x in array if x != '']


def _convert_ip_port(array):
    '''
    Convert a 'HEXHOST:HEXPORT' field into a (host_hex, port) tuple.

    The host is re-encoded four bytes at a time using native byte order;
    the port is parsed as a base-16 integer.
    '''
    host, port = array.split(':')
    # convert host from mangled-per-four-bytes form as used by kernel
    host = unhexlify(host)
    host_out = ''.join(
        '%08x' % struct.unpack_from('=I', host, word * 4)[0]
        for word in range(len(host) // 4))
    return host_out, int(port, 16)


def netstat(typ='tcp'):
    '''
    Function to return a list with status of tcp connections at linux systems
    To get pid of all network process running on system, you must run this script
    as superuser

    Reads /proc/net/<typ> and returns one entry per data row, each of the
    form [tcp_id, l_addr, r_addr, state, inode], where l_addr and r_addr are
    (host_hex, port) tuples produced by _convert_ip_port.
    '''
    with open('/proc/net/' + typ, 'r', encoding='utf8') as f:
        # The first row is the column header; slicing (rather than pop)
        # also copes with an empty file.
        content = f.readlines()[1:]
    result = []
    for line in content:
        line_array = _remove_empty(line.split(' '))  # Split lines and remove empty spaces.
        tcp_id = line_array[0]
        l_addr = _convert_ip_port(line_array[1])
        r_addr = _convert_ip_port(line_array[2])
        state = line_array[3]
        inode = int(line_array[9])  # Need the inode to match with process pid.
        result.append([tcp_id, l_addr, r_addr, state, inode])
    return result


def get_bind_addrs(pid):
    '''
    Get bind addresses as (host,port) tuples for process pid.

    Only sockets in the listening state whose inode belongs to the process
    are reported; both the 'tcp' and 'tcp6' tables are consulted.
    '''
    # A set makes the per-connection membership test O(1).
    inodes = set(get_socket_inodes(pid))
    bind_addrs = []
    for conn in netstat('tcp') + netstat('tcp6'):
        if conn[3] == STATE_LISTEN and conn[4] in inodes:
            bind_addrs.append(conn[1])
    return bind_addrs


# from: https://code.activestate.com/recipes/439093/
def all_interfaces():
    '''
    Return all interfaces that are up

    The result is a list of (name, address) tuples, where name is a bytes
    object and address is the dotted-quad string from socket.inet_ntoa.
    '''
    import fcntl  # Linux only, so only import when required

    is_64bits = sys.maxsize > 2**32
    struct_size = 40 if is_64bits else 32  # size of one record in the buffer
    max_possible = 8  # initial value
    # The socket only serves as a handle for the ioctl; close it when done.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        while True:
            buf_size = max_possible * struct_size
            names = array.array('B', b'\0' * buf_size)
            outbytes = struct.unpack('iL', fcntl.ioctl(
                s.fileno(),
                0x8912,  # SIOCGIFCONF
                struct.pack('iL', buf_size, names.buffer_info()[0])
            ))[0]
            if outbytes == buf_size:
                # The buffer was filled completely, so there may be more
                # records than fit: retry with twice the room.
                max_possible *= 2
            else:
                break
    namestr = names.tobytes()
    # Per record: bytes 0-15 hold the NUL-terminated name, bytes 20-23 the
    # packed IPv4 address.
    return [(namestr[i:i+16].split(b'\0', 1)[0],
             socket.inet_ntoa(namestr[i+20:i+24]))
            for i in range(0, outbytes, struct_size)]


def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.

    Returns the address bytes as a lowercase hex string. Raises ValueError
    if addr contains neither '.' nor ':'.
    '''
    if '.' in addr:  # IPv4
        addr = [int(x) for x in addr.split('.')]
    elif ':' in addr:  # IPv6
        sub = [[], []]  # prefix, suffix
        x = 0
        addr = addr.split(':')
        for i, comp in enumerate(addr):
            if comp == '':
                if i == 0 or i == (len(addr) - 1):  # skip empty component at beginning or end
                    continue
                x += 1  # :: skips to suffix
                assert x < 2
            else:  # two bytes per component
                val = int(comp, 16)
                sub[x].append(val >> 8)
                sub[x].append(val & 0xff)
        # Bytes elided by '::' are filled with zeros between prefix and suffix.
        nullbytes = 16 - len(sub[0]) - len(sub[1])
        assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
        addr = sub[0] + ([0] * nullbytes) + sub[1]
    else:
        raise ValueError('Could not parse address %s' % addr)
    return bytearray(addr).hex()


def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if a datagram socket can be created and connected to
    ('::1', 1), False if either step raises socket.error.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 1))
    except socket.error:
        return False
    return True