1import socket
2
3import pytest
4from atf_python.sys.net.vnet import SingleVnetTestTemplate
5from atf_python.sys.netlink.netlink import NetlinkTestTemplate
6from atf_python.sys.netlink.netlink_route import NdAttrType
7from atf_python.sys.netlink.netlink_route import NetlinkNdMessage
8from atf_python.sys.netlink.netlink_route import NlRtMsgType
9from atf_python.sys.netlink.utils import NlConst
10
11
12class TestRtNlNeigh(NetlinkTestTemplate, SingleVnetTestTemplate):
13    def setup_method(self, method):
14        method_name = method.__name__
15        if "4" in method_name:
16            self.IPV4_PREFIXES = ["192.0.2.1/24"]
17        if "6" in method_name:
18            self.IPV6_PREFIXES = ["2001:db8::1/64"]
19        super().setup_method(method)
20        self.setup_netlink(NlConst.NETLINK_ROUTE)
21
22    def filter_iface(self, family, num_items):
23        epair_ifname = self.vnet.iface_alias_map["if1"].name
24        epair_ifindex = socket.if_nametoindex(epair_ifname)
25
26        msg = NetlinkNdMessage(self.helper, NlRtMsgType.RTM_GETNEIGH)
27        msg.set_request()
28        msg.base_hdr.ndm_family = family
29        msg.base_hdr.ndm_ifindex = epair_ifindex
30        self.write_message(msg)
31
32        ret = []
33        for rx_msg in self.read_msg_list(
34            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWNEIGH
35        ):
36            ifname = socket.if_indextoname(rx_msg.base_hdr.ndm_ifindex)
37            family = rx_msg.base_hdr.ndm_family
38            assert ifname == epair_ifname
39            assert family == family
40            assert rx_msg.get_nla(NdAttrType.NDA_DST) is not None
41            assert rx_msg.get_nla(NdAttrType.NDA_LLADDR) is not None
42            ret.append(rx_msg)
43        assert len(ret) == num_items
44
45    @pytest.mark.timeout(5)
46    def test_6_filter_iface(self):
47        """Tests that listing outputs all nd6 records"""
48        return self.filter_iface(socket.AF_INET6, 2)
49
50    @pytest.mark.timeout(5)
51    def test_4_filter_iface(self):
52        """Tests that listing outputs all arp records"""
53        return self.filter_iface(socket.AF_INET, 1)
54