1import pytest
2from atf_python.sys.net.rtsock import RtConst
3from atf_python.sys.net.rtsock import Rtsock
4from atf_python.sys.net.rtsock import RtsockRtMessage
5from atf_python.sys.net.tools import ToolsHelper
6from atf_python.sys.net.vnet import SingleVnetTestTemplate
7
8
9class TestRtmMultipath(SingleVnetTestTemplate):
10    def setup_method(self, method):
11        method_name = method.__name__
12        if "multipath4" in method_name:
13            self.IPV4_PREFIXES = ["192.0.2.1/24"]
14            self.PREFIX = "128.66.0.0/24"
15        elif "multipath6" in method_name:
16            self.IPV6_PREFIXES = ["2001:db8::1/64"]
17            self.PREFIX = "2001:db8:0:ddbb::/64"
18        super().setup_method(method)
19        self.rtsock = Rtsock()
20
21    def get_prefix_routes(self):
22        family = "inet6" if ":" in self.PREFIX else "inet"
23        routes = ToolsHelper.get_routes(family)
24        return [r for r in routes if r["destination"] == self.PREFIX]
25
26    @pytest.mark.parametrize(
27        "gws",
28        [
29            pytest.param(["+.10=2", "+.5=3"], id="transition_multi"),
30            pytest.param(["+.10=2", "+.5=3", "-.10=2"], id="transition_single1"),
31            pytest.param(["+.10=2", "+.5=3", "-.5=3"], id="transition_single2"),
32            pytest.param(
33                ["+.10", "+.11", "+.50", "+.13", "+.145", "+.72"], id="correctness1"
34            ),
35            pytest.param(
36                ["+.10", "+.11", "+.50", "-.50", "+.145", "+.72"], id="correctness2"
37            ),
38            pytest.param(["+.10=1", "+.5=2"], id="weight1"),
39            pytest.param(["+.10=2", "+.5=7"], id="weight2"),
40            pytest.param(["+.10=13", "+.5=21"], id="weight3_max"),
41            pytest.param(["+.10=2", "+.5=3", "~.5=4"], id="change_new_weight1"),
42            pytest.param(["+.10=2", "+.5=3", "~.10=3"], id="change_new_weight2"),
43            pytest.param(
44                ["+.10=2", "+.5=3", "+.7=4", "~.10=3"], id="change_new_weight3"
45            ),
46            pytest.param(["+.10=2", "+.5=3", "~.5=3"], id="change_same_weight1"),
47            pytest.param(
48                ["+.10=2", "+.5=3", "+.7=4", "~.5=3"], id="change_same_weight2"
49            ),
50        ],
51    )
52    @pytest.mark.require_user("root")
53    def test_rtm_multipath4(self, gws):
54        """Tests RTM_ADD with IPv4 dest transitioning to multipath"""
55        self._test_rtm_multipath(gws, "192.0.2")
56
57    @pytest.mark.parametrize(
58        "gws",
59        [
60            pytest.param(["+:10=2", "+:5=3"], id="transition_multi"),
61            pytest.param(["+:10=2", "+:5=3", "-:10=2"], id="transition_single1"),
62            pytest.param(["+:10=2", "+:5=3", "-:5=3"], id="transition_single2"),
63            pytest.param(
64                ["+:10", "+:11", "+:50", "+:13", "+:145", "+:72"], id="correctness1"
65            ),
66            pytest.param(
67                ["+:10", "+:11", "+:50", "-:50", "+:145", "+:72"], id="correctness2"
68            ),
69            pytest.param(["+:10=1", "+:5=2"], id="weight1"),
70            pytest.param(["+:10=2", "+:5=7"], id="weight2"),
71            pytest.param(["+:10=13", "+:5=21"], id="weight3_max"),
72            pytest.param(["+:10=13", "+:5=21"], id="weight3_max"),
73            pytest.param(["+:10=2", "+:5=3", "~:5=4"], id="change_new_weight1"),
74            pytest.param(["+:10=2", "+:5=3", "~:10=3"], id="change_new_weight2"),
75            pytest.param(
76                ["+:10=2", "+:5=3", "+:7=4", "~:10=3"], id="change_new_weight3"
77            ),
78            pytest.param(["+:10=2", "+:5=3", "~:5=3"], id="change_same_weight1"),
79            pytest.param(
80                ["+:10=2", "+:5=3", "+:7=4", "~:5=3"], id="change_same_weight2"
81            ),
82        ],
83    )
84    @pytest.mark.require_user("root")
85    def test_rtm_multipath6(self, gws):
86        """Tests RTM_ADD with IPv6 dest transitioning to multipath"""
87        self._test_rtm_multipath(gws, "2001:db8:")
88
89    def _test_rtm_multipath(self, gws, gw_prefix: str):
90        desired_map = {}
91        for gw_act in gws:
92            # GW format: <+-~>GW[=weight]
93            if "=" in gw_act:
94                arr = gw_act[1:].split("=")
95                weight = int(arr[1])
96                gw = gw_prefix + arr[0]
97            else:
98                weight = None
99                gw = gw_prefix + gw_act[1:]
100            if gw_act[0] == "+":
101                msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
102                desired_map[gw] = self.rtsock.get_weight(weight)
103            elif gw_act[0] == "-":
104                msg = self.rtsock.new_rtm_del(self.PREFIX, gw)
105                del desired_map[gw]
106            else:
107                msg = self.rtsock.new_rtm_change(self.PREFIX, gw)
108                desired_map[gw] = self.rtsock.get_weight(weight)
109
110            msg.rtm_flags = RtConst.RTF_GATEWAY
111            if weight:
112                msg.rtm_inits |= RtConst.RTV_WEIGHT
113                msg.rtm_rmx.rmx_weight = weight
114            # Prepare SAs to check for
115            desired_sa = {
116                RtConst.RTA_DST: msg.get_sa(RtConst.RTA_DST),
117                RtConst.RTA_NETMASK: msg.get_sa(RtConst.RTA_NETMASK),
118                RtConst.RTA_GATEWAY: msg.get_sa(RtConst.RTA_GATEWAY),
119            }
120            self.rtsock.write_message(msg)
121
122            data = self.rtsock.read_data(msg.rtm_seq)
123            msg_in = RtsockRtMessage.from_bytes(data)
124            msg_in.print_in_message()
125            msg_in.verify(msg.rtm_type, desired_sa)
126            assert msg_in.rtm_rmx.rmx_weight == self.rtsock.get_weight(weight)
127
128            routes = self.get_prefix_routes()
129            derived_map = {r["gateway"]: r["weight"] for r in routes}
130            assert derived_map == desired_map
131
132    @pytest.mark.require_user("root")
133    def test_rtm_multipath4_add_same_eexist(self):
134        """Tests adding same IPv4 gw to the multipath group (EEXIST)"""
135        gws = ["192.0.2.10", "192.0.2.11", "192.0.2.11"]
136        self._test_rtm_multipath_add_same_eexist(gws)
137
138    @pytest.mark.require_user("root")
139    def test_rtm_multipath6_add_same_eexist(self):
140        """Tests adding same IPv4 gw to the multipath group (EEXIST)"""
141        gws = ["2001:db8::10", "2001:db8::11", "2001:db8::11"]
142        self._test_rtm_multipath_add_same_eexist(gws)
143
144    def _test_rtm_multipath_add_same_eexist(self, gws):
145        for idx, gw in enumerate(gws):
146            msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
147            msg.rtm_flags = RtConst.RTF_GATEWAY
148            try:
149                self.rtsock.write_message(msg)
150            except FileExistsError as e:
151                if idx != 2:
152                    raise
153                print("Succcessfully raised {}".format(e))
154
155    @pytest.mark.require_user("root")
156    def test_rtm_multipath4_del_unknown_esrch(self):
157        """Tests deleting non-existing dest from the multipath group (ESRCH)"""
158        gws = ["192.0.2.10", "192.0.2.11"]
159        self._test_rtm_multipath_del_unknown_esrch(gws, "192.0.2.7")
160
161    @pytest.mark.require_user("root")
162    def test_rtm_multipath6_del_unknown_esrch(self):
163        """Tests deleting non-existing dest from the multipath group (ESRCH)"""
164        gws = ["2001:db8::10", "2001:db8::11"]
165        self._test_rtm_multipath_del_unknown_esrch(gws, "2001:db8::7")
166
167    @pytest.mark.require_user("root")
168    def _test_rtm_multipath_del_unknown_esrch(self, gws, target_gw):
169        for gw in gws:
170            msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
171            msg.rtm_flags = RtConst.RTF_GATEWAY
172            self.rtsock.write_message(msg)
173        msg = self.rtsock.new_rtm_del(self.PREFIX, target_gw)
174        msg.rtm_flags = RtConst.RTF_GATEWAY
175        try:
176            self.rtsock.write_message(msg)
177        except ProcessLookupError as e:
178            print("Succcessfully raised {}".format(e))
179
180    @pytest.mark.require_user("root")
181    def test_rtm_multipath4_change_unknown_esrch(self):
182        """Tests changing non-existing dest in the multipath group (ESRCH)"""
183        gws = ["192.0.2.10", "192.0.2.11"]
184        self._test_rtm_multipath_change_unknown_esrch(gws, "192.0.2.7")
185
186    @pytest.mark.require_user("root")
187    def test_rtm_multipath6_change_unknown_esrch(self):
188        """Tests changing non-existing dest in the multipath group (ESRCH)"""
189        gws = ["2001:db8::10", "2001:db8::11"]
190        self._test_rtm_multipath_change_unknown_esrch(gws, "2001:db8::7")
191
192    @pytest.mark.require_user("root")
193    def _test_rtm_multipath_change_unknown_esrch(self, gws, target_gw):
194        for gw in gws:
195            msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
196            msg.rtm_flags = RtConst.RTF_GATEWAY
197            self.rtsock.write_message(msg)
198        msg = self.rtsock.new_rtm_change(self.PREFIX, target_gw)
199        msg.rtm_flags = RtConst.RTF_GATEWAY
200        try:
201            self.rtsock.write_message(msg)
202        except ProcessLookupError as e:
203            print("Succcessfully raised {}".format(e))
204
205    @pytest.mark.require_user("root")
206    def test_rtm_multipath4_add_zero_weight(self):
207        """Tests RTM_ADD with dest transitioning to multipath"""
208
209        desired_map = {}
210        for gw in ["192.0.2.10", "192.0.2.11", "192.0.2.13"]:
211            msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
212            msg.rtm_flags = RtConst.RTF_GATEWAY
213            msg.rtm_rmx.rmx_weight = 0
214            msg.rtm_inits |= RtConst.RTV_WEIGHT
215            self.rtsock.write_message(msg)
216            desired_map[gw] = self.rtsock.get_weight(0)
217
218        routes = self.get_prefix_routes()
219        derived_map = {r["gateway"]: r["weight"] for r in routes}
220        assert derived_map == desired_map
221
222    @pytest.mark.require_user("root")
223    def test_rtm_multipath4_getroute(self):
224        """Tests RTM_GET with exact prefix lookup on the multipath group"""
225        gws = ["192.0.2.10", "192.0.2.11", "192.0.2.13"]
226        return self._test_rtm_multipath_getroute(gws)
227
228    @pytest.mark.require_user("root")
229    def test_rtm_multipath6_getroute(self):
230        """Tests RTM_GET with exact prefix lookup on the multipath group"""
231        gws = ["2001:db8::10", "2001:db8::11", "2001:db8::13"]
232        return self._test_rtm_multipath_getroute(gws)
233
234    def _test_rtm_multipath_getroute(self, gws):
235        valid_gws = []
236        for gw in gws:
237            msg = self.rtsock.new_rtm_add(self.PREFIX, gw)
238            msg.rtm_flags = RtConst.RTF_GATEWAY
239            self.rtsock.write_message(msg)
240
241            desired_sa = {
242                RtConst.RTA_DST: msg.get_sa(RtConst.RTA_DST),
243                RtConst.RTA_NETMASK: msg.get_sa(RtConst.RTA_NETMASK),
244            }
245            valid_gws.append(msg.get_sa(RtConst.RTA_GATEWAY))
246
247            msg_get = RtsockRtMessage(
248                RtConst.RTM_GET,
249                self.rtsock.get_seq(),
250                msg.get_sa(RtConst.RTA_DST),
251                msg.get_sa(RtConst.RTA_NETMASK),
252            )
253            self.rtsock.write_message(msg_get)
254
255            data = self.rtsock.read_data(msg_get.rtm_seq)
256            msg_in = RtsockRtMessage.from_bytes(data)
257            msg_in.print_in_message()
258            msg_in.verify(RtConst.RTM_GET, desired_sa)
259
260            # Additionally, check that the gateway is among the valid
261            # gateways
262            gw_found = False
263            gw_in = msg_in.get_sa(RtConst.RTA_GATEWAY)
264            for valid_gw in valid_gws:
265                try:
266                    assert valid_gw == gw_in
267                    gw_found = True
268                    break
269                except AssertionError:
270                    pass
271            assert gw_found is True
272