1import ipaddress
2import socket
3
4import pytest
5from atf_python.sys.net.tools import ToolsHelper
6from atf_python.sys.net.vnet import IfaceFactory
7from atf_python.sys.net.vnet import SingleVnetTestTemplate
8from atf_python.sys.netlink.attrs import NlAttrIp
9from atf_python.sys.netlink.attrs import NlAttrU32
10from atf_python.sys.netlink.base_headers import NlmBaseFlags
11from atf_python.sys.netlink.base_headers import NlmGetFlags
12from atf_python.sys.netlink.base_headers import NlmNewFlags
13from atf_python.sys.netlink.base_headers import NlMsgType
14from atf_python.sys.netlink.netlink import NetlinkTestTemplate
15from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
16from atf_python.sys.netlink.netlink_route import NlRtMsgType
17from atf_python.sys.netlink.netlink_route import RtattrType
18from atf_python.sys.netlink.utils import NlConst
19
20
class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Route add/dump tests driven over a NETLINK_ROUTE socket.

    The vnet template is configured with a single IPv6 prefix
    (``2001:db8::1/64``); every test opens the netlink socket in
    ``setup_method``.
    """

    IPV6_PREFIXES = ["2001:db8::1/64"]

    def setup_method(self, method):
        """Run the parent setup, then open the NETLINK_ROUTE socket."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _add_route_expect_ack(self, family, dst, dst_len, extra_attrs):
        """Send RTM_NEWROUTE (request + NLM_F_CREATE) and require success.

        The message carries RTA_DST built from ``dst`` followed by every
        attribute in ``extra_attrs``, in the order given.  The reply must
        be an NLMSG_ERROR message with error code 0.
        """
        request = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
        request.set_request()
        request.add_nlflags([NlmNewFlags.NLM_F_CREATE])
        request.base_hdr.rtm_family = family
        request.base_hdr.rtm_dst_len = dst_len
        request.add_nla(NlAttrIp(RtattrType.RTA_DST, dst))
        for attr in extra_attrs:
            request.add_nla(attr)

        reply = self.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0

    @pytest.mark.timeout(5)
    def test_add_route6_ll_gw(self):
        """Add 2001:db8:2::/64 via gateway fe80::1 on the `if1` interface."""
        oif_index = socket.if_nametoindex(self.vnet.iface_alias_map["if1"].name)

        self._add_route_expect_ack(
            socket.AF_INET6,
            "2001:db8:2::",
            64,
            [
                NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"),
                NlAttrU32(RtattrType.RTA_OIF, oif_index),
            ],
        )

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route6_ll_if_gw(self):
        """Add 2001:db8:2::/64 with only an output interface (a new tun)."""
        created_ifaces = IfaceFactory().create_iface("", "tun")
        oif_index = socket.if_nametoindex(created_ifaces[0].name)

        self._add_route_expect_ack(
            socket.AF_INET6,
            "2001:db8:2::",
            64,
            [NlAttrU32(RtattrType.RTA_OIF, oif_index)],
        )

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route4_ll_if_gw(self):
        """Add 192.0.2.1/32 with only an output interface (a new tun)."""
        created_ifaces = IfaceFactory().create_iface("", "tun")
        oif_index = socket.if_nametoindex(created_ifaces[0].name)

        self._add_route_expect_ack(
            socket.AF_INET,
            "192.0.2.1",
            32,
            [NlAttrU32(RtattrType.RTA_OIF, oif_index)],
        )

        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -4onW")

    @pytest.mark.timeout(20)
    def test_buffer_override(self):
        """Install 1000 IPv6 /65 routes, then dump and count them.

        Each route is acknowledged individually; the subsequent
        RTM_GETROUTE dump must return exactly as many RTM_NEWROUTE
        messages with a destination length of 65 as were installed.
        """
        route_count = 1000
        plen = 65
        create_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmNewFlags.NLM_F_CREATE.value
        )

        # Bytes 6 and 7 of the packed address hold the route index
        # (high byte, low byte), giving each route a distinct prefix.
        addr_bytes = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
        for idx in range(route_count):
            addr_bytes[6], addr_bytes[7] = divmod(idx, 256)
            dst = ipaddress.IPv6Address(bytes(addr_bytes))

            request = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
            request.nl_hdr.nlmsg_flags = create_flags
            request.base_hdr.rtm_family = socket.AF_INET6
            request.base_hdr.rtm_dst_len = plen
            request.add_nla(NlAttrIp(RtattrType.RTA_DST, str(dst)))
            request.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))

            self.write_message(request, silent=True)
            ack = self.read_message(silent=True)
            assert ack.is_type(NlMsgType.NLMSG_ERROR)
            assert request.nl_hdr.nlmsg_seq == ack.nl_hdr.nlmsg_seq
            assert ack.error_code == 0

        # Request a full dump of the IPv6 routing table.
        dump = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
        dump.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmGetFlags.NLM_F_ROOT.value
            | NlmGetFlags.NLM_F_MATCH.value
        )
        dump.base_hdr.rtm_family = socket.AF_INET6
        self.write_message(dump)

        matched = 0
        while True:
            reply = self.read_message(silent=True)
            # Skip anything that does not answer the dump request.
            if dump.nl_hdr.nlmsg_seq != reply.nl_hdr.nlmsg_seq:
                continue
            if reply.is_type(NlMsgType.NLMSG_ERROR) and reply.error_code != 0:
                raise ValueError(
                    "unable to dump routes: error {}".format(reply.error_code)
                )
            if reply.is_type(NlMsgType.NLMSG_DONE):
                break
            # Only the /65 routes installed above are counted.
            if (
                reply.is_type(NlRtMsgType.RTM_NEWROUTE)
                and reply.base_hdr.rtm_dst_len == plen
            ):
                matched += 1
        assert route_count == matched
141