1import errno
2import socket
3
4import pytest
5from atf_python.sys.netlink.netlink_route import IflattrType
6from atf_python.sys.netlink.netlink_route import IflinkInfo
7from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
8from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
9from atf_python.sys.netlink.netlink import NetlinkTestTemplate
10from atf_python.sys.netlink.attrs import NlAttrNested
11from atf_python.sys.netlink.attrs import NlAttrStr
12from atf_python.sys.netlink.attrs import NlAttrStrn
13from atf_python.sys.netlink.attrs import NlAttrU16
14from atf_python.sys.netlink.attrs import NlAttrU32
15from atf_python.sys.netlink.utils import NlConst
16from atf_python.sys.netlink.base_headers import NlmBaseFlags
17from atf_python.sys.netlink.base_headers import NlmNewFlags
18from atf_python.sys.netlink.base_headers import NlMsgType
19from atf_python.sys.netlink.netlink_route import NlRtMsgType
20from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
21from atf_python.sys.net.vnet import SingleVnetTestTemplate
22from atf_python.sys.net.tools import ToolsHelper
23
24
25class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
26    def setup_method(self, method):
27        super().setup_method(method)
28        self.setup_netlink(NlConst.NETLINK_ROUTE)
29
30    def get_interface_byname(self, ifname):
31        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
32        msg.nl_hdr.nlmsg_flags = (
33            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
34        )
35        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
36        self.write_message(msg)
37        while True:
38            rx_msg = self.read_message()
39            if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
40                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
41                    if rx_msg.error_code != 0:
42                        raise ValueError("unable to get interface {}".format(ifname))
43                elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
44                    return rx_msg
45                else:
46                    raise ValueError("bad message")
47
48    def test_get_iface_byname_error(self):
49        """Tests error on fetching non-existing interface name"""
50        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
51        msg.nl_hdr.nlmsg_flags = (
52            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
53        )
54        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
55
56        rx_msg = self.get_reply(msg)
57        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
58        assert rx_msg.error_code == errno.ENODEV
59
60    def test_get_iface_byindex_error(self):
61        """Tests error on fetching non-existing interface index"""
62        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
63        msg.nl_hdr.nlmsg_flags = (
64            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
65        )
66        msg.base_hdr.ifi_index = 2147483647
67
68        rx_msg = self.get_reply(msg)
69        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
70        assert rx_msg.error_code == errno.ENODEV
71
72    @pytest.mark.require_user("root")
73    def test_create_iface_plain(self):
74        """Tests loopback creation w/o any parameters"""
75        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
76        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
77        msg.nl_hdr.nlmsg_flags = (
78            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
79        )
80        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
81        msg.add_nla(
82            NlAttrNested(
83                IflattrType.IFLA_LINKINFO,
84                [
85                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
86                ],
87            )
88        )
89
90        rx_msg = self.get_reply(msg)
91        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
92        assert rx_msg.error_code == 0
93
94        self.get_interface_byname("lo10")
95
96    @pytest.mark.require_user("root")
97    def test_create_iface_plain_retvals(self):
98        """Tests loopback creation w/o any parameters"""
99        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
100        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
101        msg.nl_hdr.nlmsg_flags = (
102            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
103        )
104        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
105        msg.add_nla(
106            NlAttrNested(
107                IflattrType.IFLA_LINKINFO,
108                [
109                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
110                ],
111            )
112        )
113
114        rx_msg = self.get_reply(msg)
115        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
116        assert rx_msg.error_code == 0
117        assert rx_msg.cookie is not None
118        nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
119        nla_map = {n.nla_type: n for n in nla_list}
120        assert IflattrType.IFLA_IFNAME.value in nla_map
121        assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
122        assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
123        assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
124
125        lo_msg = self.get_interface_byname("lo10")
126        assert (
127            lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
128        )
129
130    @pytest.mark.require_user("root")
131    def test_create_iface_attrs(self):
132        """Tests interface creation with additional properties"""
133        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
134        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
135        msg.nl_hdr.nlmsg_flags = (
136            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
137        )
138        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
139        msg.add_nla(
140            NlAttrNested(
141                IflattrType.IFLA_LINKINFO,
142                [
143                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
144                ],
145            )
146        )
147
148        # Custom attributes
149        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
150        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
151
152        rx_msg = self.get_reply(msg)
153        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
154        assert rx_msg.error_code == 0
155
156        iface_msg = self.get_interface_byname("lo10")
157        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
158        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
159
160    @pytest.mark.require_user("root")
161    def test_modify_iface_attrs(self):
162        """Tests interface modifications"""
163        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
164        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
165        msg.nl_hdr.nlmsg_flags = (
166            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
167        )
168        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
169        msg.add_nla(
170            NlAttrNested(
171                IflattrType.IFLA_LINKINFO,
172                [
173                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
174                ],
175            )
176        )
177
178        rx_msg = self.get_reply(msg)
179        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
180        assert rx_msg.error_code == 0
181
182        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
183        msg.nl_hdr.nlmsg_flags = (
184            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
185        )
186        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
187
188        # Custom attributes
189        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
190        msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
191
192        rx_msg = self.get_reply(msg)
193        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
194        assert rx_msg.error_code == 0
195
196        iface_msg = self.get_interface_byname("lo10")
197        assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
198        assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
199
200    @pytest.mark.require_user("root")
201    def test_delete_iface(self):
202        """Tests interface modifications"""
203        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
204        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
205        msg.nl_hdr.nlmsg_flags = (
206            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
207        )
208        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
209        msg.add_nla(
210            NlAttrNested(
211                IflattrType.IFLA_LINKINFO,
212                [
213                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
214                ],
215            )
216        )
217
218        rx_msg = self.get_reply(msg)
219        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
220        assert rx_msg.error_code == 0
221
222        iface_msg = self.get_interface_byname("lo10")
223        iface_idx = iface_msg.base_hdr.ifi_index
224
225        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
226        msg.nl_hdr.nlmsg_flags = (
227            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
228        )
229        msg.base_hdr.ifi_index = iface_idx
230        # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
231
232        rx_msg = self.get_reply(msg)
233        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
234        assert rx_msg.error_code == 0
235
236        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
237        msg.nl_hdr.nlmsg_flags = (
238            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
239        )
240        msg.base_hdr.ifi_index = 2147483647
241
242        rx_msg = self.get_reply(msg)
243        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
244        assert rx_msg.error_code == errno.ENODEV
245
246    @pytest.mark.require_user("root")
247    def test_dump_ifaces_many(self):
248        """Tests if interface dummp is not missing interfaces"""
249
250        ifmap = {}
251        ifmap[socket.if_nametoindex("lo0")] = "lo0"
252
253        for i in range(40):
254            ifname = "lo{}".format(i + 1)
255            flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
256            msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
257            msg.nl_hdr.nlmsg_flags = (
258                flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
259            )
260            msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
261            msg.add_nla(
262                NlAttrNested(
263                    IflattrType.IFLA_LINKINFO,
264                    [
265                        NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
266                    ],
267                )
268            )
269
270            rx_msg = self.get_reply(msg)
271            assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
272            nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
273            nla_map = {n.nla_type: n for n in nla_list}
274            assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
275            ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
276            assert ifindex > 0
277            assert ifindex not in ifmap
278            ifmap[ifindex] = ifname
279
280            # Dump all interfaces and check if the output matches ifmap
281            kernel_ifmap = {}
282            msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
283            msg.nl_hdr.nlmsg_flags = (
284                NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
285            )
286            self.write_message(msg)
287            while True:
288                rx_msg = self.read_message()
289                if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
290                    raise ValueError(
291                        "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
292                    )
293                if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
294                    raise ValueError("unexpected message {}".format(rx_msg))
295                if rx_msg.is_type(NlMsgType.NLMSG_DONE):
296                    break
297                if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
298                    raise ValueError("unexpected message {}".format(rx_msg))
299
300                ifindex = rx_msg.base_hdr.ifi_index
301                assert ifindex == rx_msg.base_hdr.ifi_index
302                ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
303                if ifname.startswith("lo"):
304                    kernel_ifmap[ifindex] = ifname
305            assert kernel_ifmap == ifmap
306
    # Sample RTM_NEWLINK request creating a VLAN interface, for reference:
    #
    # {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
    #  {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
    #    {{nla_len=8, nla_type=IFLA_LINK}, 2},
    #    {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
    #    {{nla_len=24, nla_type=IFLA_LINKINFO},
    #      {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
    #      {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
    #
317    @pytest.mark.require_user("root")
318    def test_create_vlan_plain(self):
319        """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
320        self.require_module("if_vlan")
321        os_ifname = self.vnet.iface_alias_map["if1"].name
322        ifindex = socket.if_nametoindex(os_ifname)
323
324        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
325        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
326        msg.nl_hdr.nlmsg_flags = (
327            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
328        )
329        msg.base_hdr.ifi_index = ifindex
330
331        msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
332        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
333
334        msg.add_nla(
335            NlAttrNested(
336                IflattrType.IFLA_LINKINFO,
337                [
338                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
339                    NlAttrNested(
340                        IflinkInfo.IFLA_INFO_DATA,
341                        [
342                            NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
343                        ],
344                    ),
345                ],
346            )
347        )
348
349        rx_msg = self.get_reply(msg)
350        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
351        assert rx_msg.error_code == 0
352
353        ToolsHelper.print_net_debug()
354        self.get_interface_byname("vlan22")
355        # ToolsHelper.print_net_debug()
356