#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

# Linux network utilities

import sys
import socket
import fcntl
import struct
import array
import os
from binascii import unhexlify, hexlify

# Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
# Connection state codes, as they appear in the state column read by netstat().
STATE_ESTABLISHED = '01'
STATE_SYN_SENT = '02'
STATE_SYN_RECV = '03'
STATE_FIN_WAIT1 = '04'
STATE_FIN_WAIT2 = '05'
STATE_TIME_WAIT = '06'
STATE_CLOSE = '07'
STATE_CLOSE_WAIT = '08'
STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
STATE_CLOSING = '0B'


def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Reads the symlinks in /proc/<pid>/fd and returns, as ints, the inode
    numbers of those whose target has the form 'socket:[<inode>]'.
    Descriptors that disappear while the directory is being scanned are
    skipped.
    '''
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        try:
            target = os.readlink(os.path.join(base, item))
        except FileNotFoundError:
            # The descriptor was closed between listdir() and readlink().
            continue
        if target.startswith('socket:'):
            # Strip the leading 'socket:[' (8 characters) and the trailing ']'.
            inodes.append(int(target[8:-1]))
    return inodes


def _remove_empty(array):
    '''
    Return a list of the items of array that are not the empty string.
    '''
    return [x for x in array if x != '']


def _convert_ip_port(array):
    '''
    Convert a 'HEXHOST:HEXPORT' string into a (host, port) tuple.

    host is returned as a lowercase hex string and port as an int.
    '''
    host, port = array.split(':')
    # convert host from mangled-per-four-bytes form as used by kernel:
    # every 4-byte group is read as a native-endian 32-bit word and printed
    # back as 8 hex digits.
    host = unhexlify(host)
    host_out = ''.join(
        '%08x' % struct.unpack('=I', host[x * 4:(x + 1) * 4])[0]
        for x in range(len(host) // 4)
    )
    return host_out, int(port, 16)


def netstat(typ='tcp'):
    '''
    Return the connection table found in /proc/net/<typ>.

    Each entry is a list [id, local_addr, remote_addr, state, inode], where
    the addresses are (host, port) tuples as produced by _convert_ip_port,
    state is the raw hex state string (compare with the STATE_* constants)
    and inode is an int that can be matched against get_socket_inodes().
    '''
    result = []
    with open('/proc/net/' + typ, 'r', encoding='utf8') as f:
        next(f, None)  # Skip the column header line (tolerates an empty file).
        for line in f:
            line_array = line.split()  # Split on runs of whitespace.
            if not line_array:
                continue
            tcp_id = line_array[0]
            l_addr = _convert_ip_port(line_array[1])
            r_addr = _convert_ip_port(line_array[2])
            state = line_array[3]
            inode = int(line_array[9])  # Need the inode to match with process pid.
            result.append([tcp_id, l_addr, r_addr, state, inode])
    return result


def get_bind_addrs(pid):
    '''
    Get bind addresses as (host,port) tuples for process pid.

    Only sockets in the LISTEN state from the 'tcp' and 'tcp6' tables are
    considered.
    '''
    inodes = set(get_socket_inodes(pid))  # set: O(1) membership per connection
    return [conn[1]
            for conn in netstat('tcp') + netstat('tcp6')
            if conn[3] == STATE_LISTEN and conn[4] in inodes]


# from: http://code.activestate.com/recipes/439093/
def all_interfaces():
    '''
    Return all interfaces that are up

    The result is a list of (name, address) tuples, where name is a bytes
    object and address is a dotted-quad IPv4 string.
    '''
    is_64bits = sys.maxsize > 2**32
    struct_size = 40 if is_64bits else 32  # size of one returned record
    max_possible = 8  # initial value
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        while True:
            buf_len = max_possible * struct_size
            names = array.array('B', b'\0' * buf_len)
            outbytes = struct.unpack('iL', fcntl.ioctl(
                s.fileno(),
                0x8912,  # SIOCGIFCONF
                struct.pack('iL', buf_len, names.buffer_info()[0])
            ))[0]
            if outbytes == buf_len:
                # Buffer was filled completely, so the list may have been
                # truncated: retry with twice the room.
                max_possible *= 2
            else:
                break
    # array.tostring() was removed in Python 3.9; tobytes() is the replacement.
    namestr = names.tobytes()
    return [(namestr[i:i + 16].split(b'\0', 1)[0],
             socket.inet_ntoa(namestr[i + 20:i + 24]))
            for i in range(0, outbytes, struct_size)]


def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.

    Returns the address bytes as a lowercase hex string. Raises ValueError
    if the address cannot be parsed.
    '''
    if '.' in addr:  # IPv4
        octets = [int(x) for x in addr.split('.')]
    elif ':' in addr:  # IPv6
        sub = [[], []]  # prefix, suffix
        x = 0
        parts = addr.split(':')
        for i, comp in enumerate(parts):
            if comp == '':
                if i == 0 or i == (len(parts) - 1):  # skip empty component at beginning or end
                    continue
                x += 1  # :: skips to suffix
                if x >= 2:
                    # More than one '::' is ambiguous.
                    raise ValueError('Could not parse address %s' % addr)
            else:  # two bytes per component
                val = int(comp, 16)
                sub[x].append(val >> 8)
                sub[x].append(val & 0xff)
        nullbytes = 16 - len(sub[0]) - len(sub[1])
        # Without '::' the components must fill all 16 bytes; with '::' at
        # least one byte must be left for it to expand into.
        if not ((x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)):
            raise ValueError('Could not parse address %s' % addr)
        octets = sub[0] + ([0] * nullbytes) + sub[1]
    else:
        raise ValueError('Could not parse address %s' % addr)
    return hexlify(bytearray(octets)).decode('ascii')


def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if a datagram socket could be connected to ::1, False
    otherwise.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 0))
    except OSError:  # socket.error is an alias of OSError
        return False
    return True