1#!/usr/bin/env python3
2# Copyright (c) 2014-2019 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Linux network utilities.
6
7Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
8"""
9
10import sys
11import socket
12import struct
13import array
14import os
15from binascii import unhexlify
16
17# STATE_ESTABLISHED = '01'
18# STATE_SYN_SENT  = '02'
19# STATE_SYN_RECV = '03'
20# STATE_FIN_WAIT1 = '04'
21# STATE_FIN_WAIT2 = '05'
22# STATE_TIME_WAIT = '06'
23# STATE_CLOSE = '07'
24# STATE_CLOSE_WAIT = '08'
25# STATE_LAST_ACK = '09'
26STATE_LISTEN = '0A'
27# STATE_CLOSING = '0B'
28
29def get_socket_inodes(pid):
30    '''
31    Get list of socket inodes for process pid.
32    '''
33    base = '/proc/%i/fd' % pid
34    inodes = []
35    for item in os.listdir(base):
36        target = os.readlink(os.path.join(base, item))
37        if target.startswith('socket:'):
38            inodes.append(int(target[8:-1]))
39    return inodes
40
41def _remove_empty(array):
42    return [x for x in array if x !='']
43
44def _convert_ip_port(array):
45    host,port = array.split(':')
46    # convert host from mangled-per-four-bytes form as used by kernel
47    host = unhexlify(host)
48    host_out = ''
49    for x in range(0, len(host) // 4):
50        (val,) = struct.unpack('=I', host[x*4:(x+1)*4])
51        host_out += '%08x' % val
52
53    return host_out,int(port,16)
54
55def netstat(typ='tcp'):
56    '''
57    Function to return a list with status of tcp connections at linux systems
58    To get pid of all network process running on system, you must run this script
59    as superuser
60    '''
61    with open('/proc/net/'+typ,'r',encoding='utf8') as f:
62        content = f.readlines()
63        content.pop(0)
64    result = []
65    for line in content:
66        line_array = _remove_empty(line.split(' '))     # Split lines and remove empty spaces.
67        tcp_id = line_array[0]
68        l_addr = _convert_ip_port(line_array[1])
69        r_addr = _convert_ip_port(line_array[2])
70        state = line_array[3]
71        inode = int(line_array[9])                      # Need the inode to match with process pid.
72        nline = [tcp_id, l_addr, r_addr, state, inode]
73        result.append(nline)
74    return result
75
76def get_bind_addrs(pid):
77    '''
78    Get bind addresses as (host,port) tuples for process pid.
79    '''
80    inodes = get_socket_inodes(pid)
81    bind_addrs = []
82    for conn in netstat('tcp') + netstat('tcp6'):
83        if conn[3] == STATE_LISTEN and conn[4] in inodes:
84            bind_addrs.append(conn[1])
85    return bind_addrs
86
87# from: https://code.activestate.com/recipes/439093/
88def all_interfaces():
89    '''
90    Return all interfaces that are up
91    '''
92    import fcntl  # Linux only, so only import when required
93
94    is_64bits = sys.maxsize > 2**32
95    struct_size = 40 if is_64bits else 32
96    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
97    max_possible = 8 # initial value
98    while True:
99        bytes = max_possible * struct_size
100        names = array.array('B', b'\0' * bytes)
101        outbytes = struct.unpack('iL', fcntl.ioctl(
102            s.fileno(),
103            0x8912,  # SIOCGIFCONF
104            struct.pack('iL', bytes, names.buffer_info()[0])
105        ))[0]
106        if outbytes == bytes:
107            max_possible *= 2
108        else:
109            break
110    namestr = names.tobytes()
111    return [(namestr[i:i+16].split(b'\0', 1)[0],
112             socket.inet_ntoa(namestr[i+20:i+24]))
113            for i in range(0, outbytes, struct_size)]
114
115def addr_to_hex(addr):
116    '''
117    Convert string IPv4 or IPv6 address to binary address as returned by
118    get_bind_addrs.
119    Very naive implementation that certainly doesn't work for all IPv6 variants.
120    '''
121    if '.' in addr: # IPv4
122        addr = [int(x) for x in addr.split('.')]
123    elif ':' in addr: # IPv6
124        sub = [[], []] # prefix, suffix
125        x = 0
126        addr = addr.split(':')
127        for i,comp in enumerate(addr):
128            if comp == '':
129                if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
130                    continue
131                x += 1 # :: skips to suffix
132                assert x < 2
133            else: # two bytes per component
134                val = int(comp, 16)
135                sub[x].append(val >> 8)
136                sub[x].append(val & 0xff)
137        nullbytes = 16 - len(sub[0]) - len(sub[1])
138        assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
139        addr = sub[0] + ([0] * nullbytes) + sub[1]
140    else:
141        raise ValueError('Could not parse address %s' % addr)
142    return bytearray(addr).hex()
143
144def test_ipv6_local():
145    '''
146    Check for (local) IPv6 support.
147    '''
148    import socket
149    # By using SOCK_DGRAM this will not actually make a connection, but it will
150    # fail if there is no route to IPv6 localhost.
151    have_ipv6 = True
152    try:
153        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
154        s.connect(('::1', 1))
155    except socket.error:
156        have_ipv6 = False
157    return have_ipv6
158