1# -*- coding: utf-8 -*- 2# Copyright 2019 Red Hat 3# GNU General Public License v3.0+ 4# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5""" 6The eos_l3_interfaces class 7It is in this file where the current configuration (as dict) 8is compared to the provided configuration (as dict) and the command set 9necessary to bring the current configuration to it's desired end-state is 10created 11""" 12 13from __future__ import absolute_import, division, print_function 14 15__metaclass__ = type 16 17from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import ( 18 ConfigBase, 19) 20from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( 21 to_list, 22 param_list_to_dict, 23) 24from ansible_collections.arista.eos.plugins.module_utils.network.eos.facts.facts import ( 25 Facts, 26) 27from ansible_collections.arista.eos.plugins.module_utils.network.eos.utils.utils import ( 28 normalize_interface, 29) 30 31 32class L3_interfaces(ConfigBase): 33 """ 34 The eos_l3_interfaces class 35 """ 36 37 gather_subset = ["!all", "!min"] 38 39 gather_network_resources = ["l3_interfaces"] 40 41 def get_l3_interfaces_facts(self, data=None): 42 """Get the 'facts' (the current configuration) 43 44 :rtype: A dictionary 45 :returns: The current configuration as a dictionary 46 """ 47 facts, _warnings = Facts(self._module).get_facts( 48 self.gather_subset, self.gather_network_resources, data=data 49 ) 50 l3_interfaces_facts = facts["ansible_network_resources"].get( 51 "l3_interfaces" 52 ) 53 if not l3_interfaces_facts: 54 return [] 55 return l3_interfaces_facts 56 57 def execute_module(self): 58 """Execute the module 59 60 :rtype: A dictionary 61 :returns: The result from module execution 62 """ 63 result = {"changed": False} 64 commands = list() 65 warnings = list() 66 67 if self.state in self.ACTION_STATES: 68 existing_l3_interfaces_facts = self.get_l3_interfaces_facts() 69 else: 70 existing_l3_interfaces_facts = [] 71 72 if self.state in self.ACTION_STATES or self.state == "rendered": 73 commands.extend(self.set_config(existing_l3_interfaces_facts)) 74 75 if commands and self.state in self.ACTION_STATES: 76 if not self._module.check_mode: 77 self._connection.edit_config(commands) 78 result["changed"] = True 79 if self.state in self.ACTION_STATES: 80 result["commands"] = commands 81 82 if self.state in self.ACTION_STATES or self.state == "gathered": 83 changed_l3_interfaces_facts = self.get_l3_interfaces_facts() 84 85 elif self.state == "rendered": 86 result["rendered"] = commands 87 88 elif self.state == "parsed": 89 running_config = self._module.params["running_config"] 90 if not running_config: 91 self._module.fail_json( 92 msg="value of running_config parameter must not be empty for state parsed" 93 ) 94 result["parsed"] = self.get_l3_interfaces_facts( 95 data=running_config 96 ) 97 98 if self.state in self.ACTION_STATES: 99 result["before"] = existing_l3_interfaces_facts 100 if result["changed"]: 101 result["after"] = changed_l3_interfaces_facts 102 103 elif self.state == "gathered": 104 result["gathered"] = changed_l3_interfaces_facts 105 106 result["warnings"] = warnings 107 return result 108 109 def set_config(self, existing_l3_interfaces_facts): 110 """Collect the configuration from the args passed to the module, 111 collect the current configuration (as a dict from facts) 112 113 :rtype: A list 114 :returns: the commands necessary to migrate the current configuration 115 to the desired configuration 116 """ 117 want = self._module.params["config"] 118 have = existing_l3_interfaces_facts 119 resp = self.set_state(want, have) 120 return to_list(resp) 121 122 def set_state(self, want, have): 123 """Select the appropriate function based on the state provided 124 125 :param want: the desired configuration as a dictionary 126 :param have: the current configuration as a dictionary 127 :rtype: A list 128 :returns: the commands necessary to migrate the current configuration 129 to the desired configuration 130 """ 131 state = self._module.params["state"] 132 if ( 133 state in ("merged", "replaced", "overridden", "rendered") 134 and not want 135 ): 136 self._module.fail_json( 137 msg="value of config parameter must not be empty for state {0}".format( 138 state 139 ) 140 ) 141 want = param_list_to_dict(want) 142 have = param_list_to_dict(have) 143 commands = [] 144 if state == "overridden": 145 commands = self._state_overridden(want, have) 146 elif state == "deleted": 147 commands = self._state_deleted(want, have) 148 elif state == "merged" or state == "rendered": 149 commands = self._state_merged(want, have) 150 elif state == "replaced": 151 commands = self._state_replaced(want, have) 152 return commands 153 154 @staticmethod 155 def _state_replaced(want, have): 156 """The command generator when state is replaced 157 158 :rtype: A list 159 :returns: the commands necessary to migrate the current configuration 160 to the desired configuration 161 """ 162 commands = [] 163 for key, desired in want.items(): 164 interface_name = normalize_interface(key) 165 if interface_name in have: 166 extant = have[interface_name] 167 else: 168 extant = dict() 169 intf_commands = set_interface(desired, extant) 170 intf_commands.extend(clear_interface(desired, extant)) 171 172 if intf_commands: 173 commands.append("interface {0}".format(interface_name)) 174 commands.extend(intf_commands) 175 return commands 176 177 @staticmethod 178 def _state_overridden(want, have): 179 """The command generator when state is overridden 180 181 :rtype: A list 182 :returns: the commands necessary to migrate the current configuration 183 to the desired configuration 184 """ 185 commands = [] 186 for key, extant in have.items(): 187 interface_name = normalize_interface(key) 188 if interface_name in want: 189 desired = want[interface_name] 190 else: 191 desired = dict() 192 if desired.get("ipv4"): 193 for ipv4 in desired["ipv4"]: 194 for k in ["secondary", "virtual"]: 195 if ipv4[k] is None: 196 del ipv4[k] 197 intf_commands = set_interface(desired, extant) 198 intf_commands.extend(clear_interface(desired, extant)) 199 200 if intf_commands: 201 commands.append("interface {0}".format(interface_name)) 202 commands.extend(intf_commands) 203 204 return commands 205 206 @staticmethod 207 def _state_merged(want, have): 208 """The command generator when state is merged 209 210 :rtype: A list 211 :returns: the commands necessary to merge the provided into 212 the current configuration 213 """ 214 commands = [] 215 for key, desired in want.items(): 216 interface_name = normalize_interface(key) 217 if interface_name in have: 218 extant = have[interface_name] 219 else: 220 extant = dict() 221 222 intf_commands = set_interface(desired, extant) 223 if intf_commands: 224 commands.append("interface {0}".format(interface_name)) 225 commands.extend(intf_commands) 226 227 return commands 228 229 @staticmethod 230 def _state_deleted(want, have): 231 """The command generator when state is deleted 232 233 :rtype: A list 234 :returns: the commands necessary to remove the current configuration 235 of the provided objects 236 """ 237 commands = [] 238 for key in want: 239 interface_name = normalize_interface(key) 240 desired = dict() 241 if interface_name in have: 242 extant = have[interface_name] 243 else: 244 continue 245 246 intf_commands = clear_interface(desired, extant) 247 248 if intf_commands: 249 commands.append("interface {0}".format(interface_name)) 250 commands.extend(intf_commands) 251 252 return commands 253 254 255def set_interface(want, have): 256 commands = [] 257 want_ipv4 = set( 258 tuple(sorted(address.items())) for address in want.get("ipv4") or [] 259 ) 260 have_ipv4 = set( 261 tuple(sorted(address.items())) for address in have.get("ipv4") or [] 262 ) 263 for address in want_ipv4 - have_ipv4: 264 address = dict(address) 265 for param in ["secondary", "virtual"]: 266 if param in address and not address[param]: 267 del address[param] 268 if tuple(sorted(address.items())) in have_ipv4: 269 continue 270 271 address_cmd = "ip address {0}".format(address["address"]) 272 if address.get("secondary"): 273 address_cmd += " secondary" 274 if address.get("virtual"): 275 address_cmd = "ip address virtual {0}".format(address["address"]) 276 commands.append(address_cmd) 277 278 want_ipv6 = set( 279 tuple(sorted(address.items())) for address in want.get("ipv6") or [] 280 ) 281 have_ipv6 = set( 282 tuple(sorted(address.items())) for address in have.get("ipv6") or [] 283 ) 284 for address in want_ipv6 - have_ipv6: 285 address = dict(address) 286 commands.append("ipv6 address {0}".format(address["address"])) 287 return commands 288 289 290def clear_interface(want, have): 291 commands = [] 292 want_ipv4 = set( 293 tuple(sorted(address.items())) for address in want.get("ipv4") or [] 294 ) 295 have_ipv4 = set( 296 tuple(sorted(address.items())) for address in have.get("ipv4") or [] 297 ) 298 if not want_ipv4 and have_ipv4: 299 commands.append("no ip address") 300 else: 301 for address in have_ipv4 - want_ipv4: 302 address = dict(address) 303 for param in ["secondary", "virtual"]: 304 if param not in address: 305 address[param] = None 306 if tuple(sorted(address.items())) in want_ipv4: 307 continue 308 309 if address.get("secondary"): 310 commands.append( 311 "no ip address {0} secondary".format(address["address"]) 312 ) 313 if address.get("virtual"): 314 commands.append( 315 "no ip address virtual {0}".format(address["address"]) 316 ) 317 318 if "secondary" not in address: 319 # Removing non-secondary removes all other interfaces 320 break 321 322 want_ipv6 = set( 323 tuple(sorted(address.items())) for address in want.get("ipv6") or [] 324 ) 325 have_ipv6 = set( 326 tuple(sorted(address.items())) for address in have.get("ipv6") or [] 327 ) 328 for address in have_ipv6 - want_ipv6: 329 address = dict(address) 330 commands.append("no ipv6 address {0}".format(address["address"])) 331 return commands 332