1import ipaddress
2import socket
3
4import pytest
5from atf_python.sys.net.tools import ToolsHelper
6from atf_python.sys.net.vnet import IfaceFactory
7from atf_python.sys.net.vnet import SingleVnetTestTemplate
8from atf_python.sys.netlink.attrs import NlAttrIp
9from atf_python.sys.netlink.attrs import NlAttrU32
10from atf_python.sys.netlink.base_headers import NlmBaseFlags
11from atf_python.sys.netlink.base_headers import NlmGetFlags
12from atf_python.sys.netlink.base_headers import NlmNewFlags
13from atf_python.sys.netlink.base_headers import NlMsgType
14from atf_python.sys.netlink.netlink import NetlinkTestTemplate
15from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
16from atf_python.sys.netlink.netlink_route import NlRtMsgType
17from atf_python.sys.netlink.netlink_route import RtattrType
18from atf_python.sys.netlink.utils import NlConst
19
20
21class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
22    IPV6_PREFIXES = ["2001:db8::1/64"]
23
24    def setup_method(self, method):
25        super().setup_method(method)
26        self.setup_netlink(NlConst.NETLINK_ROUTE)
27
28    @pytest.mark.timeout(5)
29    def test_add_route6_ll_gw(self):
30        epair_ifname = self.vnet.iface_alias_map["if1"].name
31        epair_ifindex = socket.if_nametoindex(epair_ifname)
32
33        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
34        msg.set_request()
35        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
36        msg.base_hdr.rtm_family = socket.AF_INET6
37        msg.base_hdr.rtm_dst_len = 64
38        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
39        msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
40        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, epair_ifindex))
41
42        rx_msg = self.get_reply(msg)
43        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
44        assert rx_msg.error_code == 0
45
46        ToolsHelper.print_net_debug()
47        ToolsHelper.print_output("netstat -6onW")
48
49    @pytest.mark.timeout(5)
50    def test_add_route6_ll_if_gw(self):
51        self.require_module("if_tun")
52        tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
53        tun_ifindex = socket.if_nametoindex(tun_ifname)
54
55        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
56        msg.set_request()
57        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
58        msg.base_hdr.rtm_family = socket.AF_INET6
59        msg.base_hdr.rtm_dst_len = 64
60        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
61        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
62
63        rx_msg = self.get_reply(msg)
64        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
65        assert rx_msg.error_code == 0
66
67        ToolsHelper.print_net_debug()
68        ToolsHelper.print_output("netstat -6onW")
69
70    @pytest.mark.timeout(5)
71    def test_add_route4_ll_if_gw(self):
72        self.require_module("if_tun")
73        tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
74        tun_ifindex = socket.if_nametoindex(tun_ifname)
75
76        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
77        msg.set_request()
78        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
79        msg.base_hdr.rtm_family = socket.AF_INET
80        msg.base_hdr.rtm_dst_len = 32
81        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "192.0.2.1"))
82        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
83
84        rx_msg = self.get_reply(msg)
85        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
86        assert rx_msg.error_code == 0
87
88        ToolsHelper.print_net_debug()
89        ToolsHelper.print_output("netstat -4onW")
90
91    @pytest.mark.timeout(20)
92    def test_buffer_override(self):
93        msg_flags = (
94            NlmBaseFlags.NLM_F_ACK.value
95            | NlmBaseFlags.NLM_F_REQUEST.value
96            | NlmNewFlags.NLM_F_CREATE.value
97        )
98
99        num_routes = 1000
100        base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
101        for i in range(num_routes):
102            base_address[7] = i % 256
103            base_address[6] = i // 256
104            prefix_address = ipaddress.IPv6Address(bytes(base_address))
105
106            msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
107            msg.nl_hdr.nlmsg_flags = msg_flags
108            msg.base_hdr.rtm_family = socket.AF_INET6
109            msg.base_hdr.rtm_dst_len = 65
110            msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
111            msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))
112
113            self.write_message(msg, silent=True)
114            rx_msg = self.read_message(silent=True)
115            assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
116            assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
117            assert rx_msg.error_code == 0
118        # Now, dump
119        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
120        msg.nl_hdr.nlmsg_flags = (
121            NlmBaseFlags.NLM_F_ACK.value
122            | NlmBaseFlags.NLM_F_REQUEST.value
123            | NlmGetFlags.NLM_F_ROOT.value
124            | NlmGetFlags.NLM_F_MATCH.value
125        )
126        msg.base_hdr.rtm_family = socket.AF_INET6
127        self.write_message(msg)
128        num_received = 0
129        while True:
130            rx_msg = self.read_message(silent=True)
131            if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
132                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
133                    if rx_msg.error_code != 0:
134                        raise ValueError(
135                            "unable to dump routes: error {}".format(rx_msg.error_code)
136                        )
137                if rx_msg.is_type(NlMsgType.NLMSG_DONE):
138                    break
139                if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE):
140                    if rx_msg.base_hdr.rtm_dst_len == 65:
141                        num_received += 1
142        assert num_routes == num_received
143