1# 2# -*- coding: utf-8 -*- 3# Copyright 2019 Red Hat 4# GNU General Public License v3.0+ 5# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 6""" 7The vyos_l3_interfaces class 8It is in this file where the current configuration (as dict) 9is compared to the provided configuration (as dict) and the command set 10necessary to bring the current configuration to it's desired end-state is 11created 12""" 13 14from __future__ import absolute_import, division, print_function 15 16__metaclass__ = type 17 18 19from copy import deepcopy 20from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import ( 21 ConfigBase, 22) 23from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( 24 to_list, 25 remove_empties, 26) 27from ansible.module_utils.six import iteritems 28from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import ( 29 Facts, 30) 31from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import ( 32 search_obj_in_list, 33 get_interface_type, 34 diff_list_of_dicts, 35) 36 37 38class L3_interfaces(ConfigBase): 39 """ 40 The vyos_l3_interfaces class 41 """ 42 43 gather_subset = [ 44 "!all", 45 "!min", 46 ] 47 48 gather_network_resources = [ 49 "l3_interfaces", 50 ] 51 52 def __init__(self, module): 53 super(L3_interfaces, self).__init__(module) 54 55 def get_l3_interfaces_facts(self, data=None): 56 """Get the 'facts' (the current configuration) 57 58 :rtype: A dictionary 59 :returns: The current configuration as a dictionary 60 """ 61 facts, _warnings = Facts(self._module).get_facts( 62 self.gather_subset, self.gather_network_resources, data=data 63 ) 64 l3_interfaces_facts = facts["ansible_network_resources"].get( 65 "l3_interfaces" 66 ) 67 if not l3_interfaces_facts: 68 return [] 69 return l3_interfaces_facts 70 71 def execute_module(self): 72 """Execute the module 73 74 :rtype: A dictionary 75 :returns: The result from module execution 76 """ 77 result = {"changed": False} 78 warnings = list() 79 commands = list() 80 81 if self.state in self.ACTION_STATES: 82 existing_l3_interfaces_facts = self.get_l3_interfaces_facts() 83 else: 84 existing_l3_interfaces_facts = [] 85 86 if self.state in self.ACTION_STATES or self.state == "rendered": 87 commands.extend(self.set_config(existing_l3_interfaces_facts)) 88 89 if commands and self.state in self.ACTION_STATES: 90 if not self._module.check_mode: 91 self._connection.edit_config(commands) 92 result["changed"] = True 93 94 if self.state in self.ACTION_STATES: 95 result["commands"] = commands 96 97 if self.state in self.ACTION_STATES or self.state == "gathered": 98 changed_l3_interfaces_facts = self.get_l3_interfaces_facts() 99 elif self.state == "rendered": 100 result["rendered"] = commands 101 elif self.state == "parsed": 102 running_config = self._module.params["running_config"] 103 if not running_config: 104 self._module.fail_json( 105 msg="value of running_config parameter must not be empty for state parsed" 106 ) 107 result["parsed"] = self.get_l3_interfaces_facts( 108 data=running_config 109 ) 110 else: 111 changed_l3_interfaces_facts = [] 112 113 if self.state in self.ACTION_STATES: 114 result["before"] = existing_l3_interfaces_facts 115 if result["changed"]: 116 result["after"] = changed_l3_interfaces_facts 117 elif self.state == "gathered": 118 result["gathered"] = changed_l3_interfaces_facts 119 120 result["warnings"] = warnings 121 return result 122 123 def set_config(self, existing_l3_interfaces_facts): 124 """Collect the configuration from the args passed to the module, 125 collect the current configuration (as a dict from facts) 126 127 :rtype: A list 128 :returns: the commands necessary to migrate the current configuration 129 to the desired configuration 130 """ 131 want = self._module.params["config"] 132 have = existing_l3_interfaces_facts 133 resp = self.set_state(want, have) 134 return to_list(resp) 135 136 def set_state(self, want, have): 137 """Select the appropriate function based on the state provided 138 139 :param want: the desired configuration as a dictionary 140 :param have: the current configuration as a dictionary 141 :rtype: A list 142 :returns: the commands necessary to migrate the current configuration 143 to the desired configuration 144 """ 145 commands = [] 146 state = self._module.params["state"] 147 148 if ( 149 state in ("merged", "replaced", "overridden", "rendered") 150 and not want 151 ): 152 self._module.fail_json( 153 msg="value of config parameter must not be empty for state {0}".format( 154 state 155 ) 156 ) 157 158 if state == "overridden": 159 commands.extend(self._state_overridden(want=want, have=have)) 160 161 elif state == "deleted": 162 if not want: 163 for intf in have: 164 commands.extend( 165 self._state_deleted({"name": intf["name"]}, intf) 166 ) 167 else: 168 for item in want: 169 obj_in_have = search_obj_in_list(item["name"], have) 170 commands.extend(self._state_deleted(item, obj_in_have)) 171 else: 172 for item in want: 173 name = item["name"] 174 obj_in_have = search_obj_in_list(name, have) 175 176 if not obj_in_have: 177 obj_in_have = {"name": item["name"]} 178 179 if state in ("merged", "rendered"): 180 commands.extend(self._state_merged(item, obj_in_have)) 181 182 elif state == "replaced": 183 commands.extend(self._state_replaced(item, obj_in_have)) 184 185 return commands 186 187 def _state_replaced(self, want, have): 188 """The command generator when state is replaced 189 190 :rtype: A list 191 :returns: the commands necessary to migrate the current configuration 192 to the desired configuration 193 """ 194 commands = [] 195 if have: 196 commands.extend(self._state_deleted(want, have)) 197 198 commands.extend(self._state_merged(want, have)) 199 200 return commands 201 202 def _state_overridden(self, want, have): 203 """The command generator when state is overridden 204 205 :rtype: A list 206 :returns: the commands necessary to migrate the current configuration 207 to the desired configuration 208 """ 209 commands = [] 210 211 for intf in have: 212 intf_in_want = search_obj_in_list(intf["name"], want) 213 if not intf_in_want: 214 commands.extend( 215 self._state_deleted({"name": intf["name"]}, intf) 216 ) 217 218 for intf in want: 219 intf_in_have = search_obj_in_list(intf["name"], have) 220 commands.extend(self._state_replaced(intf, intf_in_have)) 221 222 return commands 223 224 def _state_merged(self, want, have): 225 """The command generator when state is merged 226 227 :rtype: A list 228 :returns: the commands necessary to merge the provided into 229 the current configuration 230 """ 231 commands = [] 232 want_copy = deepcopy(remove_empties(want)) 233 have_copy = deepcopy(remove_empties(have)) 234 235 want_vifs = want_copy.pop("vifs", []) 236 have_vifs = have_copy.pop("vifs", []) 237 238 for update in self._get_updates(want_copy, have_copy): 239 for key, value in iteritems(update): 240 commands.append( 241 self._compute_commands( 242 key=key, value=value, interface=want_copy["name"] 243 ) 244 ) 245 246 if want_vifs: 247 for want_vif in want_vifs: 248 have_vif = search_obj_in_list( 249 want_vif["vlan_id"], have_vifs, key="vlan_id" 250 ) 251 if not have_vif: 252 have_vif = {} 253 254 for update in self._get_updates(want_vif, have_vif): 255 for key, value in iteritems(update): 256 commands.append( 257 self._compute_commands( 258 key=key, 259 value=value, 260 interface=want_copy["name"], 261 vif=want_vif["vlan_id"], 262 ) 263 ) 264 265 return commands 266 267 def _state_deleted(self, want, have): 268 """The command generator when state is deleted 269 270 :rtype: A list 271 :returns: the commands necessary to remove the current configuration 272 of the provided objects 273 """ 274 commands = [] 275 want_copy = deepcopy(remove_empties(want)) 276 have_copy = deepcopy(have) 277 278 want_vifs = want_copy.pop("vifs", []) 279 have_vifs = have_copy.pop("vifs", []) 280 281 for update in self._get_updates(have_copy, want_copy): 282 for key, value in iteritems(update): 283 commands.append( 284 self._compute_commands( 285 key=key, 286 value=value, 287 interface=want_copy["name"], 288 remove=True, 289 ) 290 ) 291 292 if have_vifs: 293 for have_vif in have_vifs: 294 want_vif = search_obj_in_list( 295 have_vif["vlan_id"], want_vifs, key="vlan_id" 296 ) 297 if not want_vif: 298 want_vif = {"vlan_id": have_vif["vlan_id"]} 299 300 for update in self._get_updates(have_vif, want_vif): 301 for key, value in iteritems(update): 302 commands.append( 303 self._compute_commands( 304 key=key, 305 interface=want_copy["name"], 306 value=value, 307 vif=want_vif["vlan_id"], 308 remove=True, 309 ) 310 ) 311 312 return commands 313 314 def _compute_commands( 315 self, interface, key, vif=None, value=None, remove=False 316 ): 317 intf_context = "interfaces {0} {1}".format( 318 get_interface_type(interface), interface 319 ) 320 set_cmd = "set {0}".format(intf_context) 321 del_cmd = "delete {0}".format(intf_context) 322 323 if vif: 324 set_cmd = set_cmd + (" vif {0}".format(vif)) 325 del_cmd = del_cmd + (" vif {0}".format(vif)) 326 327 if remove: 328 command = "{0} {1} '{2}'".format(del_cmd, key, value) 329 else: 330 command = "{0} {1} '{2}'".format(set_cmd, key, value) 331 332 return command 333 334 def _get_updates(self, want, have): 335 updates = [] 336 337 updates = diff_list_of_dicts( 338 want.get("ipv4", []), have.get("ipv4", []) 339 ) 340 updates.extend( 341 diff_list_of_dicts(want.get("ipv6", []), have.get("ipv6", [])) 342 ) 343 344 return updates 345