import errno
from ipaddress import IPv4Network, IPv6Network, ip_address
from socket import AF_INET, AF_INET6

from pyroute2 import IPRoute
from pyroute2.netlink.rtnl import ndmsg
from pyroute2.netlink.exceptions import NetlinkError


class InterfaceManager:
    """Create a dummy network interface and manage the addresses on it.

    All operations go through a pyroute2 ``IPRoute`` netlink socket (the
    programmatic counterpart of the `ip` command).

    Addresses are either handed out sequentially from the internal ranges
    (see ``assign_internal_address``) or supplied explicitly by the caller
    (see ``add_address``); explicit addresses must lie outside the internal
    ranges.
    """

    def __init__(self,
                 interface="deckard",
                 ip4_range=IPv4Network('127.127.0.0/16'),
                 ip6_range=IPv6Network('fd00:dec::/32')):
        """Open a netlink socket and bring the dummy interface up.

        Args:
            interface: Name of the dummy interface to create.
            ip4_range: IPv4 network reserved for internally assigned addresses.
            ip6_range: IPv6 network reserved for internally assigned addresses.

        Raises:
            RuntimeError: If any netlink operation during interface setup
                fails with ``NetlinkError``.
        """
        self.ip4_internal_range = ip4_range
        self.ip6_internal_range = ip6_range
        # Lazy iterators: each call to assign_internal_address() consumes
        # the next address of the corresponding range.
        self.ip4_iterator = iter(ip4_range)
        self.ip6_iterator = iter(ip6_range)
        # Every address (as a string) that was put on the interface.
        self.added_addresses = set()
        self.interface = interface

        self._ip = IPRoute()
        try:
            self._dev = self._setup_interface()
        except NetlinkError as ex:
            # The half-initialised object is about to be discarded, so
            # release the netlink socket instead of leaking it.
            self._ip.close()
            raise RuntimeError(f"Couldn't set interface `{self.interface}` up.") from ex

    def _setup_interface(self):
        """Set up a dummy interface with default route as well as loopback.

        This is done so the resulting PCAP contains as much of the
        communication as possible (including ICMP Destination unreachable
        packets etc.).

        Returns:
            The interface index of the newly created dummy interface.
        """
        # Create and set the interface up.
        self._ip.link("add", ifname=self.interface, kind="dummy")
        dev = self._ip.link_lookup(ifname=self.interface)[0]
        self._ip.link("set", index=dev, state="up")

        # Set up default route for both IPv6 and IPv4.
        # Permanent neighbour entries are added for the two gateway
        # addresses used by the routes below.
        self._ip.neigh("add", dst='169.254.1.1', lladdr='21:21:21:21:21:21',
                       state=ndmsg.states['permanent'], ifindex=dev)
        self._ip.neigh("add", family=AF_INET6, dst='fe80::1', lladdr='21:21:21:21:21:21',
                       state=ndmsg.states['permanent'], ifindex=dev)
        self._ip.addr("add", index=dev, address="169.254.1.2", mask=24)
        self._ip.route("add", gateway="169.254.1.1", oif=dev)
        self._ip.route("add", family=AF_INET6, gateway='fe80::1', oif=dev)

        # Set the loopback up as well since some of the packets go through there.
        lo = self._ip.link_lookup(ifname="lo")[0]
        self._ip.link("set", index=lo, state="up")

        # Return internal interface ID for later use
        return dev

    def assign_internal_address(self, sockfamily) -> str:
        """Add and return a new address from the internal range.

        Args:
            sockfamily: ``AF_INET`` or ``AF_INET6``; selects which internal
                range the address is taken from.

        Returns:
            The newly assigned address as a string.

        Raises:
            ValueError: If ``sockfamily`` is neither ``AF_INET`` nor
                ``AF_INET6``, or if the address could not be added.
            RuntimeError: If the selected internal range is exhausted.
        """
        try:
            if sockfamily == AF_INET:
                a = str(next(self.ip4_iterator))
            elif sockfamily == AF_INET6:
                a = str(next(self.ip6_iterator))
            else:
                raise ValueError(f"Unknown sockfamily {sockfamily}")
        except StopIteration as ex:
            raise RuntimeError("Out of addresses.") from ex

        # Goes straight to _add_address(): the reserved-range check in
        # add_address() would reject every internal address.
        self._add_address(a)
        return a

    def add_address(self, address: str, check_duplicate=False):
        """Add an arbitrary new address to the interface.

        Args:
            address: IPv4 or IPv6 address in textual form.
            check_duplicate: When true, refuse an address that was already
                added through this manager.

        Raises:
            ValueError: If the address is a duplicate (only with
                ``check_duplicate``), is not a valid IP address, lies in
                one of the internally reserved ranges, or could not be
                added to the interface.
        """
        if address in self.added_addresses and check_duplicate:
            raise ValueError(f"Tried to add duplicate address {address}")
        # Parse once; ip_address() raises ValueError on malformed input.
        parsed = ip_address(address)
        if parsed in self.ip4_internal_range or parsed in self.ip6_internal_range:
            raise ValueError(f"Address {address} in the internally reserved range.")
        self._add_address(address)

    def _add_address(self, address):
        """Put ``address`` on the dummy interface as a single-host address.

        An address that already exists on the interface is not treated as
        an error; it is still recorded in ``added_addresses``.

        Raises:
            ValueError: If netlink rejects the address for any reason other
                than it already being present.
        """
        # Full-length prefix: /128 for IPv6 (contains ':'), /32 for IPv4.
        if ":" in address:
            mask = 128
        else:
            mask = 32
        try:
            # NOTE(review): nodad presumably disables duplicate address
            # detection for the new address - confirm against pyroute2 docs.
            self._ip.addr("add", index=self._dev, address=address, mask=mask, nodad=True)
        except NetlinkError as ex:
            if ex.code != errno.EEXIST:  # 'RTNETLINK answers: File exists' is OK here
                raise ValueError(f"Couldn't add {address}") from ex

        self.added_addresses.add(address)