1import errno
2import ipaddress
3import socket
4import struct
5import time
6from ctypes import c_byte
7from ctypes import c_uint
8from ctypes import Structure
9
10import pytest
11from atf_python.sys.net.rtsock import SaHelper
12from atf_python.sys.net.tools import ToolsHelper
13from atf_python.sys.net.vnet import run_cmd
14from atf_python.sys.net.vnet import SingleVnetTestTemplate
15from atf_python.sys.net.vnet import VnetTestTemplate
16
17
18class In6Pktinfo(Structure):
19    _fields_ = [
20        ("ipi6_addr", c_byte * 16),
21        ("ipi6_ifindex", c_uint),
22    ]
23
24
25class VerboseSocketServer:
26    def __init__(self, ip: str, port: int, ifname: str = None):
27        self.ip = ip
28        self.port = port
29
30        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
31        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
32        addr = ipaddress.ip_address(ip)
33        if addr.is_link_local and ifname:
34            ifindex = socket.if_nametoindex(ifname)
35            addr_tuple = (ip, port, 0, ifindex)
36        elif addr.is_multicast and ifname:
37            ifindex = socket.if_nametoindex(ifname)
38            mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)
39            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
40            print("## JOINED group {} % {}".format(ip, ifname))
41            addr_tuple = ("::", port, 0, ifindex)
42        else:
43            addr_tuple = (ip, port, 0, 0)
44        print("## Listening on [{}]:{}".format(addr_tuple[0], port))
45        s.bind(addr_tuple)
46        self.socket = s
47
48    def recv(self):
49        # data = self.socket.recv(4096)
50        # print("RX: " + data)
51        data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
52        # Assume ancdata has just 1 item
53        info = In6Pktinfo.from_buffer_copy(ancdata[0][2])
54        dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
55        dst_iface = socket.if_indextoname(info.ipi6_ifindex)
56
57        tx_obj = {
58            "data": data,
59            "src_ip": address[0],
60            "dst_ip": dst_ip,
61            "dst_iface": dst_iface,
62        }
63        return tx_obj
64
65
66class BaseTestIP6Ouput(VnetTestTemplate):
67    TOPOLOGY = {
68        "vnet1": {"ifaces": ["if1", "if2", "if3"]},
69        "vnet2": {"ifaces": ["if1", "if2", "if3"]},
70        "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
71        "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
72        "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
73    }
74    DEFAULT_PORT = 45365
75
76    def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
77        """Generic listener that sends first received packet with metadata
78        back to the sender via pipw
79        """
80        ll_data = ToolsHelper.get_linklocals()
81        # Start listener
82        ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
83        vnet.pipe.send(ll_data)
84
85        tx_obj = ss.recv()
86        tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
87        vnet.pipe.send(tx_obj)
88
89
90class TestIP6Output(BaseTestIP6Ouput):
91    def vnet2_handler(self, vnet):
92        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
93        self._vnet2_handler(vnet, ip, None)
94
95    @pytest.mark.require_user("root")
96    def test_output6_base(self):
97        """Tests simple UDP output"""
98        second_vnet = self.vnet_map["vnet2"]
99
100        # Pick target on if2 vnet2's end
101        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
102        ip = str(ifaddr.ip)
103
104        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
105        data = bytes("AAAA", "utf-8")
106        print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
107
108        # Wait for the child to become ready
109        self.wait_object(second_vnet.pipe)
110        s.sendto(data, (ip, self.DEFAULT_PORT))
111
112        # Wait for the received object
113        rx_obj = self.wait_object(second_vnet.pipe)
114        assert rx_obj["dst_ip"] == ip
115        assert rx_obj["dst_iface_alias"] == "if2"
116
117    @pytest.mark.require_user("root")
118    def test_output6_nhop(self):
119        """Tests UDP output with custom nhop set"""
120        second_vnet = self.vnet_map["vnet2"]
121
122        # Pick target on if2 vnet2's end
123        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
124        ip_dst = str(ifaddr.ip)
125        # Pick nexthop on if1
126        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
127        ip_next = str(ifaddr.ip)
128        sin6_next = SaHelper.ip6_sa(ip_next, 0)
129
130        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
131        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
132
133        # Wait for the child to become ready
134        self.wait_object(second_vnet.pipe)
135        data = bytes("AAAA", "utf-8")
136        s.sendto(data, (ip_dst, self.DEFAULT_PORT))
137
138        # Wait for the received object
139        rx_obj = self.wait_object(second_vnet.pipe)
140        assert rx_obj["dst_ip"] == ip_dst
141        assert rx_obj["dst_iface_alias"] == "if1"
142
143    @pytest.mark.parametrize(
144        "params",
145        [
146            # esrc: src-ip, if: src-interface, esrc: expected-src,
147            # eif: expected-rx-interface
148            pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
149            pytest.param(
150                {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
151                id="iponly1",
152            ),
153            pytest.param(
154                {
155                    "src": "2001:db8:c::1",
156                    "if": "if3",
157                    "ex": errno.EHOSTUNREACH,
158                },
159                id="ipandif",
160            ),
161            pytest.param(
162                {
163                    "src": "2001:db8:c::aaaa",
164                    "ex": errno.EADDRNOTAVAIL,
165                },
166                id="nolocalip",
167            ),
168            pytest.param(
169                {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
170            ),
171        ],
172    )
173    @pytest.mark.require_user("root")
174    def test_output6_pktinfo(self, params):
175        """Tests simple UDP output"""
176        second_vnet = self.vnet_map["vnet2"]
177        vnet = self.vnet
178
179        # Pick target on if2 vnet2's end
180        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
181        dst_ip = str(ifaddr.ip)
182
183        src_ip = params.get("src", "")
184        src_ifname = params.get("if", "")
185        expected_ip = params.get("esrc", "")
186        expected_ifname = params.get("eif", "")
187        errno = params.get("ex", 0)
188
189        pktinfo = In6Pktinfo()
190        if src_ip:
191            for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
192                pktinfo.ipi6_addr[i] = b
193        if src_ifname:
194            os_ifname = vnet.iface_alias_map[src_ifname].name
195            pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)
196
197        # Wait for the child to become ready
198        self.wait_object(second_vnet.pipe)
199        data = bytes("AAAA", "utf-8")
200
201        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
202        try:
203            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
204            aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
205            s.sendto(data, (dst_ip, self.DEFAULT_PORT))
206        except OSError as e:
207            if not errno:
208                raise
209            assert e.errno == errno
210            print("Correctly raised {}".format(e))
211            return
212
213        # Wait for the received object
214        rx_obj = self.wait_object(second_vnet.pipe)
215
216        assert rx_obj["dst_ip"] == dst_ip
217        if expected_ip:
218            assert rx_obj["src_ip"] == expected_ip
219        if expected_ifname:
220            assert rx_obj["dst_iface_alias"] == expected_ifname
221
222
223class TestIP6OutputLL(BaseTestIP6Ouput):
224    def vnet2_handler(self, vnet):
225        """Generic listener that sends first received packet with metadata
226        back to the sender via pipw
227        """
228        os_ifname = vnet.iface_alias_map["if2"].name
229        ll_data = ToolsHelper.get_linklocals()
230        ll_ip, _ = ll_data[os_ifname][0]
231        self._vnet2_handler(vnet, ll_ip, os_ifname)
232
233    @pytest.mark.require_user("root")
234    def test_output6_linklocal(self):
235        """Tests simple UDP output"""
236        second_vnet = self.vnet_map["vnet2"]
237
238        # Wait for the child to become ready
239        ll_data = self.wait_object(second_vnet.pipe)
240
241        # Pick LL address on if2 vnet2's end
242        ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
243        # Get local interface scope
244        os_ifname = self.vnet.iface_alias_map["if2"].name
245        scopeid = socket.if_nametoindex(os_ifname)
246
247        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
248        data = bytes("AAAA", "utf-8")
249        target = (ip, self.DEFAULT_PORT, 0, scopeid)
250        print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))
251
252        s.sendto(data, target)
253
254        # Wait for the received object
255        rx_obj = self.wait_object(second_vnet.pipe)
256        assert rx_obj["dst_ip"] == ip
257        assert rx_obj["dst_iface_alias"] == "if2"
258
259
260class TestIP6OutputNhopLL(BaseTestIP6Ouput):
261    def vnet2_handler(self, vnet):
262        """Generic listener that sends first received packet with metadata
263        back to the sender via pipw
264        """
265        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
266        self._vnet2_handler(vnet, ip, None)
267
268    @pytest.mark.require_user("root")
269    def test_output6_nhop_linklocal(self):
270        """Tests UDP output with custom link-local nhop set"""
271        second_vnet = self.vnet_map["vnet2"]
272
273        # Wait for the child to become ready
274        ll_data = self.wait_object(second_vnet.pipe)
275
276        # Pick target on if2 vnet2's end
277        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
278        ip_dst = str(ifaddr.ip)
279        # Pick nexthop on if1
280        ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
281        # Get local interfaces
282        os_ifname = self.vnet.iface_alias_map["if1"].name
283        scopeid = socket.if_nametoindex(os_ifname)
284        sin6_next = SaHelper.ip6_sa(ip_next, scopeid)
285
286        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
287        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
288
289        data = bytes("AAAA", "utf-8")
290        s.sendto(data, (ip_dst, self.DEFAULT_PORT))
291
292        # Wait for the received object
293        rx_obj = self.wait_object(second_vnet.pipe)
294        assert rx_obj["dst_ip"] == ip_dst
295        assert rx_obj["dst_iface_alias"] == "if1"
296
297
298class TestIP6OutputScope(BaseTestIP6Ouput):
299    def vnet2_handler(self, vnet):
300        """Generic listener that sends first received packet with metadata
301        back to the sender via pipw
302        """
303        bind_ip, bind_ifp = self.wait_object(vnet.pipe)
304        if bind_ip is None:
305            os_ifname = vnet.iface_alias_map[bind_ifp].name
306            ll_data = ToolsHelper.get_linklocals()
307            bind_ip, _ = ll_data[os_ifname][0]
308        if bind_ifp is not None:
309            bind_ifp = vnet.iface_alias_map[bind_ifp].name
310        print("## BIND {}%{}".format(bind_ip, bind_ifp))
311        self._vnet2_handler(vnet, bind_ip, bind_ifp)
312
313    @pytest.mark.parametrize(
314        "params",
315        [
316            # sif/dif: source/destination interface (for link-local addr)
317            # sip/dip: source/destination ip (for non-LL addr)
318            # ex: OSError errno that sendto() must raise
319            pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
320            pytest.param(
321                {
322                    "sif": "if1",
323                    "dif": "if2",
324                    "ex": errno.EHOSTUNREACH,
325                },
326                id="ll_differentif1",
327            ),
328            pytest.param(
329                {
330                    "sif": "if1",
331                    "dip": "2001:db8:b::2",
332                    "ex": errno.EHOSTUNREACH,
333                },
334                id="ll_differentif2",
335            ),
336            pytest.param(
337                {
338                    "sip": "2001:db8:a::1",
339                    "dif": "if2",
340                },
341                id="gu_to_ll",
342            ),
343        ],
344    )
345    @pytest.mark.require_user("root")
346    def test_output6_linklocal_scope(self, params):
347        """Tests simple UDP output"""
348        second_vnet = self.vnet_map["vnet2"]
349
350        src_ifp = params.get("sif")
351        src_ip = params.get("sip")
352        dst_ifp = params.get("dif")
353        dst_ip = params.get("dip")
354        errno = params.get("ex", 0)
355
356        # Sent ifp/IP to bind on
357        second_vnet = self.vnet_map["vnet2"]
358        second_vnet.pipe.send((dst_ip, dst_ifp))
359
360        # Wait for the child to become ready
361        ll_data = self.wait_object(second_vnet.pipe)
362
363        if dst_ip is None:
364            # Pick LL address on dst_ifp vnet2's end
365            dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
366            # Get local interface scope
367            os_ifname = self.vnet.iface_alias_map[dst_ifp].name
368            scopeid = socket.if_nametoindex(os_ifname)
369            target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
370        else:
371            target = (dst_ip, self.DEFAULT_PORT, 0, 0)
372
373        # Bind
374        if src_ip is None:
375            ll_data = ToolsHelper.get_linklocals()
376            os_ifname = self.vnet.iface_alias_map[src_ifp].name
377            src_ip, _ = ll_data[os_ifname][0]
378            scopeid = socket.if_nametoindex(os_ifname)
379            src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
380        else:
381            src = (src_ip, self.DEFAULT_PORT, 0, 0)
382
383        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
384        s.bind(src)
385        data = bytes("AAAA", "utf-8")
386        print("## TX packet {} -> {}".format(src, target))
387
388        try:
389            s.sendto(data, target)
390        except OSError as e:
391            if not errno:
392                raise
393            assert e.errno == errno
394            print("Correctly raised {}".format(e))
395            return
396
397        # Wait for the received object
398        rx_obj = self.wait_object(second_vnet.pipe)
399        assert rx_obj["dst_ip"] == dst_ip
400        assert rx_obj["src_ip"] == src_ip
401        # assert rx_obj["dst_iface_alias"] == "if2"
402
403
404class TestIP6OutputMulticast(BaseTestIP6Ouput):
405    def vnet2_handler(self, vnet):
406        group = self.wait_object(vnet.pipe)
407        os_ifname = vnet.iface_alias_map["if2"].name
408        self._vnet2_handler(vnet, group, os_ifname)
409
410    @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
411    @pytest.mark.require_user("root")
412    def test_output6_multicast(self, group_scope):
413        """Tests simple UDP output"""
414        second_vnet = self.vnet_map["vnet2"]
415
416        group = "{}::3456".format(group_scope)
417        second_vnet.pipe.send(group)
418
419        # Pick target on if2 vnet2's end
420        ip = group
421        os_ifname = self.vnet.iface_alias_map["if2"].name
422        ifindex = socket.if_nametoindex(os_ifname)
423        optval = struct.pack("I", ifindex)
424
425        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
426        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)
427
428        data = bytes("AAAA", "utf-8")
429
430        # Wait for the child to become ready
431        self.wait_object(second_vnet.pipe)
432
433        print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
434        s.sendto(data, (ip, self.DEFAULT_PORT))
435
436        # Wait for the received object
437        rx_obj = self.wait_object(second_vnet.pipe)
438        assert rx_obj["dst_ip"] == ip
439        assert rx_obj["dst_iface_alias"] == "if2"
440
441
442class TestIP6OutputLoopback(SingleVnetTestTemplate):
443    IPV6_PREFIXES = ["2001:db8:a::1/64"]
444    DEFAULT_PORT = 45365
445
446    @pytest.mark.parametrize(
447        "source_validation",
448        [
449            pytest.param(0, id="no_sav"),
450            pytest.param(1, id="sav"),
451        ],
452    )
453    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
454    def test_output6_self_tcp(self, scope, source_validation):
455        """Tests IPv6 TCP connection to the local IPv6 address"""
456
457        ToolsHelper.set_sysctl(
458            "net.inet6.ip6.source_address_validation", source_validation
459        )
460
461        if scope == "gu":
462            ip = "2001:db8:a::1"
463            addr_tuple = (ip, self.DEFAULT_PORT)
464        elif scope == "ll":
465            os_ifname = self.vnet.iface_alias_map["if1"].name
466            ifindex = socket.if_nametoindex(os_ifname)
467            ll_data = ToolsHelper.get_linklocals()
468            ip, _ = ll_data[os_ifname][0]
469            addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
470        elif scope == "lo":
471            ip = "::1"
472            ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
473            ifindex = socket.if_nametoindex("lo0")
474            addr_tuple = (ip, self.DEFAULT_PORT)
475        else:
476            assert 0 == 1
477        print("address: {}".format(addr_tuple))
478
479        start = time.perf_counter()
480        ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
481        ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
482        ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
483        ss.bind(addr_tuple)
484        ss.listen()
485        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
486        s.settimeout(2.0)
487        s.connect(addr_tuple)
488        conn, from_addr = ss.accept()
489        duration = time.perf_counter() - start
490
491        assert from_addr[0] == ip
492        assert duration < 1.0
493
494    @pytest.mark.parametrize(
495        "source_validation",
496        [
497            pytest.param(0, id="no_sav"),
498            pytest.param(1, id="sav"),
499        ],
500    )
501    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
502    def test_output6_self_udp(self, scope, source_validation):
503        """Tests IPv6 UDP connection to the local IPv6 address"""
504
505        ToolsHelper.set_sysctl(
506            "net.inet6.ip6.source_address_validation", source_validation
507        )
508
509        if scope == "gu":
510            ip = "2001:db8:a::1"
511            addr_tuple = (ip, self.DEFAULT_PORT)
512        elif scope == "ll":
513            os_ifname = self.vnet.iface_alias_map["if1"].name
514            ifindex = socket.if_nametoindex(os_ifname)
515            ll_data = ToolsHelper.get_linklocals()
516            ip, _ = ll_data[os_ifname][0]
517            addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
518        elif scope == "lo":
519            ip = "::1"
520            ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
521            ifindex = socket.if_nametoindex("lo0")
522            addr_tuple = (ip, self.DEFAULT_PORT)
523        else:
524            assert 0 == 1
525        print("address: {}".format(addr_tuple))
526
527        start = time.perf_counter()
528        ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
529        ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
530        ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
531        ss.bind(addr_tuple)
532        ss.listen()
533        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
534        s.settimeout(2.0)
535        s.connect(addr_tuple)
536        conn, from_addr = ss.accept()
537        duration = time.perf_counter() - start
538
539        assert from_addr[0] == ip
540        assert duration < 1.0
541