1#
2# -*- coding: utf-8 -*-
3# Copyright 2019 Red Hat
4# GNU General Public License v3.0+
5# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
6"""
7The vyos_l3_interfaces class
8It is in this file where the current configuration (as dict)
9is compared to the provided configuration (as dict) and the command set
necessary to bring the current configuration to its desired end-state is
11created
12"""
13
14from __future__ import absolute_import, division, print_function
15
16__metaclass__ = type
17
18
19from copy import deepcopy
20from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import (
21    ConfigBase,
22)
23from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
24    to_list,
25    remove_empties,
26)
27from ansible.module_utils.six import iteritems
28from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import (
29    Facts,
30)
31from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import (
32    search_obj_in_list,
33    get_interface_type,
34    diff_list_of_dicts,
35)
36
37
class L3_interfaces(ConfigBase):
    """
    The vyos_l3_interfaces class

    Compares the L3 interface configuration gathered from the device
    (``have``) with the configuration supplied through the module's
    ``config`` parameter (``want``) and builds the ``set``/``delete``
    commands for the requested state.
    """

    # Subset and resource selectors handed to Facts.get_facts().
    gather_subset = [
        "!all",
        "!min",
    ]

    gather_network_resources = [
        "l3_interfaces",
    ]

    def __init__(self, module):
        super(L3_interfaces, self).__init__(module)

    def get_l3_interfaces_facts(self, data=None):
        """Get the 'facts' (the current configuration)

        :param data: optional value forwarded to ``Facts.get_facts`` as
                     ``data`` (state ``parsed`` passes ``running_config``)
        :rtype: A list
        :returns: The ``l3_interfaces`` network resource facts, or an empty
                  list when that entry is missing or empty
        """
        facts, _warnings = Facts(self._module).get_facts(
            self.gather_subset, self.gather_network_resources, data=data
        )
        l3_interfaces_facts = facts["ansible_network_resources"].get(
            "l3_interfaces"
        )
        if not l3_interfaces_facts:
            return []
        return l3_interfaces_facts

    def execute_module(self):
        """Execute the module

        ``self.state`` and ``self.ACTION_STATES`` are not defined in this
        class (presumably provided by ConfigBase).

        :rtype: A dictionary
        :returns: The result from module execution. Always carries
                  ``changed`` and ``warnings``; depending on the state it
                  also carries ``commands``/``before``/``after``,
                  ``rendered``, ``parsed`` or ``gathered``.
        """
        result = {"changed": False}
        warnings = []
        commands = []

        # Only action states compare against the device's current config.
        if self.state in self.ACTION_STATES:
            existing_l3_interfaces_facts = self.get_l3_interfaces_facts()
        else:
            existing_l3_interfaces_facts = []

        if self.state in self.ACTION_STATES or self.state == "rendered":
            commands.extend(self.set_config(existing_l3_interfaces_facts))

        if commands and self.state in self.ACTION_STATES:
            # In check mode the commands are reported but not pushed.
            if not self._module.check_mode:
                self._connection.edit_config(commands)
            result["changed"] = True

        if self.state in self.ACTION_STATES:
            result["commands"] = commands

        if self.state in self.ACTION_STATES or self.state == "gathered":
            changed_l3_interfaces_facts = self.get_l3_interfaces_facts()
        elif self.state == "rendered":
            result["rendered"] = commands
        elif self.state == "parsed":
            running_config = self._module.params["running_config"]
            if not running_config:
                self._module.fail_json(
                    msg="value of running_config parameter must not be empty for state parsed"
                )
            result["parsed"] = self.get_l3_interfaces_facts(
                data=running_config
            )
        else:
            changed_l3_interfaces_facts = []

        if self.state in self.ACTION_STATES:
            result["before"] = existing_l3_interfaces_facts
            if result["changed"]:
                result["after"] = changed_l3_interfaces_facts
        elif self.state == "gathered":
            result["gathered"] = changed_l3_interfaces_facts

        result["warnings"] = warnings
        return result

    def set_config(self, existing_l3_interfaces_facts):
        """Collect the configuration from the args passed to the module,
            collect the current configuration (as a dict from facts)

        :param existing_l3_interfaces_facts: the current configuration
        :rtype: A list
        :returns: the commands necessary to migrate the current configuration
                  to the desired configuration
        """
        want = self._module.params["config"]
        have = existing_l3_interfaces_facts
        resp = self.set_state(want, have)
        return to_list(resp)

    def set_state(self, want, have):
        """Select the appropriate function based on the state provided

        Fails the module when ``want`` is empty for the states merged,
        replaced, overridden and rendered.

        :param want: the desired configuration (iterated as a list of
                     per-interface dictionaries)
        :param have: the current configuration (iterated as a list of
                     per-interface dictionaries)
        :rtype: A list
        :returns: the commands necessary to migrate the current configuration
                  to the desired configuration
        """
        commands = []
        state = self._module.params["state"]

        if (
            state in ("merged", "replaced", "overridden", "rendered")
            and not want
        ):
            self._module.fail_json(
                msg="value of config parameter must not be empty for state {0}".format(
                    state
                )
            )

        if state == "overridden":
            commands.extend(self._state_overridden(want=want, have=have))

        elif state == "deleted":
            if not want:
                # No config given: process every interface found in have.
                for intf in have:
                    commands.extend(
                        self._state_deleted({"name": intf["name"]}, intf)
                    )
            else:
                for item in want:
                    # May be falsy when the interface is not in have;
                    # _state_deleted then produces no commands.
                    obj_in_have = search_obj_in_list(item["name"], have)
                    commands.extend(self._state_deleted(item, obj_in_have))
        else:
            for item in want:
                name = item["name"]
                obj_in_have = search_obj_in_list(name, have)

                if not obj_in_have:
                    obj_in_have = {"name": item["name"]}

                if state in ("merged", "rendered"):
                    commands.extend(self._state_merged(item, obj_in_have))

                elif state == "replaced":
                    commands.extend(self._state_replaced(item, obj_in_have))

        return commands

    def _state_replaced(self, want, have):
        """The command generator when state is replaced

        Emits the delete commands first (only when ``have`` is truthy),
        followed by the merge commands.

        :param want: the desired configuration of one interface
        :param have: the current configuration of that interface, may be
                     empty or None
        :rtype: A list
        :returns: the commands necessary to migrate the current configuration
                  to the desired configuration
        """
        commands = []
        if have:
            commands.extend(self._state_deleted(want, have))

        commands.extend(self._state_merged(want, have))

        return commands

    def _state_overridden(self, want, have):
        """The command generator when state is overridden

        Interfaces found in ``have`` but not in ``want`` go through
        ``_state_deleted``; every interface in ``want`` goes through
        ``_state_replaced``.

        :param want: the desired configuration (list of interfaces)
        :param have: the current configuration (list of interfaces)
        :rtype: A list
        :returns: the commands necessary to migrate the current configuration
                  to the desired configuration
        """
        commands = []

        for intf in have:
            intf_in_want = search_obj_in_list(intf["name"], want)
            if not intf_in_want:
                commands.extend(
                    self._state_deleted({"name": intf["name"]}, intf)
                )

        for intf in want:
            intf_in_have = search_obj_in_list(intf["name"], have)
            commands.extend(self._state_replaced(intf, intf_in_have))

        return commands

    def _state_merged(self, want, have):
        """The command generator when state is merged

        :param want: the desired configuration of one interface
        :param have: the current configuration of that interface
        :rtype: A list
        :returns: the commands necessary to merge the provided into
                  the current configuration
        """
        commands = []
        want_copy = deepcopy(remove_empties(want))
        have_copy = deepcopy(remove_empties(have))

        # vifs are diffed separately so their commands get a vif context.
        want_vifs = want_copy.pop("vifs", [])
        have_vifs = have_copy.pop("vifs", [])

        for update in self._get_updates(want_copy, have_copy):
            for key, value in iteritems(update):
                commands.append(
                    self._compute_commands(
                        key=key, value=value, interface=want_copy["name"]
                    )
                )

        for want_vif in want_vifs:
            have_vif = search_obj_in_list(
                want_vif["vlan_id"], have_vifs, key="vlan_id"
            )
            if not have_vif:
                have_vif = {}

            for update in self._get_updates(want_vif, have_vif):
                for key, value in iteritems(update):
                    commands.append(
                        self._compute_commands(
                            key=key,
                            value=value,
                            interface=want_copy["name"],
                            vif=want_vif["vlan_id"],
                        )
                    )

        return commands

    def _state_deleted(self, want, have):
        """The command generator when state is deleted

        Delete commands are built from the updates returned by
        ``_get_updates(have, want)``, for the interface itself and for
        each vif listed in ``have``.

        :param want: the desired configuration of one interface; must
                     contain ``name``
        :param have: the current configuration of that interface; may be
                     None or empty when the interface is not present in the
                     gathered facts, in which case no commands are produced
        :rtype: A list
        :returns: the commands necessary to remove the current configuration
                  of the provided objects
        """
        commands = []
        want_copy = deepcopy(remove_empties(want))
        # A lookup miss hands None in here; fall back to an empty dict so
        # the .pop()/.get() calls below do not raise AttributeError.
        have_copy = deepcopy(have) if have else {}

        want_vifs = want_copy.pop("vifs", [])
        have_vifs = have_copy.pop("vifs", [])

        for update in self._get_updates(have_copy, want_copy):
            for key, value in iteritems(update):
                commands.append(
                    self._compute_commands(
                        key=key,
                        value=value,
                        interface=want_copy["name"],
                        remove=True,
                    )
                )

        if have_vifs:
            for have_vif in have_vifs:
                want_vif = search_obj_in_list(
                    have_vif["vlan_id"], want_vifs, key="vlan_id"
                )
                if not want_vif:
                    want_vif = {"vlan_id": have_vif["vlan_id"]}

                for update in self._get_updates(have_vif, want_vif):
                    for key, value in iteritems(update):
                        commands.append(
                            self._compute_commands(
                                key=key,
                                interface=want_copy["name"],
                                value=value,
                                vif=want_vif["vlan_id"],
                                remove=True,
                            )
                        )

        return commands

    def _compute_commands(
        self, interface, key, vif=None, value=None, remove=False
    ):
        """Build a single configuration command

        The command has the form
        ``<set|delete> interfaces <type> <interface> [vif <vif>] <key> '<value>'``
        where ``<type>`` comes from ``get_interface_type(interface)``.

        :param interface: the interface name
        :param key: the configuration key placed before the quoted value
        :param vif: the vif identifier, or None for no vif context
        :param value: the value, emitted inside single quotes
        :param remove: build a ``delete`` command instead of a ``set`` one
        :rtype: A string
        :returns: the command
        """
        intf_context = "interfaces {0} {1}".format(
            get_interface_type(interface), interface
        )
        set_cmd = "set {0}".format(intf_context)
        del_cmd = "delete {0}".format(intf_context)

        # Compare against None rather than truthiness so that a vif id of 0
        # still gets its own vif context instead of hitting the parent.
        if vif is not None:
            set_cmd = set_cmd + (" vif {0}".format(vif))
            del_cmd = del_cmd + (" vif {0}".format(vif))

        if remove:
            command = "{0} {1} '{2}'".format(del_cmd, key, value)
        else:
            command = "{0} {1} '{2}'".format(set_cmd, key, value)

        return command

    def _get_updates(self, want, have):
        """Diff the ``ipv4`` and ``ipv6`` lists of two configurations

        Missing keys are treated as empty lists. The ipv4 result comes
        first, followed by the ipv6 result.

        :param want: dictionary that may hold ``ipv4``/``ipv6`` lists
        :param have: dictionary that may hold ``ipv4``/``ipv6`` lists
        :rtype: A list
        :returns: the combined results of ``diff_list_of_dicts(want, have)``
                  (presumably the entries of ``want`` missing from ``have``)
        """
        updates = diff_list_of_dicts(
            want.get("ipv4", []), have.get("ipv4", [])
        )
        updates.extend(
            diff_list_of_dicts(want.get("ipv6", []), have.get("ipv6", []))
        )

        return updates
345