1import errno
2import socket
3
4import pytest
5from atf_python.sys.netlink.netlink_route import IflattrType
6from atf_python.sys.netlink.netlink_route import IflinkInfo
7from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
8from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
9from atf_python.sys.netlink.netlink import NetlinkTestTemplate
10from atf_python.sys.netlink.attrs import NlAttrNested
11from atf_python.sys.netlink.attrs import NlAttrStr
12from atf_python.sys.netlink.attrs import NlAttrStrn
13from atf_python.sys.netlink.attrs import NlAttrU16
14from atf_python.sys.netlink.attrs import NlAttrU32
15from atf_python.sys.netlink.utils import NlConst
16from atf_python.sys.netlink.base_headers import NlmBaseFlags
17from atf_python.sys.netlink.base_headers import NlmNewFlags
18from atf_python.sys.netlink.base_headers import NlMsgType
19from atf_python.sys.netlink.netlink_route import NlRtMsgType
20from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
21from atf_python.sys.net.vnet import SingleVnetTestTemplate
22
23
24class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
25    def setup_method(self, method):
26        super().setup_method(method)
27        self.setup_netlink(NlConst.NETLINK_ROUTE)
28
29    def get_interface_byname(self, ifname):
30        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
31        msg.nl_hdr.nlmsg_flags = (
32            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
33        )
34        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
35        self.write_message(msg)
36        while True:
37            rx_msg = self.read_message()
38            if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
39                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
40                    if rx_msg.error_code != 0:
41                        raise ValueError("unable to get interface {}".format(ifname))
42                elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
43                    return rx_msg
44                else:
45                    raise ValueError("bad message")
46
47    def test_get_iface_byname_error(self):
48        """Tests error on fetching non-existing interface name"""
49        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
50        msg.nl_hdr.nlmsg_flags = (
51            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
52        )
53        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
54
55        rx_msg = self.get_reply(msg)
56        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
57        assert rx_msg.error_code == errno.ENODEV
58
59    def test_get_iface_byindex_error(self):
60        """Tests error on fetching non-existing interface index"""
61        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
62        msg.nl_hdr.nlmsg_flags = (
63            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
64        )
65        msg.base_hdr.ifi_index = 2147483647
66
67        rx_msg = self.get_reply(msg)
68        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
69        assert rx_msg.error_code == errno.ENODEV
70
71    @pytest.mark.require_user("root")
72    def test_create_iface_plain(self):
73        """Tests loopback creation w/o any parameters"""
74        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
75        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
76        msg.nl_hdr.nlmsg_flags = (
77            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
78        )
79        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
80        msg.add_nla(
81            NlAttrNested(
82                IflattrType.IFLA_LINKINFO,
83                [
84                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
85                ],
86            )
87        )
88
89        rx_msg = self.get_reply(msg)
90        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
91        assert rx_msg.error_code == 0
92
93        self.get_interface_byname("lo10")
94
95    @pytest.mark.require_user("root")
96    def test_create_iface_plain_retvals(self):
97        """Tests loopback creation w/o any parameters"""
98        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
99        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
100        msg.nl_hdr.nlmsg_flags = (
101            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
102        )
103        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
104        msg.add_nla(
105            NlAttrNested(
106                IflattrType.IFLA_LINKINFO,
107                [
108                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
109                ],
110            )
111        )
112
113        rx_msg = self.get_reply(msg)
114        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
115        assert rx_msg.error_code == 0
116        assert rx_msg.cookie is not None
117        nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
118        nla_map = {n.nla_type: n for n in nla_list}
119        assert IflattrType.IFLA_IFNAME.value in nla_map
120        assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
121        assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
122        assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
123
124        lo_msg = self.get_interface_byname("lo10")
125        assert (
126            lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
127        )
128
129    @pytest.mark.require_user("root")
130    def test_create_iface_attrs(self):
131        """Tests interface creation with additional properties"""
132        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
133        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
134        msg.nl_hdr.nlmsg_flags = (
135            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
136        )
137        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
138        msg.add_nla(
139            NlAttrNested(
140                IflattrType.IFLA_LINKINFO,
141                [
142                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
143                ],
144            )
145        )
146
147        # Custom attributes
148        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
149        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
150
151        rx_msg = self.get_reply(msg)
152        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
153        assert rx_msg.error_code == 0
154
155        iface_msg = self.get_interface_byname("lo10")
156        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
157        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
158
159    @pytest.mark.require_user("root")
160    def test_modify_iface_attrs(self):
161        """Tests interface modifications"""
162        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
163        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
164        msg.nl_hdr.nlmsg_flags = (
165            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
166        )
167        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
168        msg.add_nla(
169            NlAttrNested(
170                IflattrType.IFLA_LINKINFO,
171                [
172                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
173                ],
174            )
175        )
176
177        rx_msg = self.get_reply(msg)
178        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
179        assert rx_msg.error_code == 0
180
181        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
182        msg.nl_hdr.nlmsg_flags = (
183            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
184        )
185        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
186
187        # Custom attributes
188        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
189        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
190
191        rx_msg = self.get_reply(msg)
192        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
193        assert rx_msg.error_code == 0
194
195        iface_msg = self.get_interface_byname("lo10")
196        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
197        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
198
199    @pytest.mark.require_user("root")
200    def test_delete_iface(self):
201        """Tests interface modifications"""
202        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
203        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
204        msg.nl_hdr.nlmsg_flags = (
205            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
206        )
207        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
208        msg.add_nla(
209            NlAttrNested(
210                IflattrType.IFLA_LINKINFO,
211                [
212                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
213                ],
214            )
215        )
216
217        rx_msg = self.get_reply(msg)
218        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
219        assert rx_msg.error_code == 0
220
221        iface_msg = self.get_interface_byname("lo10")
222        iface_idx = iface_msg.base_hdr.ifi_index
223
224        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
225        msg.nl_hdr.nlmsg_flags = (
226            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
227        )
228        msg.base_hdr.ifi_index = iface_idx
229        # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
230
231        rx_msg = self.get_reply(msg)
232        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
233        assert rx_msg.error_code == 0
234
235        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
236        msg.nl_hdr.nlmsg_flags = (
237            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
238        )
239        msg.base_hdr.ifi_index = 2147483647
240
241        rx_msg = self.get_reply(msg)
242        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
243        assert rx_msg.error_code == errno.ENODEV
244
245    @pytest.mark.require_user("root")
246    def test_dump_ifaces_many(self):
247        """Tests if interface dummp is not missing interfaces"""
248
249        ifmap = {}
250        ifmap[socket.if_nametoindex("lo0")] = "lo0"
251
252        for i in range(40):
253            ifname = "lo{}".format(i + 1)
254            flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
255            msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
256            msg.nl_hdr.nlmsg_flags = (
257                flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
258            )
259            msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
260            msg.add_nla(
261                NlAttrNested(
262                    IflattrType.IFLA_LINKINFO,
263                    [
264                        NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
265                    ],
266                )
267            )
268
269            rx_msg = self.get_reply(msg)
270            assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
271            nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
272            nla_map = {n.nla_type: n for n in nla_list}
273            assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
274            ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
275            assert ifindex > 0
276            assert ifindex not in ifmap
277            ifmap[ifindex] = ifname
278
279            # Dump all interfaces and check if the output matches ifmap
280            kernel_ifmap = {}
281            msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
282            msg.nl_hdr.nlmsg_flags = (
283                NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
284            )
285            self.write_message(msg)
286            while True:
287                rx_msg = self.read_message()
288                if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
289                    raise ValueError(
290                        "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
291                    )
292                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
293                    raise ValueError("unexpected message {}".format(rx_msg))
294                if rx_msg.is_type(NlMsgType.NLMSG_DONE):
295                    break
296                if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
297                    raise ValueError("unexpected message {}".format(rx_msg))
298
299                ifindex = rx_msg.base_hdr.ifi_index
300                assert ifindex == rx_msg.base_hdr.ifi_index
301                ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
302                if ifname.startswith("lo"):
303                    kernel_ifmap[ifindex] = ifname
304            assert kernel_ifmap == ifmap
305
    #
    # Reference RTM_NEWLINK request creating a VLAN interface, shown as a
    # decoded netlink message:
    #
    # {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
    #  {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
    #    {{nla_len=8, nla_type=IFLA_LINK}, 2},
    #    {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
    #    {{nla_len=24, nla_type=IFLA_LINKINFO},
    #      {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
    #      {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
    #
316    @pytest.mark.require_user("root")
317    def test_create_vlan_plain(self):
318        """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
319        os_ifname = self.vnet.iface_alias_map["if1"].name
320        ifindex = socket.if_nametoindex(os_ifname)
321
322        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
323        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
324        msg.nl_hdr.nlmsg_flags = (
325            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
326        )
327
328        msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
329        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
330
331        msg.add_nla(
332            NlAttrNested(
333                IflattrType.IFLA_LINKINFO,
334                [
335                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
336                    NlAttrNested(
337                        IflinkInfo.IFLA_INFO_DATA,
338                        [
339                            NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
340                        ],
341                    ),
342                ],
343            )
344        )
345
346        rx_msg = self.get_reply(msg)
347        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
348        assert rx_msg.error_code == 0
349
350        self.get_interface_byname("vlan22")
351        # ToolsHelper.print_net_debug()
352