1# -*- coding: utf-8 -*- 2# Copyright 2015 Spotify AB. All rights reserved. 3# 4# The contents of this file are licensed under the Apache License, Version 2.0 5# (the "License"); you may not use this file except in compliance with the 6# License. You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15 16# import stdlib 17from builtins import super 18import os 19import re 20import time 21import tempfile 22import uuid 23from collections import defaultdict 24 25# import third party lib 26from requests.exceptions import ConnectionError 27from netaddr import IPAddress 28from netaddr.core import AddrFormatError 29from netmiko import file_transfer 30from napalm.nxapi_plumbing import Device as NXOSDevice 31from napalm.nxapi_plumbing import ( 32 NXAPIAuthError, 33 NXAPIConnectionError, 34 NXAPICommandError, 35) 36import json 37 38# import NAPALM Base 39import napalm.base.helpers 40from napalm.base import NetworkDriver 41from napalm.base.exceptions import ConnectionException 42from napalm.base.exceptions import MergeConfigException 43from napalm.base.exceptions import CommandErrorException 44from napalm.base.exceptions import ReplaceConfigException 45from napalm.base.helpers import generate_regex_or 46from napalm.base.helpers import as_number 47from napalm.base.netmiko_helpers import netmiko_args 48import napalm.base.constants as c 49 50 51def ensure_netmiko_conn(func): 52 """Decorator that ensures Netmiko connection exists.""" 53 54 def wrap_function(self, filename=None, config=None): 55 try: 56 netmiko_object = self._netmiko_device 57 if netmiko_object is None: 58 raise AttributeError() 59 except AttributeError: 60 device_type = c.NETMIKO_MAP[self.platform] 61 netmiko_optional_args = self.netmiko_optional_args 62 if "port" in netmiko_optional_args: 63 netmiko_optional_args["port"] = 22 64 self._netmiko_open( 65 device_type=device_type, netmiko_optional_args=netmiko_optional_args 66 ) 67 func(self, filename=filename, config=config) 68 69 return wrap_function 70 71 72class NXOSDriverBase(NetworkDriver): 73 """Common code shared between nx-api and nxos_ssh.""" 74 75 def __init__(self, hostname, username, password, timeout=60, optional_args=None): 76 if optional_args is None: 77 optional_args = {} 78 self.hostname = hostname 79 self.username = username 80 self.password = password 81 self.timeout = timeout 82 self.replace = True 83 self.loaded = False 84 self.changed = False 85 self.merge_candidate = "" 86 self.candidate_cfg = "candidate_config.txt" 87 self.rollback_cfg = "rollback_config.txt" 88 self._dest_file_system = optional_args.pop("dest_file_system", "bootflash:") 89 self.force_no_enable = optional_args.get("force_no_enable", False) 90 self.netmiko_optional_args = netmiko_args(optional_args) 91 self.device = None 92 93 @ensure_netmiko_conn 94 def load_replace_candidate(self, filename=None, config=None): 95 96 if not filename and not config: 97 raise ReplaceConfigException( 98 "filename or config parameter must be provided." 99 ) 100 101 if not filename: 102 tmp_file = self._create_tmp_file(config) 103 filename = tmp_file 104 else: 105 if not os.path.isfile(filename): 106 raise ReplaceConfigException("File {} not found".format(filename)) 107 108 try: 109 transfer_result = file_transfer( 110 self._netmiko_device, 111 source_file=filename, 112 dest_file=self.candidate_cfg, 113 file_system=self._dest_file_system, 114 direction="put", 115 overwrite_file=True, 116 ) 117 if not transfer_result["file_exists"]: 118 raise ValueError() 119 except Exception: 120 msg = ( 121 "Could not transfer file. There was an error " 122 "during transfer. Please make sure remote " 123 "permissions are set." 124 ) 125 raise ReplaceConfigException(msg) 126 127 self.replace = True 128 self.loaded = True 129 if config and os.path.isfile(tmp_file): 130 os.remove(tmp_file) 131 132 def load_merge_candidate(self, filename=None, config=None): 133 if not filename and not config: 134 raise MergeConfigException("filename or config param must be provided.") 135 136 self.merge_candidate += "\n" # insert one extra line 137 if filename is not None: 138 with open(filename, "r") as f: 139 self.merge_candidate += f.read() 140 else: 141 self.merge_candidate += config 142 self.replace = False 143 self.loaded = True 144 145 def _send_command(self, command, raw_text=False): 146 raise NotImplementedError 147 148 def _commit_merge(self): 149 try: 150 output = self._send_config(self.merge_candidate) 151 if output and "Invalid command" in output: 152 raise MergeConfigException("Error while applying config!") 153 except Exception as e: 154 self.changed = True 155 self.rollback() 156 err_header = "Configuration merge failed; automatic rollback attempted" 157 merge_error = "{0}:\n{1}".format(err_header, repr(str(e))) 158 raise MergeConfigException(merge_error) 159 160 self.changed = True 161 # clear the merge buffer 162 self.merge_candidate = "" 163 164 def _get_merge_diff(self): 165 """ 166 The merge diff is not necessarily what needs to be loaded 167 for example under NTP, even though the 'ntp commit' command might be 168 alread configured, it is mandatory to be sent 169 otherwise it won't take the new configuration - see: 170 https://github.com/napalm-automation/napalm-nxos/issues/59 171 therefore this method will return the real diff (but not necessarily what is 172 being sent by the merge_load_config() 173 """ 174 diff = [] 175 running_config = self.get_config(retrieve="running")["running"] 176 running_lines = running_config.splitlines() 177 for line in self.merge_candidate.splitlines(): 178 if line not in running_lines and line: 179 if line[0].strip() != "!": 180 diff.append(line) 181 return "\n".join(diff) 182 183 def _get_diff(self): 184 """Get a diff between running config and a proposed file.""" 185 diff = [] 186 self._create_sot_file() 187 diff_out = self._send_command( 188 "show diff rollback-patch file {} file {}".format( 189 "sot_file", self.candidate_cfg 190 ), 191 raw_text=True, 192 ) 193 try: 194 diff_out = ( 195 diff_out.split("Generating Rollback Patch")[1] 196 .replace("Rollback Patch is Empty", "") 197 .strip() 198 ) 199 for line in diff_out.splitlines(): 200 if line: 201 if line[0].strip() != "!" and line[0].strip() != ".": 202 diff.append(line.rstrip(" ")) 203 except (AttributeError, KeyError): 204 raise ReplaceConfigException( 205 "Could not calculate diff. It's possible the given file doesn't exist." 206 ) 207 return "\n".join(diff) 208 209 def compare_config(self): 210 if self.loaded: 211 if not self.replace: 212 return self._get_merge_diff() 213 diff = self._get_diff() 214 return diff 215 return "" 216 217 def commit_config(self, message="", revert_in=None): 218 if revert_in is not None: 219 raise NotImplementedError( 220 "Commit confirm has not been implemented on this platform." 221 ) 222 if message: 223 raise NotImplementedError( 224 "Commit message not implemented for this platform" 225 ) 226 if self.loaded: 227 # Create checkpoint from current running-config 228 self._save_to_checkpoint(self.rollback_cfg) 229 230 if self.replace: 231 self._load_cfg_from_checkpoint() 232 else: 233 self._commit_merge() 234 235 try: 236 # If hostname changes ensure Netmiko state is updated properly 237 self._netmiko_device.set_base_prompt() 238 except AttributeError: 239 pass 240 241 self._copy_run_start() 242 self.loaded = False 243 else: 244 raise ReplaceConfigException("No config loaded.") 245 246 def discard_config(self): 247 if self.loaded: 248 # clear the buffer 249 self.merge_candidate = "" 250 if self.loaded and self.replace: 251 self._delete_file(self.candidate_cfg) 252 self.loaded = False 253 254 def _create_sot_file(self): 255 """Create Source of Truth file to compare.""" 256 257 # Bug on on NX-OS 6.2.16 where overwriting sot_file would take exceptionally long time 258 # (over 12 minutes); so just delete the sot_file 259 try: 260 self._delete_file(filename="sot_file") 261 except Exception: 262 pass 263 commands = [ 264 "terminal dont-ask", 265 "checkpoint file sot_file", 266 "no terminal dont-ask", 267 ] 268 self._send_command_list(commands) 269 270 def ping( 271 self, 272 destination, 273 source=c.PING_SOURCE, 274 ttl=c.PING_TTL, 275 timeout=c.PING_TIMEOUT, 276 size=c.PING_SIZE, 277 count=c.PING_COUNT, 278 vrf=c.PING_VRF, 279 source_interface=c.PING_SOURCE_INTERFACE, 280 ): 281 """ 282 Execute ping on the device and returns a dictionary with the result. 283 Output dictionary has one of following keys: 284 * success 285 * error 286 In case of success, inner dictionary will have the followin keys: 287 * probes_sent (int) 288 * packet_loss (int) 289 * rtt_min (float) 290 * rtt_max (float) 291 * rtt_avg (float) 292 * rtt_stddev (float) 293 * results (list) 294 'results' is a list of dictionaries with the following keys: 295 * ip_address (str) 296 * rtt (float) 297 """ 298 ping_dict = {} 299 300 version = "" 301 try: 302 version = "6" if IPAddress(destination).version == 6 else "" 303 except AddrFormatError: 304 # Allow use of DNS names 305 pass 306 307 command = "ping{version} {destination}".format( 308 version=version, destination=destination 309 ) 310 command += " timeout {}".format(timeout) 311 command += " packet-size {}".format(size) 312 command += " count {}".format(count) 313 if source != "": 314 command += " source {}".format(source) 315 elif source_interface != "": 316 command += " source {}".format(source_interface) 317 318 if vrf != "": 319 command += " vrf {}".format(vrf) 320 output = self._send_command(command, raw_text=True) 321 322 if "connect:" in output: 323 ping_dict["error"] = output 324 elif "PING" in output: 325 ping_dict["success"] = { 326 "probes_sent": 0, 327 "packet_loss": 0, 328 "rtt_min": 0.0, 329 "rtt_max": 0.0, 330 "rtt_avg": 0.0, 331 "rtt_stddev": 0.0, 332 "results": [], 333 } 334 results_array = [] 335 for line in output.splitlines(): 336 fields = line.split() 337 if "icmp" in line: 338 if "Unreachable" in line: 339 if "(" in fields[2]: 340 results_array.append( 341 {"ip_address": str(fields[2][1:-1]), "rtt": 0.0} 342 ) 343 else: 344 results_array.append( 345 {"ip_address": str(fields[1]), "rtt": 0.0} 346 ) 347 elif "truncated" in line: 348 if "(" in fields[4]: 349 results_array.append( 350 {"ip_address": str(fields[4][1:-2]), "rtt": 0.0} 351 ) 352 else: 353 results_array.append( 354 {"ip_address": str(fields[3][:-1]), "rtt": 0.0} 355 ) 356 elif fields[1] == "bytes": 357 if version == "6": 358 m = fields[5][5:] 359 else: 360 m = fields[6][5:] 361 results_array.append( 362 {"ip_address": str(fields[3][:-1]), "rtt": float(m)} 363 ) 364 elif "packets transmitted" in line: 365 ping_dict["success"]["probes_sent"] = int(fields[0]) 366 ping_dict["success"]["packet_loss"] = int(fields[0]) - int( 367 fields[3] 368 ) 369 elif "min/avg/max" in line: 370 m = fields[3].split("/") 371 ping_dict["success"].update( 372 { 373 "rtt_min": float(m[0]), 374 "rtt_avg": float(m[1]), 375 "rtt_max": float(m[2]), 376 } 377 ) 378 ping_dict["success"].update({"results": results_array}) 379 return ping_dict 380 381 def traceroute( 382 self, 383 destination, 384 source=c.TRACEROUTE_SOURCE, 385 ttl=c.TRACEROUTE_TTL, 386 timeout=c.TRACEROUTE_TIMEOUT, 387 vrf=c.TRACEROUTE_VRF, 388 ): 389 390 _HOP_ENTRY_PROBE = [ 391 r"\s+", 392 r"(", # beginning of host_name (ip_address) RTT group 393 r"(", # beginning of host_name (ip_address) group only 394 r"([a-zA-Z0-9\.:-]*)", # hostname 395 r"\s+", 396 r"\(?([a-fA-F0-9\.:][^\)]*)\)?", # IP Address between brackets 397 r"(?:\s+\(AS\s+[0-9]+\))?", # AS number -- may or may not be present 398 r")?", # end of host_name (ip_address) group only 399 # also hostname/ip are optional -- they can or cannot be specified 400 # if not specified, means the current probe followed the same path as the previous 401 r"\s+", 402 r"(\d+\.\d+)\s+ms", # RTT 403 r"|\*", # OR *, when non responsive hop 404 r")", # end of host_name (ip_address) RTT group 405 ] 406 407 _HOP_ENTRY = [r"\s?", r"(\d+)"] # space before hop index? # hop index 408 409 traceroute_result = {} 410 timeout = 5 # seconds 411 probes = 3 # 3 probes/jop and this cannot be changed on NXOS! 412 413 version = "" 414 try: 415 version = "6" if IPAddress(destination).version == 6 else "" 416 except AddrFormatError: 417 # Allow use of DNS names 418 pass 419 420 if source: 421 source_opt = "source {source}".format(source=source) 422 command = "traceroute{version} {destination} {source_opt}".format( 423 version=version, destination=destination, source_opt=source_opt 424 ) 425 else: 426 command = "traceroute{version} {destination}".format( 427 version=version, destination=destination 428 ) 429 430 if vrf != "": 431 command += " vrf {vrf}".format(vrf=vrf) 432 433 try: 434 traceroute_raw_output = self._send_command(command, raw_text=True) 435 except CommandErrorException: 436 return { 437 "error": "Cannot execute traceroute on the device: {}".format(command) 438 } 439 440 hop_regex = "".join(_HOP_ENTRY + _HOP_ENTRY_PROBE * probes) 441 traceroute_result["success"] = {} 442 if traceroute_raw_output: 443 for line in traceroute_raw_output.splitlines(): 444 hop_search = re.search(hop_regex, line) 445 if not hop_search: 446 continue 447 hop_details = hop_search.groups() 448 hop_index = int(hop_details[0]) 449 previous_probe_host_name = "*" 450 previous_probe_ip_address = "*" 451 traceroute_result["success"][hop_index] = {"probes": {}} 452 for probe_index in range(probes): 453 host_name = hop_details[3 + probe_index * 5] 454 ip_address_raw = hop_details[4 + probe_index * 5] 455 ip_address = napalm.base.helpers.convert( 456 napalm.base.helpers.ip, ip_address_raw, ip_address_raw 457 ) 458 rtt = hop_details[5 + probe_index * 5] 459 if rtt: 460 rtt = float(rtt) 461 else: 462 rtt = timeout * 1000.0 463 if not host_name: 464 host_name = previous_probe_host_name 465 if not ip_address: 466 ip_address = previous_probe_ip_address 467 if hop_details[1 + probe_index * 5] == "*": 468 host_name = "*" 469 ip_address = "*" 470 traceroute_result["success"][hop_index]["probes"][ 471 probe_index + 1 472 ] = { 473 "host_name": str(host_name), 474 "ip_address": str(ip_address), 475 "rtt": rtt, 476 } 477 previous_probe_host_name = host_name 478 previous_probe_ip_address = ip_address 479 return traceroute_result 480 481 def _get_checkpoint_file(self): 482 filename = "temp_cp_file_from_napalm" 483 self._set_checkpoint(filename) 484 command = "show file {}".format(filename) 485 output = self._send_command(command, raw_text=True) 486 self._delete_file(filename) 487 return output 488 489 def _set_checkpoint(self, filename): 490 commands = [ 491 "terminal dont-ask", 492 "checkpoint file {}".format(filename), 493 "no terminal dont-ask", 494 ] 495 self._send_command_list(commands) 496 497 def _save_to_checkpoint(self, filename): 498 """Save the current running config to the given file.""" 499 commands = [ 500 "terminal dont-ask", 501 "checkpoint file {}".format(filename), 502 "no terminal dont-ask", 503 ] 504 self._send_command_list(commands) 505 506 def _delete_file(self, filename): 507 commands = [ 508 "terminal dont-ask", 509 "delete {}".format(filename), 510 "no terminal dont-ask", 511 ] 512 self._send_command_list(commands) 513 514 @staticmethod 515 def _create_tmp_file(config): 516 tmp_dir = tempfile.gettempdir() 517 rand_fname = str(uuid.uuid4()) 518 filename = os.path.join(tmp_dir, rand_fname) 519 with open(filename, "wt") as fobj: 520 fobj.write(config) 521 return filename 522 523 def _disable_confirmation(self): 524 self._send_command_list(["terminal dont-ask"]) 525 526 def get_config(self, retrieve="all", full=False, sanitized=False): 527 528 # NX-OS adds some extra, unneeded lines that should be filtered. 529 filter_strings = [ 530 r"!Command: show .*$", 531 r"!Time:.*\d{4}\s*$", 532 r"Startup config saved at:.*$", 533 ] 534 filter_pattern = generate_regex_or(filter_strings) 535 536 config = {"startup": "", "running": "", "candidate": ""} # default values 537 # NX-OS only supports "all" on "show run" 538 run_full = " all" if full else "" 539 540 if retrieve.lower() in ("running", "all"): 541 command = f"show running-config{run_full}" 542 output = self._send_command(command, raw_text=True) 543 output = re.sub(filter_pattern, "", output, flags=re.M) 544 config["running"] = output.strip() 545 if retrieve.lower() in ("startup", "all"): 546 command = "show startup-config" 547 output = self._send_command(command, raw_text=True) 548 output = re.sub(filter_pattern, "", output, flags=re.M) 549 config["startup"] = output.strip() 550 551 if sanitized: 552 return napalm.base.helpers.sanitize_configs( 553 config, c.CISCO_SANITIZE_FILTERS 554 ) 555 556 return config 557 558 def get_lldp_neighbors(self): 559 """IOS implementation of get_lldp_neighbors.""" 560 lldp = {} 561 neighbors_detail = self.get_lldp_neighbors_detail() 562 for intf_name, entries in neighbors_detail.items(): 563 lldp[intf_name] = [] 564 for lldp_entry in entries: 565 hostname = lldp_entry["remote_system_name"] 566 # Match IOS behaviour of taking remote chassis ID 567 # When lacking a system name (in show lldp neighbors) 568 if hostname == "N/A": 569 hostname = lldp_entry["remote_chassis_id"] 570 lldp_dict = {"port": lldp_entry["remote_port"], "hostname": hostname} 571 lldp[intf_name].append(lldp_dict) 572 573 return lldp 574 575 def get_lldp_neighbors_detail(self, interface=""): 576 lldp = {} 577 lldp_interfaces = [] 578 579 if interface: 580 command = "show lldp neighbors interface {} detail".format(interface) 581 else: 582 command = "show lldp neighbors detail" 583 lldp_entries = self._send_command(command, raw_text=True) 584 lldp_entries = str(lldp_entries) 585 lldp_entries = napalm.base.helpers.textfsm_extractor( 586 self, "show_lldp_neighbors_detail", lldp_entries 587 ) 588 589 if len(lldp_entries) == 0: 590 return {} 591 592 for idx, lldp_entry in enumerate(lldp_entries): 593 local_intf = lldp_entry.pop("local_interface") or lldp_interfaces[idx] 594 # Convert any 'not advertised' to an empty string 595 for field in lldp_entry: 596 if "not advertised" in lldp_entry[field]: 597 lldp_entry[field] = "" 598 # Add field missing on IOS 599 lldp_entry["parent_interface"] = "" 600 # Translate the capability fields 601 lldp_entry[ 602 "remote_system_capab" 603 ] = napalm.base.helpers.transform_lldp_capab( 604 lldp_entry["remote_system_capab"] 605 ) 606 lldp_entry[ 607 "remote_system_enable_capab" 608 ] = napalm.base.helpers.transform_lldp_capab( 609 lldp_entry["remote_system_enable_capab"] 610 ) 611 # Turn the interfaces into their long version 612 local_intf = napalm.base.helpers.canonical_interface_name(local_intf) 613 lldp.setdefault(local_intf, []) 614 lldp[local_intf].append(lldp_entry) 615 616 return lldp 617 618 @staticmethod 619 def _get_table_rows(parent_table, table_name, row_name): 620 """ 621 Inconsistent behavior: 622 {'TABLE_intf': [{'ROW_intf': { 623 vs 624 {'TABLE_mac_address': {'ROW_mac_address': [{ 625 vs 626 {'TABLE_vrf': {'ROW_vrf': {'TABLE_adj': {'ROW_adj': { 627 """ 628 if parent_table is None: 629 return [] 630 _table = parent_table.get(table_name) 631 _table_rows = [] 632 if isinstance(_table, list): 633 _table_rows = [_table_row.get(row_name) for _table_row in _table] 634 elif isinstance(_table, dict): 635 _table_rows = _table.get(row_name) 636 if not isinstance(_table_rows, list): 637 _table_rows = [_table_rows] 638 return _table_rows 639 640 def _get_reply_table(self, result, table_name, row_name): 641 return self._get_table_rows(result, table_name, row_name) 642 643 def _get_command_table(self, command, table_name, row_name): 644 json_output = self._send_command(command) 645 if type(json_output) is not dict and json_output: 646 json_output = json.loads(json_output) 647 return self._get_reply_table(json_output, table_name, row_name) 648 649 def _parse_vlan_ports(self, vlan_s): 650 vlans = [] 651 find_regexp = r"^([A-Za-z\/-]+|.*\/)(\d+)-(\d+)$" 652 vlan_str = "" 653 654 if isinstance(vlan_s, list): 655 vlan_str = ",".join(vlan_s) 656 else: 657 vlan_str = vlan_s 658 659 for vls in vlan_str.split(","): 660 find = re.findall(find_regexp, vls.strip()) 661 if find: 662 for i in range(int(find[0][1]), int(find[0][2]) + 1): 663 vlans.append( 664 napalm.base.helpers.canonical_interface_name( 665 find[0][0] + str(i) 666 ) 667 ) 668 else: 669 vlans.append(napalm.base.helpers.canonical_interface_name(vls.strip())) 670 return vlans 671 672 673class NXOSDriver(NXOSDriverBase): 674 def __init__(self, hostname, username, password, timeout=60, optional_args=None): 675 super().__init__( 676 hostname, username, password, timeout=timeout, optional_args=optional_args 677 ) 678 if optional_args is None: 679 optional_args = {} 680 681 # nxos_protocol is there for backwards compatibility, transport is the preferred method 682 self.transport = optional_args.get( 683 "transport", optional_args.get("nxos_protocol", "https") 684 ) 685 if self.transport == "https": 686 self.port = optional_args.get("port", 443) 687 elif self.transport == "http": 688 self.port = optional_args.get("port", 80) 689 690 self.ssl_verify = optional_args.get("ssl_verify", False) 691 self.platform = "nxos" 692 693 def open(self): 694 try: 695 self.device = NXOSDevice( 696 host=self.hostname, 697 username=self.username, 698 password=self.password, 699 timeout=self.timeout, 700 port=self.port, 701 transport=self.transport, 702 verify=self.ssl_verify, 703 api_format="jsonrpc", 704 ) 705 self._send_command("show hostname") 706 except (NXAPIConnectionError, NXAPIAuthError): 707 # unable to open connection 708 raise ConnectionException("Cannot connect to {}".format(self.hostname)) 709 710 def close(self): 711 self.device = None 712 713 def _send_command(self, command, raw_text=False): 714 """ 715 Wrapper for NX-API show method. 716 717 Allows more code sharing between NX-API and SSH. 718 """ 719 return self.device.show(command, raw_text=raw_text) 720 721 def _send_command_list(self, commands): 722 return self.device.config_list(commands) 723 724 def _send_config(self, commands): 725 if isinstance(commands, str): 726 # Has to be a list generator and not generator expression (not JSON serializable) 727 commands = [command for command in commands.splitlines() if command] 728 return self.device.config_list(commands) 729 730 @staticmethod 731 def _compute_timestamp(stupid_cisco_output): 732 """ 733 Some fields such `uptime` are returned as: 23week(s) 3day(s) 734 This method will determine the epoch of the event. 735 e.g.: 23week(s) 3day(s) -> 1462248287 736 """ 737 if not stupid_cisco_output or stupid_cisco_output == "never": 738 return -1.0 739 740 if "(s)" in stupid_cisco_output: 741 pass 742 elif ":" in stupid_cisco_output: 743 stupid_cisco_output = stupid_cisco_output.replace(":", "hour(s) ", 1) 744 stupid_cisco_output = stupid_cisco_output.replace(":", "minute(s) ", 1) 745 stupid_cisco_output += "second(s)" 746 else: 747 stupid_cisco_output = stupid_cisco_output.replace("d", "day(s) ") 748 stupid_cisco_output = stupid_cisco_output.replace("h", "hour(s)") 749 750 things = { 751 "second(s)": {"weight": 1}, 752 "minute(s)": {"weight": 60}, 753 "hour(s)": {"weight": 3600}, 754 "day(s)": {"weight": 24 * 3600}, 755 "week(s)": {"weight": 7 * 24 * 3600}, 756 "year(s)": {"weight": 365.25 * 24 * 3600}, 757 } 758 759 things_keys = things.keys() 760 for part in stupid_cisco_output.split(): 761 for key in things_keys: 762 if key in part: 763 things[key]["count"] = napalm.base.helpers.convert( 764 int, part.replace(key, ""), 0 765 ) 766 767 delta = sum( 768 [det.get("count", 0) * det.get("weight") for det in things.values()] 769 ) 770 return time.time() - delta 771 772 def is_alive(self): 773 if self.device: 774 return {"is_alive": True} 775 else: 776 return {"is_alive": False} 777 778 def _copy_run_start(self): 779 results = self.device.save(filename="startup-config") 780 if not results: 781 msg = "Unable to save running-config to startup-config!" 782 raise CommandErrorException(msg) 783 784 def _load_cfg_from_checkpoint(self): 785 commands = [ 786 "terminal dont-ask", 787 "rollback running-config file {}".format(self.candidate_cfg), 788 "no terminal dont-ask", 789 ] 790 try: 791 rollback_result = self._send_command_list(commands) 792 except ConnectionError: 793 # requests will raise an error with verbose warning output (don't fail on this). 794 return 795 finally: 796 self.changed = True 797 798 # For nx-api a list is returned so extract the result associated with the 799 # 'rollback' command. 800 rollback_result = rollback_result[1] 801 msg = ( 802 rollback_result.get("msg") 803 if rollback_result.get("msg") 804 else rollback_result 805 ) 806 error_msg = True if rollback_result.get("error") else False 807 808 if "Rollback failed." in msg or error_msg: 809 raise ReplaceConfigException(msg) 810 elif rollback_result == []: 811 raise ReplaceConfigException 812 813 def rollback(self): 814 if self.changed: 815 self.device.rollback(self.rollback_cfg) 816 self._copy_run_start() 817 self.changed = False 818 819 def get_facts(self): 820 facts = {} 821 facts["vendor"] = "Cisco" 822 823 show_inventory_table = self._get_command_table( 824 "show inventory", "TABLE_inv", "ROW_inv" 825 ) 826 if isinstance(show_inventory_table, dict): 827 show_inventory_table = [show_inventory_table] 828 829 facts["serial_number"] = None 830 831 for row in show_inventory_table: 832 if row["name"] == '"Chassis"' or row["name"] == "Chassis": 833 facts["serial_number"] = row.get("serialnum", "") 834 break 835 836 show_version = self._send_command("show version") 837 facts["model"] = show_version.get("chassis_id", "") 838 facts["hostname"] = show_version.get("host_name", "") 839 facts["os_version"] = show_version.get( 840 "sys_ver_str", show_version.get("kickstart_ver_str", "") 841 ) 842 843 uptime_days = int(show_version.get("kern_uptm_days", 0)) 844 uptime_hours = int(show_version.get("kern_uptm_hrs", 0)) 845 uptime_mins = int(show_version.get("kern_uptm_mins", 0)) 846 uptime_secs = int(show_version.get("kern_uptm_secs", 0)) 847 848 uptime = 0 849 uptime += uptime_days * 24 * 60 * 60 850 uptime += uptime_hours * 60 * 60 851 uptime += uptime_mins * 60 852 uptime += uptime_secs 853 854 facts["uptime"] = uptime 855 856 iface_cmd = "show interface" 857 interfaces_out = self._send_command(iface_cmd) 858 interfaces_body = interfaces_out["TABLE_interface"]["ROW_interface"] 859 interface_list = [intf_data["interface"] for intf_data in interfaces_body] 860 facts["interface_list"] = interface_list 861 862 hostname_cmd = "show hostname" 863 hostname = self._send_command(hostname_cmd).get("hostname") 864 if hostname: 865 facts["fqdn"] = hostname 866 867 return facts 868 869 def get_interfaces(self): 870 interfaces = {} 871 iface_cmd = "show interface" 872 interfaces_out = self._send_command(iface_cmd) 873 interfaces_body = interfaces_out["TABLE_interface"]["ROW_interface"] 874 875 for interface_details in interfaces_body: 876 interface_name = interface_details.get("interface") 877 878 if interface_details.get("eth_mtu"): 879 interface_mtu = int(interface_details["eth_mtu"]) 880 elif interface_details.get("svi_mtu"): 881 interface_mtu = int(interface_details["svi_mtu"]) 882 else: 883 interface_mtu = 0 884 885 # Earlier version of Nexus returned a list for 'eth_bw' (observed on 7.1(0)N1(1a)) 886 if interface_details.get("eth_bw"): 887 interface_speed = interface_details["eth_bw"] 888 elif interface_details.get("svi_bw"): 889 interface_speed = interface_details["svi_bw"] 890 else: 891 interface_speed = 0 892 if isinstance(interface_speed, list): 893 interface_speed = interface_speed[0] 894 interface_speed = int(int(interface_speed) / 1000) 895 896 if "admin_state" in interface_details: 897 is_up = interface_details.get("admin_state", "") == "up" 898 elif "svi_admin_state" in interface_details: 899 is_up = interface_details.get("svi_admin_state", "") == "up" 900 else: 901 is_up = interface_details.get("state", "") == "up" 902 if interface_details.get("eth_hw_addr"): 903 mac_address = interface_details["eth_hw_addr"] 904 elif interface_details.get("svi_mac"): 905 mac_address = interface_details["svi_mac"].strip() 906 else: 907 mac_address = None 908 interfaces[interface_name] = { 909 "is_up": is_up, 910 "is_enabled": ( 911 interface_details.get("state") == "up" 912 or interface_details.get("svi_admin_state") == "up" 913 ), 914 "description": str(interface_details.get("desc", "").strip('"')), 915 "last_flapped": self._compute_timestamp( 916 interface_details.get("eth_link_flapped", "") 917 ), 918 "speed": interface_speed, 919 "mtu": interface_mtu, 920 "mac_address": napalm.base.helpers.convert( 921 napalm.base.helpers.mac, mac_address 922 ), 923 } 924 return interfaces 925 926 def get_bgp_neighbors(self): 927 results = {} 928 bgp_state_dict = { 929 "Idle": {"is_up": False, "is_enabled": True}, 930 "Active": {"is_up": False, "is_enabled": True}, 931 "Open": {"is_up": False, "is_enabled": True}, 932 "Established": {"is_up": True, "is_enabled": True}, 933 "Closing": {"is_up": True, "is_enabled": True}, 934 "Shutdown": {"is_up": False, "is_enabled": False}, 935 } 936 """ 937 af_name_dict = { 938 'af-id': {'safi': "af-name"}, 939 'af-id': {'safi': "af-name"}, 940 'af-id': {'safi': "af-name"} 941 } 942 """ 943 af_name_dict = { 944 1: {1: "ipv4", 128: "vpnv4"}, 945 2: {1: "ipv6", 128: "vpnv6"}, 946 25: {70: "l2vpn"}, 947 } 948 949 try: 950 cmd = "show bgp all summary vrf all" 951 vrf_list = self._get_command_table(cmd, "TABLE_vrf", "ROW_vrf") 952 except NXAPICommandError: 953 vrf_list = [] 954 955 for vrf_dict in vrf_list: 956 result_vrf_dict = {"router_id": str(vrf_dict["vrf-router-id"]), "peers": {}} 957 958 af_list = vrf_dict.get("TABLE_af", {}).get("ROW_af", []) 959 if isinstance(af_list, dict): 960 af_list = [af_list] 961 962 for af_dict in af_list: 963 saf_dict = af_dict.get("TABLE_saf", {}).get("ROW_saf", {}) 964 neighbors_list = saf_dict.get("TABLE_neighbor", {}).get( 965 "ROW_neighbor", [] 966 ) 967 968 if isinstance(neighbors_list, dict): 969 neighbors_list = [neighbors_list] 970 971 for neighbor_dict in neighbors_list: 972 neighborid = napalm.base.helpers.ip(neighbor_dict["neighborid"]) 973 remoteas = as_number(neighbor_dict["neighboras"]) 974 state = str(neighbor_dict["state"]) 975 976 bgp_state = bgp_state_dict[state] 977 afid_dict = af_name_dict[int(af_dict["af-id"])] 978 safi_name = afid_dict[int(saf_dict["safi"])] 979 980 result_peer_dict = { 981 "local_as": as_number(vrf_dict["vrf-local-as"]), 982 "remote_as": remoteas, 983 "remote_id": neighborid, 984 "is_enabled": bgp_state["is_enabled"], 985 "uptime": -1, 986 "description": "", 987 "is_up": bgp_state["is_up"], 988 "address_family": { 989 safi_name: { 990 "sent_prefixes": -1, 991 "accepted_prefixes": -1, 992 "received_prefixes": int( 993 neighbor_dict["prefixreceived"] 994 ), 995 } 996 }, 997 } 998 result_vrf_dict["peers"][neighborid] = result_peer_dict 999 1000 vrf_name = vrf_dict["vrf-name-out"] 1001 if vrf_name == "default": 1002 vrf_name = "global" 1003 results[vrf_name] = result_vrf_dict 1004 return results 1005 1006 def cli(self, commands): 1007 cli_output = {} 1008 if type(commands) is not list: 1009 raise TypeError("Please enter a valid list of commands!") 1010 1011 for command in commands: 1012 command_output = self._send_command(command, raw_text=True) 1013 cli_output[str(command)] = command_output 1014 return cli_output 1015 1016 def get_arp_table(self, vrf=""): 1017 if vrf: 1018 msg = "VRF support has not been added for this getter on this platform." 1019 raise NotImplementedError(msg) 1020 1021 arp_table = [] 1022 command = "show ip arp" 1023 arp_table_vrf = self._get_command_table(command, "TABLE_vrf", "ROW_vrf") 1024 arp_table_raw = self._get_table_rows(arp_table_vrf[0], "TABLE_adj", "ROW_adj") 1025 1026 for arp_table_entry in arp_table_raw: 1027 raw_ip = arp_table_entry.get("ip-addr-out") 1028 raw_mac = arp_table_entry.get("mac") 1029 age = arp_table_entry.get("time-stamp") 1030 if age == "-": 1031 age_sec = -1.0 1032 elif ":" not in age: 1033 # Cisco sometimes returns a sub second arp time 0.411797 1034 try: 1035 age_sec = float(age) 1036 except ValueError: 1037 age_sec = -1.0 1038 else: 1039 fields = age.split(":") 1040 if len(fields) == 3: 1041 try: 1042 fields = [float(x) for x in fields] 1043 hours, minutes, seconds = fields 1044 age_sec = 3600 * hours + 60 * minutes + seconds 1045 except ValueError: 1046 age_sec = -1.0 1047 age_sec = round(age_sec, 1) 1048 1049 interface = str(arp_table_entry.get("intf-out")) 1050 arp_table.append( 1051 { 1052 "interface": interface, 1053 "mac": napalm.base.helpers.convert( 1054 napalm.base.helpers.mac, raw_mac, raw_mac 1055 ), 1056 "ip": napalm.base.helpers.ip(raw_ip), 1057 "age": age_sec, 1058 } 1059 ) 1060 return arp_table 1061 1062 def _get_ntp_entity(self, peer_type): 1063 ntp_entities = {} 1064 command = "show ntp peers" 1065 ntp_peers_table = self._get_command_table(command, "TABLE_peers", "ROW_peers") 1066 1067 for ntp_peer in ntp_peers_table: 1068 if ntp_peer.get("serv_peer", "").strip() != peer_type: 1069 continue 1070 peer_addr = napalm.base.helpers.ip(ntp_peer.get("PeerIPAddress").strip()) 1071 ntp_entities[peer_addr] = {} 1072 1073 return ntp_entities 1074 1075 def get_ntp_peers(self): 1076 return self._get_ntp_entity("Peer") 1077 1078 def get_ntp_servers(self): 1079 return self._get_ntp_entity("Server") 1080 1081 def get_ntp_stats(self): 1082 ntp_stats = [] 1083 command = "show ntp peer-status" 1084 ntp_stats_table = self._get_command_table( 1085 command, "TABLE_peersstatus", "ROW_peersstatus" 1086 ) 1087 1088 for ntp_peer in ntp_stats_table: 1089 peer_address = napalm.base.helpers.ip(ntp_peer.get("remote").strip()) 1090 syncmode = ntp_peer.get("syncmode") 1091 stratum = int(ntp_peer.get("st")) 1092 hostpoll = int(ntp_peer.get("poll")) 1093 reachability = int(ntp_peer.get("reach")) 1094 delay = float(ntp_peer.get("delay")) 1095 ntp_stats.append( 1096 { 1097 "remote": peer_address, 1098 "synchronized": (syncmode == "*"), 1099 "referenceid": peer_address, 1100 "stratum": stratum, 1101 "type": "", 1102 "when": "", 1103 "hostpoll": hostpoll, 1104 "reachability": reachability, 1105 "delay": delay, 1106 "offset": 0.0, 1107 "jitter": 0.0, 1108 } 1109 ) 1110 return ntp_stats 1111 1112 def get_interfaces_ip(self): 1113 interfaces_ip = {} 1114 ipv4_command = "show ip interface" 1115 ipv4_interf_table_vrf = self._get_command_table( 1116 ipv4_command, "TABLE_intf", "ROW_intf" 1117 ) 1118 1119 for interface in ipv4_interf_table_vrf: 1120 interface_name = str(interface.get("intf-name", "")) 1121 addr_str = interface.get("prefix") 1122 unnumbered = str(interface.get("unnum-intf", "")) 1123 if addr_str: 1124 address = napalm.base.helpers.ip(addr_str) 1125 prefix = int(interface.get("masklen", "")) 1126 if interface_name not in interfaces_ip.keys(): 1127 interfaces_ip[interface_name] = {} 1128 if "ipv4" not in interfaces_ip[interface_name].keys(): 1129 interfaces_ip[interface_name]["ipv4"] = {} 1130 if address not in interfaces_ip[interface_name].get("ipv4"): 1131 interfaces_ip[interface_name]["ipv4"][address] = {} 1132 interfaces_ip[interface_name]["ipv4"][address].update( 1133 {"prefix_length": prefix} 1134 ) 1135 elif unnumbered: 1136 for interf in ipv4_interf_table_vrf: 1137 interf_name = str(interf.get("intf-name", "")) 1138 if interf_name == unnumbered: 1139 address = napalm.base.helpers.ip(interf.get("prefix")) 1140 prefix = int(interf.get("masklen", "")) 1141 if interface_name not in interfaces_ip.keys(): 1142 interfaces_ip[interface_name] = {} 1143 if "ipv4" not in interfaces_ip[interface_name].keys(): 1144 interfaces_ip[interface_name]["ipv4"] = {} 1145 if address not in interfaces_ip[interface_name].get("ipv4"): 1146 interfaces_ip[interface_name]["ipv4"][address] = {} 1147 interfaces_ip[interface_name]["ipv4"][address].update( 1148 {"prefix_length": prefix} 1149 ) 1150 1151 secondary_addresses = interface.get("TABLE_secondary_address", {}).get( 1152 "ROW_secondary_address", [] 1153 ) 1154 if type(secondary_addresses) is dict: 1155 secondary_addresses = [secondary_addresses] 1156 for secondary_address in secondary_addresses: 1157 secondary_address_ip = napalm.base.helpers.ip( 1158 secondary_address.get("prefix1") 1159 ) 1160 secondary_address_prefix = int(secondary_address.get("masklen1", "")) 1161 if "ipv4" not in interfaces_ip[interface_name].keys(): 1162 interfaces_ip[interface_name]["ipv4"] = {} 1163 if secondary_address_ip not in interfaces_ip[interface_name].get( 1164 "ipv4" 1165 ): 1166 interfaces_ip[interface_name]["ipv4"][secondary_address_ip] = {} 1167 interfaces_ip[interface_name]["ipv4"][secondary_address_ip].update( 1168 {"prefix_length": secondary_address_prefix} 1169 ) 1170 1171 ipv6_command = "show ipv6 interface" 1172 # If the switch doesn't run IPv6 or support it the show ipv6 interface 1173 # command will throw an error so catch it and return the ipv4 addresses 1174 try: 1175 ipv6_interf_table_vrf = self._get_command_table( 1176 ipv6_command, "TABLE_intf", "ROW_intf" 1177 ) 1178 except napalm.nxapi_plumbing.errors.NXAPIPostError: 1179 return interfaces_ip 1180 1181 for interface in ipv6_interf_table_vrf: 1182 interface_name = str(interface.get("intf-name", "")) 1183 1184 if interface_name not in interfaces_ip.keys(): 1185 interfaces_ip[interface_name] = {} 1186 if "ipv6" not in interfaces_ip[interface_name].keys(): 1187 interfaces_ip[interface_name]["ipv6"] = {} 1188 if "addr" not in interface.keys(): 1189 # Handle nexus 9000 ipv6 interface output 1190 if isinstance(interface["TABLE_addr"]["ROW_addr"], list): 1191 addrs = [ 1192 addr["addr"] for addr in interface["TABLE_addr"]["ROW_addr"] 1193 ] 1194 elif isinstance(interface["TABLE_addr"]["ROW_addr"], dict): 1195 addrs = interface["TABLE_addr"]["ROW_addr"]["addr"] 1196 interface["addr"] = addrs 1197 1198 if type(interface.get("addr", "")) is list: 1199 for ipv6_address in interface.get("addr", ""): 1200 address = napalm.base.helpers.ip(ipv6_address.split("/")[0]) 1201 prefix = int(ipv6_address.split("/")[-1]) 1202 if address not in interfaces_ip[interface_name].get("ipv6"): 1203 interfaces_ip[interface_name]["ipv6"][address] = {} 1204 interfaces_ip[interface_name]["ipv6"][address].update( 1205 {"prefix_length": prefix} 1206 ) 1207 else: 1208 address = napalm.base.helpers.ip( 1209 interface.get("addr", "").split("/")[0] 1210 ) 1211 prefix = interface.get("prefix", "").split("/")[-1] 1212 if prefix: 1213 prefix = int(interface.get("prefix", "").split("/")[-1]) 1214 else: 1215 prefix = 128 1216 1217 if address not in interfaces_ip[interface_name].get("ipv6"): 1218 interfaces_ip[interface_name]["ipv6"][address] = {} 1219 interfaces_ip[interface_name]["ipv6"][address].update( 1220 {"prefix_length": prefix} 1221 ) 1222 return interfaces_ip 1223 1224 def get_mac_address_table(self): 1225 mac_table = [] 1226 command = "show mac address-table" 1227 mac_table_raw = self._get_command_table( 1228 command, "TABLE_mac_address", "ROW_mac_address" 1229 ) 1230 1231 for mac_entry in mac_table_raw: 1232 raw_mac = mac_entry.get("disp_mac_addr") 1233 interface = str(mac_entry.get("disp_port")) 1234 try: 1235 vlan = int(mac_entry.get("disp_vlan")) 1236 except ValueError: 1237 vlan = 0 1238 active = True 1239 static = mac_entry.get("disp_is_static") != "0" 1240 moves = 0 1241 last_move = 0.0 1242 mac_table.append( 1243 { 1244 "mac": napalm.base.helpers.mac(raw_mac), 1245 "interface": interface, 1246 "vlan": vlan, 1247 "active": active, 1248 "static": static, 1249 "moves": moves, 1250 "last_move": last_move, 1251 } 1252 ) 1253 return mac_table 1254 1255 def get_snmp_information(self): 1256 snmp_information = {} 1257 snmp_command = "show running-config" 1258 snmp_raw_output = self.cli([snmp_command]).get(snmp_command, "") 1259 snmp_config = napalm.base.helpers.textfsm_extractor( 1260 self, "snmp_config", snmp_raw_output 1261 ) 1262 1263 if not snmp_config: 1264 return snmp_information 1265 1266 snmp_information = { 1267 "contact": str(""), 1268 "location": str(""), 1269 "community": {}, 1270 "chassis_id": str(""), 1271 } 1272 1273 for snmp_entry in snmp_config: 1274 contact = str(snmp_entry.get("contact", "")) 1275 if contact: 1276 snmp_information["contact"] = contact 1277 location = str(snmp_entry.get("location", "")) 1278 if location: 1279 snmp_information["location"] = location 1280 1281 community_name = str(snmp_entry.get("community", "")) 1282 if not community_name: 1283 continue 1284 1285 if community_name not in snmp_information["community"].keys(): 1286 snmp_information["community"][community_name] = { 1287 "acl": str(snmp_entry.get("acl", "")), 1288 "mode": str(snmp_entry.get("mode", "").lower()), 1289 } 1290 else: 1291 acl = str(snmp_entry.get("acl", "")) 1292 if acl: 1293 snmp_information["community"][community_name]["acl"] = acl 1294 mode = str(snmp_entry.get("mode", "").lower()) 1295 if mode: 1296 snmp_information["community"][community_name]["mode"] = mode 1297 return snmp_information 1298 1299 def get_users(self): 1300 _CISCO_TO_CISCO_MAP = {"network-admin": 15, "network-operator": 5} 1301 1302 _DEFAULT_USER_DICT = {"password": "", "level": 0, "sshkeys": []} 1303 1304 users = {} 1305 command = "show running-config" 1306 section_username_raw_output = self.cli([command]).get(command, "") 1307 section_username_tabled_output = napalm.base.helpers.textfsm_extractor( 1308 self, "users", section_username_raw_output 1309 ) 1310 1311 for user in section_username_tabled_output: 1312 username = user.get("username", "") 1313 if not username: 1314 continue 1315 if username not in users: 1316 users[username] = _DEFAULT_USER_DICT.copy() 1317 1318 password = user.get("password", "") 1319 if password: 1320 users[username]["password"] = str(password.strip()) 1321 1322 level = 0 1323 role = user.get("role", "") 1324 if role.startswith("priv"): 1325 level = int(role.split("-")[-1]) 1326 else: 1327 level = _CISCO_TO_CISCO_MAP.get(role, 0) 1328 if level > users.get(username).get("level"): 1329 # unfortunately on Cisco you can set different priv levels for the same user 1330 # Good news though: the device will consider the highest level 1331 users[username]["level"] = level 1332 1333 sshkeytype = user.get("sshkeytype", "") 1334 sshkeyvalue = user.get("sshkeyvalue", "") 1335 if sshkeytype and sshkeyvalue: 1336 if sshkeytype not in ["ssh-rsa", "ssh-dsa"]: 1337 continue 1338 users[username]["sshkeys"].append(str(sshkeyvalue)) 1339 return users 1340 1341 def get_network_instances(self, name=""): 1342 """get_network_instances implementation for NX-OS""" 1343 1344 # command 'show vrf detail' returns all VRFs with detailed information 1345 # format: list of dictionaries with keys such as 'vrf_name' and 'rd' 1346 command = "show vrf detail" 1347 vrf_table_raw = self._get_command_table(command, "TABLE_vrf", "ROW_vrf") 1348 1349 # command 'show vrf interface' returns all interfaces including their assigned VRF 1350 # format: list of dictionaries with keys 'if_name', 'vrf_name', 'vrf_id' and 'soo' 1351 command = "show vrf interface" 1352 intf_table_raw = self._get_command_table(command, "TABLE_if", "ROW_if") 1353 1354 # create a dictionary with key = 'vrf_name' and value = list of interfaces 1355 vrf_intfs = defaultdict(list) 1356 for intf in intf_table_raw: 1357 vrf_intfs[intf["vrf_name"]].append(str(intf["if_name"])) 1358 1359 vrfs = {} 1360 for vrf in vrf_table_raw: 1361 vrf_name = str(vrf.get("vrf_name")) 1362 vrfs[vrf_name] = {} 1363 vrfs[vrf_name]["name"] = vrf_name 1364 1365 # differentiate between VRF type 'DEFAULT_INSTANCE' and 'L3VRF' 1366 if vrf_name == "default": 1367 vrfs[vrf_name]["type"] = "DEFAULT_INSTANCE" 1368 else: 1369 vrfs[vrf_name]["type"] = "L3VRF" 1370 1371 vrfs[vrf_name]["state"] = {"route_distinguisher": str(vrf.get("rd"))} 1372 1373 # convert list of interfaces (vrf_intfs[vrf_name]) to expected format 1374 # format = dict with key = interface name and empty values 1375 vrfs[vrf_name]["interfaces"] = {} 1376 vrfs[vrf_name]["interfaces"]["interface"] = dict.fromkeys( 1377 vrf_intfs[vrf_name], {} 1378 ) 1379 1380 # if name of a specific VRF was passed as an argument 1381 # only return results for this particular VRF 1382 if name: 1383 if name in vrfs.keys(): 1384 return {str(name): vrfs[name]} 1385 else: 1386 return {} 1387 # else return results for all VRFs 1388 else: 1389 return vrfs 1390 1391 def get_environment(self): 1392 def _process_pdus(power_data): 1393 normalized = defaultdict(dict) 1394 # some nexus devices have keys postfixed with the shorthand device series name (ie n3k) 1395 # ex. on a 9k, the key is TABLE_psinfo, but on a 3k it is TABLE_psinfo_n3k 1396 ps_info_key = [ 1397 i for i in power_data.keys() if i.startswith("TABLE_psinfo") 1398 ][0] 1399 ps_info_table = power_data[ps_info_key] 1400 # Later version of nxos will have a list under TABLE_psinfo like 1401 # TABLE_psinfo : [{'ROW_psinfo': {... 1402 # and not have the psnum under the row 1403 if isinstance(ps_info_table, list): 1404 # if this is one of those later versions, make the data look like 1405 # the older way 1406 count = 1 1407 tmp_table = [] 1408 for entry in ps_info_table: 1409 tmp = entry.get("ROW_psinfo") 1410 tmp["psnum"] = count 1411 # to access the power supply status, the key looks like it is device dependent 1412 # on a 3k device it is ps_status_3k 1413 status_key = [ 1414 i 1415 for i in entry["ROW_psinfo"].keys() 1416 if i.startswith("ps_status") 1417 ][0] 1418 tmp["ps_status"] = entry["ROW_psinfo"][status_key] 1419 count += 1 1420 tmp_table.append(tmp) 1421 ps_info_table = {"ROW_psinfo": tmp_table} 1422 # some nexus devices have keys postfixed with the shorthand device series name (ie n3k) 1423 # ex. on a 9k the key is ROW_psinfo, but on a 3k it is ROW_psinfo_n3k 1424 ps_info_row_key = [ 1425 i for i in ps_info_table.keys() if i.startswith("ROW_psinfo") 1426 ][0] 1427 for psinfo in ps_info_table[ps_info_row_key]: 1428 normalized[psinfo["psnum"]]["status"] = ( 1429 psinfo.get("ps_status", "ok") == "ok" 1430 ) 1431 normalized[psinfo["psnum"]]["output"] = float(psinfo.get("watts", -1.0)) 1432 # Newer nxos versions provide the total capacity in the `tot_capa` key 1433 if "tot_capa" in psinfo: 1434 normalized[psinfo["psnum"]]["capacity"] = float( 1435 psinfo["tot_capa"].split()[0] 1436 ) 1437 # The capacity of the power supply can be determined by the model 1438 # ie N2200-PAC-400W = 400 watts 1439 else: 1440 ps_model = psinfo.get("psmodel", "-1") 1441 normalized[psinfo["psnum"]]["capacity"] = float( 1442 ps_model.split("-")[-1][:-1] 1443 ) 1444 return json.loads(json.dumps(normalized)) 1445 1446 def _process_fans(fan_data): 1447 normalized = {} 1448 for entry in fan_data["TABLE_faninfo"]["ROW_faninfo"]: 1449 if "PS" in entry["fanname"]: 1450 # Skip fans in power supplies 1451 continue 1452 normalized[entry["fanname"]] = { 1453 # Copying the behavior of eos.py where if the fanstatus key is not found 1454 # we default the status to True 1455 "status": entry.get("fanstatus", "Ok") 1456 == "Ok" 1457 } 1458 return normalized 1459 1460 def _process_temperature(temperature_data): 1461 normalized = {} 1462 # The modname and sensor type are not unique enough keys, so adding a count 1463 count = 1 1464 past_tempmod = "1" 1465 for entry in temperature_data["ROW_tempinfo"]: 1466 mod_name = entry.get("tempmod").rstrip() 1467 # if the mod name has change reset the count to 1 1468 if past_tempmod != mod_name: 1469 count = 1 1470 name = "{}-{} {}".format(mod_name, count, entry.get("sensor").rstrip()) 1471 normalized[name] = { 1472 "temperature": float(entry.get("curtemp", -1)), 1473 "is_alert": entry.get("alarmstatus", "Ok").rstrip() != "Ok", 1474 "is_critical": float(entry.get("curtemp")) 1475 > float(entry.get("majthres")), 1476 } 1477 count += 1 1478 return normalized 1479 1480 def _process_cpu(cpu_data): 1481 idle = ( 1482 cpu_data.get("idle_percent") 1483 if cpu_data.get("idle_percent") 1484 else cpu_data["TABLE_cpu_util"]["ROW_cpu_util"]["idle_percent"] 1485 ) 1486 return {0: {"%usage": round(100 - float(idle), 2)}} 1487 1488 def _process_memory(memory_data): 1489 avail = memory_data["TABLE_process_tag"]["ROW_process_tag"][ 1490 "process-memory-share-total-shm-avail" 1491 ] 1492 used = memory_data["TABLE_process_tag"]["ROW_process_tag"][ 1493 "process-memory-share-total-shm-used" 1494 ] 1495 return {"available_ram": int(avail) * 1000, "used_ram": int(used) * 1000} 1496 1497 environment_raw = self._send_command("show environment") 1498 cpu_raw = self._send_command("show processes cpu") 1499 memory_raw = self._send_command("show processes memory shared") 1500 fan_key = [i for i in environment_raw.keys() if i.startswith("fandetails")][0] 1501 return { 1502 "power": _process_pdus(environment_raw["powersup"]), 1503 "fans": _process_fans(environment_raw[fan_key]), 1504 "temperature": _process_temperature(environment_raw["TABLE_tempinfo"]), 1505 "cpu": _process_cpu(cpu_raw), 1506 "memory": _process_memory(memory_raw), 1507 } 1508 1509 def get_vlans(self): 1510 vlans = {} 1511 command = "show vlan brief" 1512 vlan_table_raw = self._get_command_table( 1513 command, "TABLE_vlanbriefxbrief", "ROW_vlanbriefxbrief" 1514 ) 1515 if isinstance(vlan_table_raw, dict): 1516 vlan_table_raw = [vlan_table_raw] 1517 1518 for vlan in vlan_table_raw: 1519 if "vlanshowplist-ifidx" not in vlan.keys(): 1520 vlan["vlanshowplist-ifidx"] = [] 1521 vlans[vlan["vlanshowbr-vlanid"]] = { 1522 "name": vlan["vlanshowbr-vlanname"], 1523 "interfaces": self._parse_vlan_ports(vlan["vlanshowplist-ifidx"]), 1524 } 1525 return vlans 1526