1import errno
2from ipaddress import IPv4Network, IPv6Network, ip_address
3from socket import AF_INET, AF_INET6
4
5from pyroute2 import IPRoute
6from pyroute2.netlink.rtnl import ndmsg
7from pyroute2.netlink.exceptions import NetlinkError
8
9
class InterfaceManager:
    """Create a dummy network interface and manage the addresses assigned to it.

    Does over netlink (through pyroute2's ``IPRoute``) what one would
    otherwise do by hand with the `ip` command.
    """

    def __init__(self,
                 interface="deckard",
                 ip4_range=IPv4Network('127.127.0.0/16'),
                 ip6_range=IPv6Network('fd00:dec::/32')):
        """Open a netlink socket and bring the dummy interface up.

        Args:
            interface: name of the dummy interface to create.
            ip4_range: IPv4 network reserved for `assign_internal_address`.
            ip6_range: IPv6 network reserved for `assign_internal_address`.

        Raises:
            RuntimeError: if a netlink operation fails while setting the
                interface up.
        """
        self.ip4_internal_range = ip4_range
        self.ip6_internal_range = ip6_range
        # Lazily walk the reserved ranges; each `next()` hands out one address.
        self.ip4_iterator = (host for host in ip4_range)
        self.ip6_iterator = (host for host in ip6_range)
        # Every address (as the string it was passed in) added to the interface.
        self.added_addresses = set()
        self.interface = interface

        self._ip = IPRoute()
        try:
            self._dev = self._setup_interface()
        except NetlinkError as ex:
            # Do not leak the netlink socket when construction fails.
            self._ip.close()
            raise RuntimeError(f"Couldn't set interface `{self.interface}` up.") from ex
        except BaseException:
            # Any other failure: release the socket, then let it propagate unchanged.
            self._ip.close()
            raise

    def _setup_interface(self):
        """Set up a dummy interface with default route as well as loopback.

        This is done so the resulting PCAP contains as much of the communication
        as possible (including ICMP Destination unreachable packets etc.).

        Returns:
            The index of the newly created interface.
        """
        ip = self._ip

        # Create and set the interface up.
        ip.link("add", ifname=self.interface, kind="dummy")
        dev = ip.link_lookup(ifname=self.interface)[0]
        ip.link("set", index=dev, state="up")

        # Set up default route for both IPv6 and IPv4.  The gateways are given
        # permanent neighbour entries with a fixed link-layer address.
        permanent = ndmsg.states['permanent']
        ip.neigh("add", dst='169.254.1.1', lladdr='21:21:21:21:21:21',
                 state=permanent, ifindex=dev)
        ip.neigh("add", family=AF_INET6, dst='fe80::1', lladdr='21:21:21:21:21:21',
                 state=permanent, ifindex=dev)
        ip.addr("add", index=dev, address="169.254.1.2", mask=24)
        ip.route("add", gateway="169.254.1.1", oif=dev)
        ip.route("add", family=AF_INET6, gateway='fe80::1', oif=dev)

        # Set the loopback up as well since some of the packets go through there.
        lo = ip.link_lookup(ifname="lo")[0]
        ip.link("set", index=lo, state="up")

        # Return internal interface ID for later use
        return dev

    def assign_internal_address(self, sockfamily) -> str:
        """Add and return a new address from the internal range.

        Args:
            sockfamily: `AF_INET` or `AF_INET6`, selecting which range to use.

        Returns:
            The newly added address as a string.

        Raises:
            ValueError: if `sockfamily` is neither `AF_INET` nor `AF_INET6`,
                or if the address could not be added to the interface.
            RuntimeError: if the selected internal range is exhausted.
        """
        if sockfamily == AF_INET:
            pool = self.ip4_iterator
        elif sockfamily == AF_INET6:
            pool = self.ip6_iterator
        else:
            raise ValueError(f"Unknown sockfamily {sockfamily}")

        try:
            a = str(next(pool))
        except StopIteration as ex:
            raise RuntimeError("Out of addresses.") from ex

        self._add_address(a)
        return a

    def add_address(self, address: str, check_duplicate=False):
        """Add an arbitrary new address to the interface.

        Args:
            address: IPv4 or IPv6 address in textual form.
            check_duplicate: when true, refuse an address that was already
                added through this manager.

        Raises:
            ValueError: if the address is a rejected duplicate, is not a valid
                IP address, lies in one of the internally reserved ranges, or
                could not be added to the interface.
        """
        if check_duplicate and address in self.added_addresses:
            raise ValueError(f"Tried to add duplicate address {address}")
        parsed = ip_address(address)  # parse once; raises ValueError on garbage
        if parsed in self.ip4_internal_range or parsed in self.ip6_internal_range:
            raise ValueError(f"Address {address} in the internally reserved range.")
        self._add_address(address)

    def _add_address(self, address):
        """Add `address` to the interface as a single-host address and record it.

        Raises:
            ValueError: if netlink rejects the address for a reason other than
                it already being present.
        """
        # Full-length prefix: /128 for IPv6 (contains ":"), /32 for IPv4.
        mask = 128 if ":" in address else 32
        try:
            self._ip.addr("add", index=self._dev, address=address, mask=mask, nodad=True)
        except NetlinkError as ex:
            if ex.code != errno.EEXIST:  # 'RTNETLINK answers: File exists' is OK here
                raise ValueError(f"Couldn't add {address}") from ex

        self.added_addresses.add(address)
92