1"""Miscellaneous utility functions."""
2from glob import glob
3import sys
4import io
5import os
6from pathlib import Path
7import functools
8from datetime import datetime
9from netmiko._textfsm import _clitable as clitable
10from netmiko._textfsm._clitable import CliTableError
11
12try:
13    from genie.conf.base import Device
14    from genie.libs.parser.utils import get_parser
15    from pyats.datastructures import AttrDict
16
17    GENIE_INSTALLED = True
18except ImportError:
19    GENIE_INSTALLED = False
20
21try:
22    import serial.tools.list_ports
23
24    PYSERIAL_INSTALLED = True
25except ImportError:
26    PYSERIAL_INSTALLED = False
27
28
29# Dictionary mapping 'show run' for vendors with different command
30SHOW_RUN_MAPPER = {
31    "juniper": "show configuration",
32    "juniper_junos": "show configuration",
33    "extreme": "show configuration",
34    "extreme_ers": "show running-config",
35    "extreme_exos": "show configuration",
36    "extreme_netiron": "show running-config",
37    "extreme_nos": "show running-config",
38    "extreme_slx": "show running-config",
39    "extreme_vdx": "show running-config",
40    "extreme_vsp": "show running-config",
41    "extreme_wing": "show running-config",
42    "hp_comware": "display current-configuration",
43    "huawei": "display current-configuration",
44    "fortinet": "show full-configuration",
45    "checkpoint": "show configuration",
46    "cisco_wlc": "show run-config",
47    "enterasys": "show running-config",
48    "dell_force10": "show running-config",
49    "avaya_vsp": "show running-config",
50    "avaya_ers": "show running-config",
51    "brocade_vdx": "show running-config",
52    "brocade_nos": "show running-config",
53    "brocade_fastiron": "show running-config",
54    "brocade_netiron": "show running-config",
55    "alcatel_aos": "show configuration snapshot",
56}
57
58# Expand SHOW_RUN_MAPPER to include '_ssh' key
59new_dict = {}
60for k, v in SHOW_RUN_MAPPER.items():
61    new_key = k + "_ssh"
62    new_dict[k] = v
63    new_dict[new_key] = v
64SHOW_RUN_MAPPER = new_dict
65
66# Default location of netmiko temp directory for netmiko tools
67NETMIKO_BASE_DIR = "~/.netmiko"
68
69
70def load_yaml_file(yaml_file):
71    """Read YAML file."""
72    try:
73        import yaml
74    except ImportError:
75        sys.exit("Unable to import yaml module.")
76    try:
77        with io.open(yaml_file, "rt", encoding="utf-8") as fname:
78            return yaml.safe_load(fname)
79    except IOError:
80        sys.exit(f"Unable to open YAML file: {yaml_file}")
81
82
83def load_devices(file_name=None):
84    """Find and load .netmiko.yml file."""
85    yaml_devices_file = find_cfg_file(file_name)
86    return load_yaml_file(yaml_devices_file)
87
88
89def find_cfg_file(file_name=None):
90    """
91    Search for netmiko_tools inventory file in the following order:
92    NETMIKO_TOOLS_CFG environment variable
93    Current directory
94    Home directory
95    Look for file named: .netmiko.yml or netmiko.yml
96    Also allow NETMIKO_TOOLS_CFG to point directly at a file
97    """
98    if file_name:
99        if os.path.isfile(file_name):
100            return file_name
101    optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "")
102    if os.path.isfile(optional_path):
103        return optional_path
104    search_paths = [optional_path, ".", os.path.expanduser("~")]
105    # Filter optional_path if null
106    search_paths = [path for path in search_paths if path]
107    for path in search_paths:
108        files = glob(f"{path}/.netmiko.yml") + glob(f"{path}/netmiko.yml")
109        if files:
110            return files[0]
111    raise IOError(
112        ".netmiko.yml file not found in NETMIKO_TOOLS environment variable directory, current "
113        "directory, or home directory."
114    )
115
116
117def display_inventory(my_devices):
118    """Print out inventory devices and groups."""
119    inventory_groups = ["all"]
120    inventory_devices = []
121    for k, v in my_devices.items():
122        if isinstance(v, list):
123            inventory_groups.append(k)
124        elif isinstance(v, dict):
125            inventory_devices.append((k, v["device_type"]))
126
127    inventory_groups.sort()
128    inventory_devices.sort(key=lambda x: x[0])
129    print("\nDevices:")
130    print("-" * 40)
131    for a_device, device_type in inventory_devices:
132        device_type = f"  ({device_type})"
133        print(f"{a_device:<25}{device_type:>15}")
134    print("\n\nGroups:")
135    print("-" * 40)
136    for a_group in inventory_groups:
137        print(a_group)
138    print()
139
140
141def obtain_all_devices(my_devices):
142    """Dynamically create 'all' group."""
143    new_devices = {}
144    for device_name, device_or_group in my_devices.items():
145        # Skip any groups
146        if not isinstance(device_or_group, list):
147            new_devices[device_name] = device_or_group
148    return new_devices
149
150
151def obtain_netmiko_filename(device_name):
152    """Create file name based on device_name."""
153    _, netmiko_full_dir = find_netmiko_dir()
154    return f"{netmiko_full_dir}/{device_name}.txt"
155
156
157def write_tmp_file(device_name, output):
158    file_name = obtain_netmiko_filename(device_name)
159    with open(file_name, "w") as f:
160        f.write(output)
161    return file_name
162
163
164def ensure_dir_exists(verify_dir):
165    """Ensure directory exists. Create if necessary."""
166    if not os.path.exists(verify_dir):
167        # Doesn't exist create dir
168        os.makedirs(verify_dir)
169    else:
170        # Exists
171        if not os.path.isdir(verify_dir):
172            # Not a dir, raise an exception
173            raise ValueError(f"{verify_dir} is not a directory")
174
175
176def find_netmiko_dir():
177    """Check environment first, then default dir"""
178    try:
179        netmiko_base_dir = os.environ["NETMIKO_DIR"]
180    except KeyError:
181        netmiko_base_dir = NETMIKO_BASE_DIR
182    netmiko_base_dir = os.path.expanduser(netmiko_base_dir)
183    if netmiko_base_dir == "/":
184        raise ValueError("/ cannot be netmiko_base_dir")
185    netmiko_full_dir = f"{netmiko_base_dir}/tmp"
186    return (netmiko_base_dir, netmiko_full_dir)
187
188
189def write_bytes(out_data, encoding="ascii"):
190    """Legacy for Python2 and Python3 compatible byte stream."""
191    if sys.version_info[0] >= 3:
192        if isinstance(out_data, type("")):
193            if encoding == "utf-8":
194                return out_data.encode("utf-8")
195            else:
196                return out_data.encode("ascii", "ignore")
197        elif isinstance(out_data, type(b"")):
198            return out_data
199    msg = "Invalid value for out_data neither unicode nor byte string: {}".format(
200        out_data
201    )
202    raise ValueError(msg)
203
204
205def check_serial_port(name):
206    """returns valid COM Port."""
207
208    if not PYSERIAL_INSTALLED:
209        msg = (
210            "\npyserial is not installed. Please PIP install pyserial:\n\n"
211            "pip install pyserial\n\n"
212        )
213        raise ValueError(msg)
214
215    try:
216        cdc = next(serial.tools.list_ports.grep(name))
217        return cdc[0]
218    except StopIteration:
219        msg = f"device {name} not found. "
220        msg += "available devices are: "
221        ports = list(serial.tools.list_ports.comports())
222        for p in ports:
223            msg += f"{str(p)},"
224        raise ValueError(msg)
225
226
227def get_template_dir():
228    """Find and return the ntc-templates/templates dir."""
229    try:
230        template_dir = os.path.expanduser(os.environ["NET_TEXTFSM"])
231        index = os.path.join(template_dir, "index")
232        if not os.path.isfile(index):
233            # Assume only base ./ntc-templates specified
234            template_dir = os.path.join(template_dir, "templates")
235    except KeyError:
236        # Construct path ~/ntc-templates/templates
237        home_dir = os.path.expanduser("~")
238        template_dir = os.path.join(home_dir, "ntc-templates", "templates")
239
240    index = os.path.join(template_dir, "index")
241    if not os.path.isdir(template_dir) or not os.path.isfile(index):
242        msg = """
243Valid ntc-templates not found, please install https://github.com/networktocode/ntc-templates
244and then set the NET_TEXTFSM environment variable to point to the ./ntc-templates/templates
245directory."""
246        raise ValueError(msg)
247    return os.path.abspath(template_dir)
248
249
250def clitable_to_dict(cli_table):
251    """Converts TextFSM cli_table object to list of dictionaries."""
252    objs = []
253    for row in cli_table:
254        temp_dict = {}
255        for index, element in enumerate(row):
256            temp_dict[cli_table.header[index].lower()] = element
257        objs.append(temp_dict)
258    return objs
259
260
261def _textfsm_parse(textfsm_obj, raw_output, attrs, template_file=None):
262    """Perform the actual TextFSM parsing using the CliTable object."""
263    try:
264        # Parse output through template
265        if template_file is not None:
266            textfsm_obj.ParseCmd(raw_output, templates=template_file)
267        else:
268            textfsm_obj.ParseCmd(raw_output, attrs)
269        structured_data = clitable_to_dict(textfsm_obj)
270        output = raw_output if structured_data == [] else structured_data
271        return output
272    except (FileNotFoundError, CliTableError):
273        return raw_output
274
275
276def get_structured_data(raw_output, platform=None, command=None, template=None):
277    """
278    Convert raw CLI output to structured data using TextFSM template.
279
280    You can use a straight TextFSM file i.e. specify "template". If no template is specified,
281    then you must use an CliTable index file.
282    """
283    if platform is None or command is None:
284        attrs = {}
285    else:
286        attrs = {"Command": command, "Platform": platform}
287
288    if template is None:
289        if attrs == {}:
290            raise ValueError(
291                "Either 'platform/command' or 'template' must be specified."
292            )
293        template_dir = get_template_dir()
294        index_file = os.path.join(template_dir, "index")
295        textfsm_obj = clitable.CliTable(index_file, template_dir)
296        return _textfsm_parse(textfsm_obj, raw_output, attrs)
297    else:
298        template_path = Path(os.path.expanduser(template))
299        template_file = template_path.name
300        template_dir = template_path.parents[0]
301        # CliTable with no index will fall-back to a TextFSM parsing behavior
302        textfsm_obj = clitable.CliTable(template_dir=template_dir)
303        return _textfsm_parse(
304            textfsm_obj, raw_output, attrs, template_file=template_file
305        )
306
307
308def get_structured_data_genie(raw_output, platform, command):
309    if not sys.version_info >= (3, 4):
310        raise ValueError("Genie requires Python >= 3.4")
311
312    if not GENIE_INSTALLED:
313        msg = (
314            "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n"
315            "pip install genie\npip install pyats\n"
316        )
317        raise ValueError(msg)
318
319    if "cisco" not in platform:
320        return raw_output
321
322    genie_device_mapper = {
323        "cisco_ios": "ios",
324        "cisco_xe": "iosxe",
325        "cisco_xr": "iosxr",
326        "cisco_nxos": "nxos",
327        "cisco_asa": "asa",
328    }
329
330    os = None
331    # platform might be _ssh, _telnet, _serial strip that off
332    if platform.count("_") > 1:
333        base_platform = platform.split("_")[:-1]
334        base_platform = "_".join(base_platform)
335    else:
336        base_platform = platform
337
338    os = genie_device_mapper.get(base_platform)
339    if os is None:
340        return raw_output
341
342    # Genie specific construct for doing parsing (based on Genie in Ansible)
343    device = Device("new_device", os=os)
344    device.custom.setdefault("abstraction", {})
345    device.custom["abstraction"]["order"] = ["os"]
346    device.cli = AttrDict({"execute": None})
347    try:
348        # Test of whether their is a parser for the given command (will return Exception if fails)
349        get_parser(command, device)
350        parsed_output = device.parse(command, output=raw_output)
351        return parsed_output
352    except Exception:
353        return raw_output
354
355
356def select_cmd_verify(func):
357    """Override function cmd_verify argument with global setting."""
358
359    @functools.wraps(func)
360    def wrapper_decorator(self, *args, **kwargs):
361        if self.global_cmd_verify is not None:
362            kwargs["cmd_verify"] = self.global_cmd_verify
363        return func(self, *args, **kwargs)
364
365    return wrapper_decorator
366
367
368def m_exec_time(func):
369    @functools.wraps(func)
370    def wrapper_decorator(self, *args, **kwargs):
371        start_time = datetime.now()
372        result = func(self, *args, **kwargs)
373        end_time = datetime.now()
374        method_name = str(func)
375        print(f"{method_name}: Elapsed time: {end_time - start_time}")
376        return result
377
378    return wrapper_decorator
379
380
381def f_exec_time(func):
382    @functools.wraps(func)
383    def wrapper_decorator(*args, **kwargs):
384        start_time = datetime.now()
385        result = func(*args, **kwargs)
386        end_time = datetime.now()
387        print(f"Elapsed time: {end_time - start_time}")
388        return result
389
390    return wrapper_decorator
391