1"""Miscellaneous utility functions.""" 2from glob import glob 3import sys 4import io 5import os 6from pathlib import Path 7import functools 8from datetime import datetime 9from netmiko._textfsm import _clitable as clitable 10from netmiko._textfsm._clitable import CliTableError 11 12try: 13 from genie.conf.base import Device 14 from genie.libs.parser.utils import get_parser 15 from pyats.datastructures import AttrDict 16 17 GENIE_INSTALLED = True 18except ImportError: 19 GENIE_INSTALLED = False 20 21try: 22 import serial.tools.list_ports 23 24 PYSERIAL_INSTALLED = True 25except ImportError: 26 PYSERIAL_INSTALLED = False 27 28 29# Dictionary mapping 'show run' for vendors with different command 30SHOW_RUN_MAPPER = { 31 "juniper": "show configuration", 32 "juniper_junos": "show configuration", 33 "extreme": "show configuration", 34 "extreme_ers": "show running-config", 35 "extreme_exos": "show configuration", 36 "extreme_netiron": "show running-config", 37 "extreme_nos": "show running-config", 38 "extreme_slx": "show running-config", 39 "extreme_vdx": "show running-config", 40 "extreme_vsp": "show running-config", 41 "extreme_wing": "show running-config", 42 "hp_comware": "display current-configuration", 43 "huawei": "display current-configuration", 44 "fortinet": "show full-configuration", 45 "checkpoint": "show configuration", 46 "cisco_wlc": "show run-config", 47 "enterasys": "show running-config", 48 "dell_force10": "show running-config", 49 "avaya_vsp": "show running-config", 50 "avaya_ers": "show running-config", 51 "brocade_vdx": "show running-config", 52 "brocade_nos": "show running-config", 53 "brocade_fastiron": "show running-config", 54 "brocade_netiron": "show running-config", 55 "alcatel_aos": "show configuration snapshot", 56} 57 58# Expand SHOW_RUN_MAPPER to include '_ssh' key 59new_dict = {} 60for k, v in SHOW_RUN_MAPPER.items(): 61 new_key = k + "_ssh" 62 new_dict[k] = v 63 new_dict[new_key] = v 64SHOW_RUN_MAPPER = new_dict 65 66# Default location of netmiko temp directory for netmiko tools 67NETMIKO_BASE_DIR = "~/.netmiko" 68 69 70def load_yaml_file(yaml_file): 71 """Read YAML file.""" 72 try: 73 import yaml 74 except ImportError: 75 sys.exit("Unable to import yaml module.") 76 try: 77 with io.open(yaml_file, "rt", encoding="utf-8") as fname: 78 return yaml.safe_load(fname) 79 except IOError: 80 sys.exit(f"Unable to open YAML file: {yaml_file}") 81 82 83def load_devices(file_name=None): 84 """Find and load .netmiko.yml file.""" 85 yaml_devices_file = find_cfg_file(file_name) 86 return load_yaml_file(yaml_devices_file) 87 88 89def find_cfg_file(file_name=None): 90 """ 91 Search for netmiko_tools inventory file in the following order: 92 NETMIKO_TOOLS_CFG environment variable 93 Current directory 94 Home directory 95 Look for file named: .netmiko.yml or netmiko.yml 96 Also allow NETMIKO_TOOLS_CFG to point directly at a file 97 """ 98 if file_name: 99 if os.path.isfile(file_name): 100 return file_name 101 optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "") 102 if os.path.isfile(optional_path): 103 return optional_path 104 search_paths = [optional_path, ".", os.path.expanduser("~")] 105 # Filter optional_path if null 106 search_paths = [path for path in search_paths if path] 107 for path in search_paths: 108 files = glob(f"{path}/.netmiko.yml") + glob(f"{path}/netmiko.yml") 109 if files: 110 return files[0] 111 raise IOError( 112 ".netmiko.yml file not found in NETMIKO_TOOLS environment variable directory, current " 113 "directory, or home directory." 114 ) 115 116 117def display_inventory(my_devices): 118 """Print out inventory devices and groups.""" 119 inventory_groups = ["all"] 120 inventory_devices = [] 121 for k, v in my_devices.items(): 122 if isinstance(v, list): 123 inventory_groups.append(k) 124 elif isinstance(v, dict): 125 inventory_devices.append((k, v["device_type"])) 126 127 inventory_groups.sort() 128 inventory_devices.sort(key=lambda x: x[0]) 129 print("\nDevices:") 130 print("-" * 40) 131 for a_device, device_type in inventory_devices: 132 device_type = f" ({device_type})" 133 print(f"{a_device:<25}{device_type:>15}") 134 print("\n\nGroups:") 135 print("-" * 40) 136 for a_group in inventory_groups: 137 print(a_group) 138 print() 139 140 141def obtain_all_devices(my_devices): 142 """Dynamically create 'all' group.""" 143 new_devices = {} 144 for device_name, device_or_group in my_devices.items(): 145 # Skip any groups 146 if not isinstance(device_or_group, list): 147 new_devices[device_name] = device_or_group 148 return new_devices 149 150 151def obtain_netmiko_filename(device_name): 152 """Create file name based on device_name.""" 153 _, netmiko_full_dir = find_netmiko_dir() 154 return f"{netmiko_full_dir}/{device_name}.txt" 155 156 157def write_tmp_file(device_name, output): 158 file_name = obtain_netmiko_filename(device_name) 159 with open(file_name, "w") as f: 160 f.write(output) 161 return file_name 162 163 164def ensure_dir_exists(verify_dir): 165 """Ensure directory exists. Create if necessary.""" 166 if not os.path.exists(verify_dir): 167 # Doesn't exist create dir 168 os.makedirs(verify_dir) 169 else: 170 # Exists 171 if not os.path.isdir(verify_dir): 172 # Not a dir, raise an exception 173 raise ValueError(f"{verify_dir} is not a directory") 174 175 176def find_netmiko_dir(): 177 """Check environment first, then default dir""" 178 try: 179 netmiko_base_dir = os.environ["NETMIKO_DIR"] 180 except KeyError: 181 netmiko_base_dir = NETMIKO_BASE_DIR 182 netmiko_base_dir = os.path.expanduser(netmiko_base_dir) 183 if netmiko_base_dir == "/": 184 raise ValueError("/ cannot be netmiko_base_dir") 185 netmiko_full_dir = f"{netmiko_base_dir}/tmp" 186 return (netmiko_base_dir, netmiko_full_dir) 187 188 189def write_bytes(out_data, encoding="ascii"): 190 """Legacy for Python2 and Python3 compatible byte stream.""" 191 if sys.version_info[0] >= 3: 192 if isinstance(out_data, type("")): 193 if encoding == "utf-8": 194 return out_data.encode("utf-8") 195 else: 196 return out_data.encode("ascii", "ignore") 197 elif isinstance(out_data, type(b"")): 198 return out_data 199 msg = "Invalid value for out_data neither unicode nor byte string: {}".format( 200 out_data 201 ) 202 raise ValueError(msg) 203 204 205def check_serial_port(name): 206 """returns valid COM Port.""" 207 208 if not PYSERIAL_INSTALLED: 209 msg = ( 210 "\npyserial is not installed. Please PIP install pyserial:\n\n" 211 "pip install pyserial\n\n" 212 ) 213 raise ValueError(msg) 214 215 try: 216 cdc = next(serial.tools.list_ports.grep(name)) 217 return cdc[0] 218 except StopIteration: 219 msg = f"device {name} not found. " 220 msg += "available devices are: " 221 ports = list(serial.tools.list_ports.comports()) 222 for p in ports: 223 msg += f"{str(p)}," 224 raise ValueError(msg) 225 226 227def get_template_dir(): 228 """Find and return the ntc-templates/templates dir.""" 229 try: 230 template_dir = os.path.expanduser(os.environ["NET_TEXTFSM"]) 231 index = os.path.join(template_dir, "index") 232 if not os.path.isfile(index): 233 # Assume only base ./ntc-templates specified 234 template_dir = os.path.join(template_dir, "templates") 235 except KeyError: 236 # Construct path ~/ntc-templates/templates 237 home_dir = os.path.expanduser("~") 238 template_dir = os.path.join(home_dir, "ntc-templates", "templates") 239 240 index = os.path.join(template_dir, "index") 241 if not os.path.isdir(template_dir) or not os.path.isfile(index): 242 msg = """ 243Valid ntc-templates not found, please install https://github.com/networktocode/ntc-templates 244and then set the NET_TEXTFSM environment variable to point to the ./ntc-templates/templates 245directory.""" 246 raise ValueError(msg) 247 return os.path.abspath(template_dir) 248 249 250def clitable_to_dict(cli_table): 251 """Converts TextFSM cli_table object to list of dictionaries.""" 252 objs = [] 253 for row in cli_table: 254 temp_dict = {} 255 for index, element in enumerate(row): 256 temp_dict[cli_table.header[index].lower()] = element 257 objs.append(temp_dict) 258 return objs 259 260 261def _textfsm_parse(textfsm_obj, raw_output, attrs, template_file=None): 262 """Perform the actual TextFSM parsing using the CliTable object.""" 263 try: 264 # Parse output through template 265 if template_file is not None: 266 textfsm_obj.ParseCmd(raw_output, templates=template_file) 267 else: 268 textfsm_obj.ParseCmd(raw_output, attrs) 269 structured_data = clitable_to_dict(textfsm_obj) 270 output = raw_output if structured_data == [] else structured_data 271 return output 272 except (FileNotFoundError, CliTableError): 273 return raw_output 274 275 276def get_structured_data(raw_output, platform=None, command=None, template=None): 277 """ 278 Convert raw CLI output to structured data using TextFSM template. 279 280 You can use a straight TextFSM file i.e. specify "template". If no template is specified, 281 then you must use an CliTable index file. 282 """ 283 if platform is None or command is None: 284 attrs = {} 285 else: 286 attrs = {"Command": command, "Platform": platform} 287 288 if template is None: 289 if attrs == {}: 290 raise ValueError( 291 "Either 'platform/command' or 'template' must be specified." 292 ) 293 template_dir = get_template_dir() 294 index_file = os.path.join(template_dir, "index") 295 textfsm_obj = clitable.CliTable(index_file, template_dir) 296 return _textfsm_parse(textfsm_obj, raw_output, attrs) 297 else: 298 template_path = Path(os.path.expanduser(template)) 299 template_file = template_path.name 300 template_dir = template_path.parents[0] 301 # CliTable with no index will fall-back to a TextFSM parsing behavior 302 textfsm_obj = clitable.CliTable(template_dir=template_dir) 303 return _textfsm_parse( 304 textfsm_obj, raw_output, attrs, template_file=template_file 305 ) 306 307 308def get_structured_data_genie(raw_output, platform, command): 309 if not sys.version_info >= (3, 4): 310 raise ValueError("Genie requires Python >= 3.4") 311 312 if not GENIE_INSTALLED: 313 msg = ( 314 "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n" 315 "pip install genie\npip install pyats\n" 316 ) 317 raise ValueError(msg) 318 319 if "cisco" not in platform: 320 return raw_output 321 322 genie_device_mapper = { 323 "cisco_ios": "ios", 324 "cisco_xe": "iosxe", 325 "cisco_xr": "iosxr", 326 "cisco_nxos": "nxos", 327 "cisco_asa": "asa", 328 } 329 330 os = None 331 # platform might be _ssh, _telnet, _serial strip that off 332 if platform.count("_") > 1: 333 base_platform = platform.split("_")[:-1] 334 base_platform = "_".join(base_platform) 335 else: 336 base_platform = platform 337 338 os = genie_device_mapper.get(base_platform) 339 if os is None: 340 return raw_output 341 342 # Genie specific construct for doing parsing (based on Genie in Ansible) 343 device = Device("new_device", os=os) 344 device.custom.setdefault("abstraction", {}) 345 device.custom["abstraction"]["order"] = ["os"] 346 device.cli = AttrDict({"execute": None}) 347 try: 348 # Test of whether their is a parser for the given command (will return Exception if fails) 349 get_parser(command, device) 350 parsed_output = device.parse(command, output=raw_output) 351 return parsed_output 352 except Exception: 353 return raw_output 354 355 356def select_cmd_verify(func): 357 """Override function cmd_verify argument with global setting.""" 358 359 @functools.wraps(func) 360 def wrapper_decorator(self, *args, **kwargs): 361 if self.global_cmd_verify is not None: 362 kwargs["cmd_verify"] = self.global_cmd_verify 363 return func(self, *args, **kwargs) 364 365 return wrapper_decorator 366 367 368def m_exec_time(func): 369 @functools.wraps(func) 370 def wrapper_decorator(self, *args, **kwargs): 371 start_time = datetime.now() 372 result = func(self, *args, **kwargs) 373 end_time = datetime.now() 374 method_name = str(func) 375 print(f"{method_name}: Elapsed time: {end_time - start_time}") 376 return result 377 378 return wrapper_decorator 379 380 381def f_exec_time(func): 382 @functools.wraps(func) 383 def wrapper_decorator(*args, **kwargs): 384 start_time = datetime.now() 385 result = func(*args, **kwargs) 386 end_time = datetime.now() 387 print(f"Elapsed time: {end_time - start_time}") 388 return result 389 390 return wrapper_decorator 391