1import ipaddress
2import socket
3import struct
4
5from atf_python.sys.net.netlink import IfattrType
6from atf_python.sys.net.netlink import NetlinkIfaMessage
7from atf_python.sys.net.netlink import NetlinkTestTemplate
8from atf_python.sys.net.netlink import NlConst
9from atf_python.sys.net.netlink import NlHelper
10from atf_python.sys.net.netlink import NlmBaseFlags
11from atf_python.sys.net.netlink import Nlmsghdr
12from atf_python.sys.net.netlink import NlMsgType
13from atf_python.sys.net.netlink import NlRtMsgType
14from atf_python.sys.net.netlink import Nlsock
15from atf_python.sys.net.netlink import RtScope
16from atf_python.sys.net.vnet import SingleVnetTestTemplate
17
18
19class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
20    def setup_method(self, method):
21        method_name = method.__name__
22        if "4" in method_name:
23            self.IPV4_PREFIXES = ["192.0.2.1/24"]
24        if "6" in method_name:
25            self.IPV6_PREFIXES = ["2001:db8::1/64"]
26        super().setup_method(method)
27        self.setup_netlink(NlConst.NETLINK_ROUTE)
28
29    def test_46_nofilter(self):
30        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
31        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
32        msg.nl_hdr.nlmsg_flags = (
33            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
34        )
35        self.write_message(msg)
36
37        ret = []
38        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
39            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
40            family = rx_msg.base_hdr.ifa_family
41            ret.append((ifname, family, rx_msg))
42
43        ifname = "lo0"
44        assert len([r for r in ret if r[0] == ifname]) > 0
45
46        ifname = self.vnet.iface_alias_map["if1"].name
47        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
48        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
49
50    def test_46_filter_iface(self):
51        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
52        epair_ifname = self.vnet.iface_alias_map["if1"].name
53
54        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
55        msg.nl_hdr.nlmsg_flags = (
56            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
57        )
58        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
59        self.write_message(msg)
60
61        ret = []
62        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
63            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
64            family = rx_msg.base_hdr.ifa_family
65            ret.append((ifname, family, rx_msg))
66
67        ifname = epair_ifname
68        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
69        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
70        assert len(ret) == 3
71
72    def test_46_filter_family_compat(self):
73        """Tests that family filtering works with the stripped header"""
74
75        hdr = Nlmsghdr(
76                nlmsg_len=17,
77                nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
78                nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
79                nlmsg_seq=self.helper.get_seq()
80                )
81        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
82        self.nlsock.write_data(data)
83
84        ret = []
85        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
86            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
87            family = rx_msg.base_hdr.ifa_family
88            ret.append((ifname, family, rx_msg))
89        assert len(ret) == 2
90
91    def filter_iface_family(self, family, num_items):
92        """Tests that listing outputs IPv4 for the specific interface"""
93        epair_ifname = self.vnet.iface_alias_map["if1"].name
94
95        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
96        msg.nl_hdr.nlmsg_flags = (
97            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
98        )
99        msg.base_hdr.ifa_family = family
100        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
101        self.write_message(msg)
102
103        ret = []
104        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
105            assert family == rx_msg.base_hdr.ifa_family
106            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
107            ret.append(rx_msg)
108        assert len(ret) == num_items
109        return ret
110
111    def test_4_broadcast(self):
112        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
113        ret = self.filter_iface_family(socket.AF_INET, 1)
114        # Should be 192.0.2.1/24
115        msg = ret[0]
116        # Family and ifindex has been checked already
117        assert msg.base_hdr.ifa_prefixlen == 24
118        # Ignore IFA_FLAGS for now
119        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
120
121        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1"
122        assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1"
123        assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255"
124
125        epair_ifname = self.vnet.iface_alias_map["if1"].name
126        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname
127
128    def test_6_broadcast(self):
129        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
130        ret = self.filter_iface_family(socket.AF_INET6, 2)
131        # Should be 192.0.2.1/24
132        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
133            (gmsg, lmsg) = ret
134        else:
135            (lmsg, gmsg) = ret
136        # Start with global ( 2001:db8::1/64 )
137        msg = gmsg
138        # Family and ifindex has been checked already
139        assert msg.base_hdr.ifa_prefixlen == 64
140        # Ignore IFA_FLAGS for now
141        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
142
143        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
144        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
145        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None
146
147        epair_ifname = self.vnet.iface_alias_map["if1"].name
148        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname
149
150        # Local: fe80::/64
151        msg = lmsg
152        assert msg.base_hdr.ifa_prefixlen == 64
153        # Ignore IFA_FLAGS for now
154        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value
155
156        addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr)
157        assert addr.is_link_local
158        # Verify that ifindex is not emmbedded
159        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
160        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
161        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None
162
163        epair_ifname = self.vnet.iface_alias_map["if1"].name
164        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname
165