1import ipaddress
2import socket
3import struct
4
5import pytest
6from atf_python.sys.net.vnet import SingleVnetTestTemplate
7from atf_python.sys.netlink.attrs import NlAttr
8from atf_python.sys.netlink.attrs import NlAttrIp
9from atf_python.sys.netlink.attrs import NlAttrNested
10from atf_python.sys.netlink.attrs import NlAttrU32
11from atf_python.sys.netlink.base_headers import NlmBaseFlags
12from atf_python.sys.netlink.base_headers import NlmNewFlags
13from atf_python.sys.netlink.base_headers import Nlmsghdr
14from atf_python.sys.netlink.message import NlMsgType
15from atf_python.sys.netlink.netlink import NetlinkTestTemplate
16from atf_python.sys.netlink.netlink import Nlsock
17from atf_python.sys.netlink.netlink_generic import CarpAttrType
18from atf_python.sys.netlink.netlink_generic import CarpGenMessage
19from atf_python.sys.netlink.netlink_generic import CarpMsgType
20from atf_python.sys.netlink.netlink_route import IfaAttrType
21from atf_python.sys.netlink.netlink_route import IfaCacheInfo
22from atf_python.sys.netlink.netlink_route import IfafAttrType
23from atf_python.sys.netlink.netlink_route import IfafFlags6
24from atf_python.sys.netlink.netlink_route import IfaFlags
25from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
26from atf_python.sys.netlink.netlink_route import NlRtMsgType
27from atf_python.sys.netlink.netlink_route import RtScope
28from atf_python.sys.netlink.utils import enum_or_int
29from atf_python.sys.netlink.utils import NlConst
30
31
32class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
33    def setup_method(self, method):
34        method_name = method.__name__
35        if "4" in method_name:
36            self.IPV4_PREFIXES = ["192.0.2.1/24"]
37        if "6" in method_name:
38            self.IPV6_PREFIXES = ["2001:db8::1/64"]
39        super().setup_method(method)
40        self.setup_netlink(NlConst.NETLINK_ROUTE)
41
42    def test_46_nofilter(self):
43        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
44        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
45        msg.set_request()
46        self.write_message(msg)
47
48        ret = []
49        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
50            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
51            family = rx_msg.base_hdr.ifa_family
52            ret.append((ifname, family, rx_msg))
53
54        ifname = "lo0"
55        assert len([r for r in ret if r[0] == ifname]) > 0
56
57        ifname = self.vnet.iface_alias_map["if1"].name
58        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
59        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
60
61    def test_46_filter_iface(self):
62        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
63        epair_ifname = self.vnet.iface_alias_map["if1"].name
64
65        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
66        msg.set_request()
67        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
68        self.write_message(msg)
69
70        ret = []
71        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
72            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
73            family = rx_msg.base_hdr.ifa_family
74            ret.append((ifname, family, rx_msg))
75
76        ifname = epair_ifname
77        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
78        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
79        assert len(ret) == 3
80
81    def test_46_filter_family_compat(self):
82        """Tests that family filtering works with the stripped header"""
83
84        hdr = Nlmsghdr(
85            nlmsg_len=17,
86            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
87            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
88            nlmsg_seq=self.helper.get_seq(),
89        )
90        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
91        self.nlsock.write_data(data)
92
93        ret = []
94        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
95            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
96            family = rx_msg.base_hdr.ifa_family
97            ret.append((ifname, family, rx_msg))
98        assert len(ret) == 2
99
100    def filter_iface_family(self, family, num_items):
101        """Tests that listing outputs IPv4 for the specific interface"""
102        epair_ifname = self.vnet.iface_alias_map["if1"].name
103
104        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
105        msg.set_request()
106        msg.base_hdr.ifa_family = family
107        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
108        self.write_message(msg)
109
110        ret = []
111        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
112            assert family == rx_msg.base_hdr.ifa_family
113            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
114            ret.append(rx_msg)
115        assert len(ret) == num_items
116        return ret
117
118    def test_4_broadcast(self):
119        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
120        ret = self.filter_iface_family(socket.AF_INET, 1)
121        # Should be 192.0.2.1/24
122        msg = ret[0]
123        # Family and ifindex has been checked already
124        assert msg.base_hdr.ifa_prefixlen == 24
125        # Ignore IFA_FLAGS for now
126        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
127
128        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
129        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
130        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"
131
132        epair_ifname = self.vnet.iface_alias_map["if1"].name
133        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
134
135    def test_6_broadcast(self):
136        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
137        ret = self.filter_iface_family(socket.AF_INET6, 2)
138        # Should be 192.0.2.1/24
139        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
140            (gmsg, lmsg) = ret
141        else:
142            (lmsg, gmsg) = ret
143        # Start with global ( 2001:db8::1/64 )
144        msg = gmsg
145        # Family and ifindex has been checked already
146        assert msg.base_hdr.ifa_prefixlen == 64
147        # Ignore IFA_FLAGS for now
148        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
149
150        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
151        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
152        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
153
154        epair_ifname = self.vnet.iface_alias_map["if1"].name
155        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
156
157        # Local: fe80::/64
158        msg = lmsg
159        assert msg.base_hdr.ifa_prefixlen == 64
160        # Ignore IFA_FLAGS for now
161        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value
162
163        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
164        assert addr.is_link_local
165        # Verify that ifindex is not emmbedded
166        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
167        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
168        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
169
170        epair_ifname = self.vnet.iface_alias_map["if1"].name
171        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
172
173
174class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
175    def setup_method(self, method):
176        super().setup_method(method)
177        self.setup_netlink(NlConst.NETLINK_ROUTE)
178
179    def send_check_success(self, msg):
180        rx_msg = self.get_reply(msg)
181        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
182        assert rx_msg.error_code == 0
183
184    @staticmethod
185    def get_family_from_ip(ip):
186        if ip.version == 4:
187            return socket.AF_INET
188        return socket.AF_INET6
189
190    def create_msg(self, ifa):
191        iface = self.vnet.iface_alias_map["if1"]
192
193        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
194        msg.set_request()
195        msg.nl_hdr.nlmsg_flags |= (
196            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
197        )
198        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
199        msg.base_hdr.ifa_index = iface.ifindex
200        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
201        return msg
202
203    def get_ifa_list(self, ifindex=0, family=0):
204        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
205        msg.set_request()
206        msg.base_hdr.ifa_family = family
207        msg.base_hdr.ifa_index = ifindex
208        self.write_message(msg)
209        return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)
210
211    def find_msg_by_ifa(self, msg_list, ip):
212        for msg in msg_list:
213            if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip):
214                return msg
215        return None
216
217    def setup_dummy_carp(self, ifindex: int, vhid: int):
218        self.require_module("carp")
219
220        nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
221        family_id = nlsock.get_genl_family_id("carp")
222
223        msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
224        msg.set_request()
225        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
226        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
227        rx_msg = nlsock.get_reply(msg)
228
229        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
230        assert rx_msg.error_code == 0
231
232
233class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
234    def test_add_4(self):
235        """Tests IPv4 address addition to the standard broadcast interface"""
236        ifa = ipaddress.ip_interface("192.0.2.1/24")
237        ifa_brd = ifa.network.broadcast_address
238        iface = self.vnet.iface_alias_map["if1"]
239
240        msg = self.create_msg(ifa)
241        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
242        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
243
244        self.send_check_success(msg)
245
246        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
247        assert len(lst) == 1
248        rx_msg = lst[0]
249
250        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
251        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
252
253        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
254        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
255        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
256
257    @pytest.mark.parametrize(
258        "brd",
259        [
260            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
261            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
262            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
263            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
264            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
265        ],
266    )
267    def test_add_4_brd(self, brd):
268        """Tests proper broadcast setup when adding IPv4 ifa"""
269        plen, auto_brd, ifa_brd_str = brd
270        ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
271        iface = self.vnet.iface_alias_map["if1"]
272        ifa_brd = ipaddress.ip_address(ifa_brd_str)
273
274        msg = self.create_msg(ifa)
275        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
276        if not auto_brd:
277            msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
278
279        self.send_check_success(msg)
280
281        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
282        assert len(lst) == 1
283        rx_msg = lst[0]
284
285        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
286        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
287
288        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
289        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
290        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
291
292    def test_add_6(self):
293        ifa = ipaddress.ip_interface("2001:db8::1/64")
294        iface = self.vnet.iface_alias_map["if1"]
295
296        msg = self.create_msg(ifa)
297        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
298
299        self.send_check_success(msg)
300
301        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
302        assert len(lst) == 2
303        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
304        assert rx_msg_gu is not None
305
306        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
307        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
308        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
309
310    def test_add_4_carp(self):
311        ifa = ipaddress.ip_interface("192.0.2.1/24")
312        ifa_brd = ifa.network.broadcast_address
313        iface = self.vnet.iface_alias_map["if1"]
314        vhid = 77
315
316        self.setup_dummy_carp(iface.ifindex, vhid)
317
318        msg = self.create_msg(ifa)
319        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
320        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
321        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
322        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
323
324        self.send_check_success(msg)
325
326        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
327        assert len(lst) == 1
328        rx_msg = lst[0]
329
330        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
331        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
332
333        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
334        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
335        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
336        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
337        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
338
339    def test_add_6_carp(self):
340        ifa = ipaddress.ip_interface("2001:db8::1/64")
341        iface = self.vnet.iface_alias_map["if1"]
342        vhid = 77
343
344        self.setup_dummy_carp(iface.ifindex, vhid)
345
346        msg = self.create_msg(ifa)
347        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
348        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
349        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
350
351        self.send_check_success(msg)
352
353        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
354        assert len(lst) == 2
355        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
356        assert rx_msg_gu is not None
357
358        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
359        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
360        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
361        ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
362        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
363
364    def test_add_6_lifetime(self):
365        ifa = ipaddress.ip_interface("2001:db8::1/64")
366        iface = self.vnet.iface_alias_map["if1"]
367        pref_time = 43200
368        valid_time = 86400
369
370        ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)
371
372        msg = self.create_msg(ifa)
373        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
374        msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))
375
376        self.send_check_success(msg)
377
378        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
379        assert len(lst) == 2
380        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
381        assert rx_msg is not None
382
383        ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
384        assert pref_time - 5 <= ci.ifa_prefered <= pref_time
385        assert valid_time - 5 <= ci.ifa_valid <= valid_time
386        assert ci.cstamp > 0
387        assert ci.tstamp > 0
388        assert ci.tstamp >= ci.cstamp
389
390    @pytest.mark.parametrize(
391        "flags_str",
392        [
393            "autoconf",
394            "deprecated",
395            "autoconf,deprecated",
396            "prefer_source",
397        ],
398    )
399    def test_add_6_flags(self, flags_str):
400        ifa = ipaddress.ip_interface("2001:db8::1/64")
401        iface = self.vnet.iface_alias_map["if1"]
402
403        flags_map = {
404            "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
405            "deprecated": {
406                "nl": IfaFlags.IFA_F_DEPRECATED,
407                "f": IfafFlags6.IN6_IFF_DEPRECATED,
408            },
409            "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
410        }
411        nl_flags = 0
412        f_flags = 0
413
414        for flag_str in flags_str.split(","):
415            d = flags_map.get(flag_str, {})
416            nl_flags |= enum_or_int(d.get("nl", 0))
417            f_flags |= enum_or_int(d.get("f", 0))
418
419        msg = self.create_msg(ifa)
420        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
421        msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
422        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
423        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
424
425        self.send_check_success(msg)
426
427        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
428        assert len(lst) == 2
429        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
430        assert rx_msg is not None
431
432        assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
433        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
434        assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags
435
436    def test_add_4_empty_message(self):
437        """Tests correct failure w/ empty message"""
438        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
439        msg.set_request()
440        msg.nl_hdr.nlmsg_flags |= (
441            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
442        )
443
444        rx_msg = self.get_reply(msg)
445        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
446        assert rx_msg.error_code != 0
447
448    def test_add_4_empty_ifindex(self):
449        """Tests correct failure w/ empty ifindex"""
450        ifa = ipaddress.ip_interface("192.0.2.1/24")
451        ifa_brd = ifa.network.broadcast_address
452
453        msg = self.create_msg(ifa)
454        msg.base_hdr.ifa_index = 0
455        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
456        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
457
458        rx_msg = self.get_reply(msg)
459        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
460        assert rx_msg.error_code != 0
461
462    def test_add_4_empty_addr(self):
463        """Tests correct failure w/ empty address"""
464        ifa = ipaddress.ip_interface("192.0.2.1/24")
465        ifa_brd = ifa.network.broadcast_address
466
467        msg = self.create_msg(ifa)
468        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
469
470        rx_msg = self.get_reply(msg)
471        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
472        assert rx_msg.error_code != 0
473
474    @pytest.mark.parametrize(
475        "ifa_str",
476        [
477            pytest.param("192.0.2.1/32", id="ipv4_host"),
478            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
479            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
480            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
481        ],
482    )
483    @pytest.mark.parametrize(
484        "tlv",
485        [
486            pytest.param("local", id="ifa_local"),
487            pytest.param("address", id="ifa_address"),
488        ],
489    )
490    def test_del(self, tlv, ifa_str):
491        """Tests address deletion from the standard broadcast interface"""
492        ifa = ipaddress.ip_interface(ifa_str)
493        ifa_brd = ifa.network.broadcast_address
494        iface = self.vnet.iface_alias_map["if1"]
495
496        msg = self.create_msg(ifa)
497        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
498        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
499
500        self.send_check_success(msg)
501        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
502        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
503        assert rx_msg is not None
504
505        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
506        msg.set_request()
507        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
508        msg.base_hdr.ifa_index = iface.ifindex
509        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
510
511        if tlv == "local":
512            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
513        if tlv == "address":
514            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
515
516        self.send_check_success(msg)
517        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
518        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
519        assert rx_msg is None
520
521
522class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
523    IFTYPE = "gif"
524
525    @pytest.mark.parametrize(
526        "ifa_pair",
527        [
528            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
529            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
530            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
531            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
532            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
533        ],
534    )
535    def test_add_4(self, ifa_pair):
536        """Tests IPv4 address addition to the p2p interface"""
537        ifa = ipaddress.ip_interface(ifa_pair[0])
538        peer_ip = ipaddress.ip_address(ifa_pair[1])
539        iface = self.vnet.iface_alias_map["if1"]
540
541        msg = self.create_msg(ifa)
542        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
543        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
544
545        self.send_check_success(msg)
546
547        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
548        assert len(lst) == 1
549        rx_msg = lst[0]
550
551        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
552        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
553
554        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
555        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
556
557    @pytest.mark.parametrize(
558        "ifa_pair",
559        [
560            pytest.param(
561                ["2001:db8::1/64", "2001:db8::2"],
562                id="dst_inside_64",
563                marks=pytest.mark.xfail(reason="currently fails"),
564            ),
565            pytest.param(
566                ["2001:db8::1/127", "2001:db8::2"],
567                id="dst_inside_127",
568                marks=pytest.mark.xfail(reason="currently fails"),
569            ),
570            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
571            pytest.param(
572                ["2001:db8::1/64", "2001:db8:2::2"],
573                id="dst_outside_64",
574                marks=pytest.mark.xfail(reason="currently fails"),
575            ),
576        ],
577    )
578    def test_add_6(self, ifa_pair):
579        """Tests IPv6 address addition to the p2p interface"""
580        ifa = ipaddress.ip_interface(ifa_pair[0])
581        peer_ip = ipaddress.ip_address(ifa_pair[1])
582        iface = self.vnet.iface_alias_map["if1"]
583
584        msg = self.create_msg(ifa)
585        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
586        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
587
588        self.send_check_success(msg)
589
590        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
591        assert len(lst) == 2
592        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
593        assert rx_msg_gu is not None
594
595        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
596        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
597        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
598        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
599
600    @pytest.mark.parametrize(
601        "ifa_pair",
602        [
603            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
604            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
605            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
606        ],
607    )
608    @pytest.mark.parametrize(
609        "tlv_pair",
610        [
611            pytest.param(["a", ""], id="ifa_addr=addr"),
612            pytest.param(["", "a"], id="ifa_local=addr"),
613            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
614        ],
615    )
616    def test_del(self, tlv_pair, ifa_pair):
617        """Tests address deletion from the P2P interface"""
618        ifa = ipaddress.ip_interface(ifa_pair[0])
619        peer_ip = ipaddress.ip_address(ifa_pair[1])
620        iface = self.vnet.iface_alias_map["if1"]
621        ifa_addr_str, ifa_local_str = tlv_pair
622
623        msg = self.create_msg(ifa)
624        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
625        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
626
627        self.send_check_success(msg)
628        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
629        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
630        assert rx_msg is not None
631
632        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
633        msg.set_request()
634        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
635        msg.base_hdr.ifa_index = iface.ifindex
636        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
637
638        if "a" in ifa_addr_str:
639            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
640        if "p" in ifa_addr_str:
641            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
642        if "a" in ifa_local_str:
643            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
644        if "p" in ifa_local_str:
645            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))
646
647        self.send_check_success(msg)
648        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
649        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
650        assert rx_msg is None
651
652
653class TestRtNlAddIfaddrLo(RtnlIfaOps):
654    IFTYPE = "lo"
655
656    @pytest.mark.parametrize(
657        "ifa_str",
658        [
659            pytest.param("192.0.2.1/24", id="prefix"),
660            pytest.param("192.0.2.1/32", id="host"),
661        ],
662    )
663    def test_add_4(self, ifa_str):
664        """Tests IPv4 address addition to the loopback interface"""
665        ifa = ipaddress.ip_interface(ifa_str)
666        iface = self.vnet.iface_alias_map["if1"]
667
668        msg = self.create_msg(ifa)
669        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
670
671        self.send_check_success(msg)
672
673        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
674        assert len(lst) == 1
675        rx_msg = lst[0]
676
677        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
678        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
679
680        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
681        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
682
683    @pytest.mark.parametrize(
684        "ifa_str",
685        [
686            pytest.param("2001:db8::1/64", id="gu_prefix"),
687            pytest.param("2001:db8::1/128", id="gu_host"),
688        ],
689    )
690    def test_add_6(self, ifa_str):
691        """Tests IPv6 address addition to the loopback interface"""
692        ifa = ipaddress.ip_interface(ifa_str)
693        iface = self.vnet.iface_alias_map["if1"]
694
695        msg = self.create_msg(ifa)
696        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
697
698        self.send_check_success(msg)
699
700        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
701        assert len(lst) == 2  # link-local should be auto-created as well
702        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
703        assert rx_msg is not None
704
705        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
706        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
707        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
708
709    @pytest.mark.parametrize(
710        "ifa_str",
711        [
712            pytest.param("192.0.2.1/32", id="ipv4_host"),
713            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
714            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
715            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
716        ],
717    )
718    @pytest.mark.parametrize(
719        "tlv",
720        [
721            pytest.param("local", id="ifa_local"),
722            pytest.param("address", id="ifa_address"),
723        ],
724    )
725    def test_del(self, tlv, ifa_str):
726        """Tests address deletion from the loopback interface"""
727        ifa = ipaddress.ip_interface(ifa_str)
728        iface = self.vnet.iface_alias_map["if1"]
729
730        msg = self.create_msg(ifa)
731        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
732
733        self.send_check_success(msg)
734        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
735        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
736        assert rx_msg is not None
737
738        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
739        msg.set_request()
740        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
741        msg.base_hdr.ifa_index = iface.ifindex
742        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
743
744        if tlv == "local":
745            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
746        if tlv == "address":
747            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
748
749        self.send_check_success(msg)
750        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
751        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
752        assert rx_msg is None
753