1import ipaddress
2import socket
3import struct
4
5from atf_python.sys.net.vnet import SingleVnetTestTemplate
6from atf_python.sys.netlink.base_headers import NlmBaseFlags
7from atf_python.sys.netlink.base_headers import Nlmsghdr
8from atf_python.sys.netlink.netlink import NetlinkTestTemplate
9from atf_python.sys.netlink.netlink_route import IfattrType
10from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
11from atf_python.sys.netlink.netlink_route import NlRtMsgType
12from atf_python.sys.netlink.netlink_route import RtScope
13from atf_python.sys.netlink.utils import NlConst
14
15
class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Checks RTM_GETADDR listings received over a NETLINK_ROUTE socket.

    Each test sends an RTM_GETADDR request (optionally narrowed by address
    family and/or interface index) and inspects the RTM_NEWADDR replies.
    """

    # Flags carried by every request issued in this class.
    _REQUEST_FLAGS = NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value

    def setup_method(self, method):
        """Select the address prefixes for the test, then open the netlink socket.

        A "4" and/or "6" in the test method name requests the IPv4 and/or IPv6
        prefix.  The attributes are set before the parent setup runs;
        presumably the vnet template reads them to configure the test
        interface -- confirm against SingleVnetTestTemplate.
        """
        test_name = method.__name__
        if "4" in test_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in test_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _epair_ifname(self):
        """Return the name of the interface registered under the "if1" alias."""
        return self.vnet.iface_alias_map["if1"].name

    def _send_getaddr(self, family=None, ifindex=None):
        """Send an RTM_GETADDR request and return its sequence number.

        family and ifindex, when given, are stored in the request header's
        ifa_family / ifa_index fields; otherwise those fields keep the values
        NetlinkIfaMessage initialised them with.
        """
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.nl_hdr.nlmsg_flags = self._REQUEST_FLAGS
        if family is not None:
            msg.base_hdr.ifa_family = family
        if ifindex is not None:
            msg.base_hdr.ifa_index = ifindex
        self.write_message(msg)
        return msg.nl_hdr.nlmsg_seq

    def _read_ifaddrs(self, seq):
        """Return (ifname, family, message) for each RTM_NEWADDR reply to seq."""
        return [
            (
                socket.if_indextoname(rx_msg.base_hdr.ifa_index),
                rx_msg.base_hdr.ifa_family,
                rx_msg,
            )
            for rx_msg in self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)
        ]

    @staticmethod
    def _count_ifaddrs(entries, ifname, family):
        """Count the entries that match both the interface name and the family."""
        return sum(1 for name, fam, _ in entries if name == ifname and fam == family)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        ret = self._read_ifaddrs(self._send_getaddr())

        # An unfiltered listing has to include the loopback interface too.
        assert any(name == "lo0" for name, _, _ in ret)

        ifname = self._epair_ifname()
        assert self._count_ifaddrs(ret, ifname, socket.AF_INET) == 1
        # Two IPv6 entries: test_6_broadcast shows them to be the configured
        # global address and a link-local one.
        assert self._count_ifaddrs(ret, ifname, socket.AF_INET6) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self._epair_ifname()
        seq = self._send_getaddr(ifindex=socket.if_nametoindex(epair_ifname))
        ret = self._read_ifaddrs(seq)

        assert self._count_ifaddrs(ret, epair_ifname, socket.AF_INET) == 1
        assert self._count_ifaddrs(ret, epair_ifname, socket.AF_INET6) == 2
        # Nothing from any other interface may be returned.
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""
        # The request body is cut down to a single byte holding the address
        # family.  nlmsg_len=17 is presumably the 16-byte netlink header plus
        # that one byte -- confirm against the Nlmsghdr definition.
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=self._REQUEST_FLAGS,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = self._read_ifaddrs(hdr.nlmsg_seq)
        assert len(ret) == 2
        # The count alone does not prove the filter was applied, so check the
        # family of every returned address as well.
        assert all(family == socket.AF_INET for _, family, _ in ret)

    def filter_iface_family(self, family, num_items):
        """List the addresses of the given family on the test interface.

        Asserts that every reply has the requested family and belongs to the
        "if1" interface, and that exactly num_items replies arrived.

        Returns the list of received RTM_NEWADDR messages.
        """
        epair_ifname = self._epair_ifname()
        seq = self._send_getaddr(
            family=family, ifindex=socket.if_nametoindex(epair_ifname)
        )

        ret = []
        for ifname, rx_family, rx_msg in self._read_ifaddrs(seq):
            assert family == rx_family
            assert epair_ifname == ifname
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255"

        assert msg.get_nla(IfattrType.IFA_LABEL).text == self._epair_ifname()

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # The two replies may arrive in either order; tell them apart by scope.
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        epair_ifname = self._epair_ifname()

        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded: bytes 2-3 of the address must
        # be zero.
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None

        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname
162