1# -*- coding: utf-8 -*-
2'''
3Test fixtures for the napalm-logs profiles.
4'''
5from __future__ import absolute_import
6
7# Import python std lib
8import os
9import json
10import time
11import socket
12import logging
13from multiprocessing import Process
14
15# Import third party lib
16import zmq
17import pytest
18
19# Import napalm-logs pkgs
20import napalm_logs.config
21from napalm_logs.base import NapalmLogs
22
23log = logging.getLogger(__name__)
24
25NL_BASE = None
26NL_PROC = None
27TEST_SKT = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
28TEST_CLIENT = None
29
30NAPALM_LOGS_TEST_LOG_LEVEL = os.getenv('NAPALM_LOGS_TEST_LOG_LEVEL', default='warning')
31NAPALM_LOGS_TEST_ADDR = os.getenv('NAPALM_LOGS_TEST_ADDR', default='0.0.0.0')
32NAPALM_LOGS_TEST_PORT = os.getenv('NAPALM_LOGS_TEST_PORT', default=17191)
33NAPALM_LOGS_TEST_PUB_ADDR = os.getenv('NAPALM_LOGS_TEST_PUB_ADDR', default='0.0.0.0')
34NAPALM_LOGS_TEST_PUB_PORT = os.getenv('NAPALM_LOGS_TEST_PUB_PORT', default=17193)
35
36logging_level = napalm_logs.config.LOGGING_LEVEL.get(NAPALM_LOGS_TEST_LOG_LEVEL.lower())
37logging.basicConfig(level=logging_level, format=napalm_logs.config.LOG_FORMAT)
38
39
40def startup_proc():
41    '''
42    Startup the napalm-logs process.
43    '''
44    global NL_BASE
45    global NL_PROC
46    log.debug('Starting up the napalm-logs process')
47    NL_BASE = NapalmLogs(disable_security=True,
48                         address=NAPALM_LOGS_TEST_ADDR,
49                         port=NAPALM_LOGS_TEST_PORT,
50                         publisher=[{'zmq': {'send_unknown': True}}],
51                         listener=[{'udp': {}}],
52                         publish_address=NAPALM_LOGS_TEST_PUB_ADDR,
53                         publish_port=NAPALM_LOGS_TEST_PUB_PORT,
54                         log_level=NAPALM_LOGS_TEST_LOG_LEVEL)
55    NL_PROC = Process(target=NL_BASE.start_engine)
56    NL_PROC.start()
57
58
59# Startup the napalm-logs process
60startup_proc()
61
62
63def startup_local_client():
64    '''
65    Startup a local ZMQ client to receive the published messages.
66    '''
67    time.sleep(2)
68    global TEST_CLIENT
69    context = zmq.Context()
70    TEST_CLIENT = context.socket(zmq.SUB)
71    TEST_CLIENT.connect('tcp://{addr}:{port}'.format(
72        addr=NAPALM_LOGS_TEST_PUB_ADDR,
73        port=NAPALM_LOGS_TEST_PUB_PORT)
74    )
75    TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'')
76
77
78# Startup the local ZMQ client.
79startup_local_client()
80
81
82def generate_tests():
83    '''
84    Generate the list of tests.
85    '''
86    expected_os_errors = {}
87    for os_name, os_cfg in NL_BASE.config_dict.items():
88        expected_os_errors[os_name] = []
89        for message in os_cfg['messages']:
90            expected_os_errors[os_name].append(message['error'])
91    test_cases = []
92    cwd = os.path.dirname(__file__)
93    test_path = os.path.join(cwd, 'config')
94    os_dir_list = [name for name in os.listdir(test_path) if os.path.isdir(os.path.join(test_path, name))]
95    expected_oss = set(expected_os_errors.keys())
96    tested_oss = set(os_dir_list)
97    missing_oss = expected_oss - tested_oss
98    for missing_os in missing_oss:
99        test_cases.append(('__missing__{}'.format(missing_os), '', ''))
100    for os_name in os_dir_list:
101        # Subdir is the OS name
102        os_path = os.path.join(test_path, os_name)
103        errors = [name for name in os.listdir(os_path) if os.path.isdir(os.path.join(os_path, name))]
104        expected_errors = set(expected_os_errors[os_name])
105        defined_errors = set(errors)
106        missing_errors = expected_errors - defined_errors
107        for mising_err in missing_errors:
108            test_cases.append((os_name, '__missing__{}'.format(mising_err), ''))
109        for error_name in errors:
110            error_path = os.path.join(os_path, error_name)
111            cases = [name for name in os.listdir(error_path) if os.path.isdir(os.path.join(error_path, name))]
112            if not cases:
113                test_cases.append((os_name, error_name, '__missing__'))
114            for test_case in cases:
115                test_cases.append((os_name, error_name, test_case))
116    return test_cases
117
118
119# Determine the test cases.
120tests = generate_tests()
121
122
123@pytest.mark.parametrize("os_name,error_name,test_case", tests)
124def test_config(os_name, error_name, test_case):
125    assert not os_name.startswith('__missing__'), 'No tests defined for {}'.format(os_name.replace('__missing__', ''))
126    assert not error_name.startswith('__missing__'),\
127        'No tests defined for {}, under {}'.format(error_name.replace('__missing__', ''), os_name)
128    assert test_case != '__missing__', 'No test cases defined for {}, under {}'.format(error_name, os_name)
129    print('Testing {} for {}, under the test case "{}"'.format(
130          error_name, os_name, test_case))
131    cwd = os.path.dirname(__file__)
132    test_path = os.path.join(cwd, 'config', os_name, error_name, test_case)
133    raw_message_filepath = os.path.join(test_path, 'syslog.msg')
134    log.debug('Looking for %s', raw_message_filepath)
135    assert os.path.isfile(raw_message_filepath)
136    with open(raw_message_filepath, 'r') as raw_message_fh:
137        raw_message = raw_message_fh.read()
138    log.debug('Read raw message:')
139    log.debug(raw_message)
140    yang_message_filepath = os.path.join(test_path, 'yang.json')
141    log.debug('Looking for %s', yang_message_filepath)
142    try:
143        with open(yang_message_filepath, 'r') as yang_message_fh:
144            yang_message = yang_message_fh.read()
145    except IOError:
146        yang_message = ''
147    log.debug('Read YANG text:')
148    log.debug(yang_message)
149    if yang_message:
150        struct_yang_message = json.loads(yang_message)
151    else:
152        struct_yang_message = {}
153    log.debug('Struct YANG message:')
154    log.debug(struct_yang_message)
155    log.debug('Sending the raw message to the napalm-logs daemon')
156    TEST_SKT.sendto(raw_message.strip().encode('utf-8'), (NAPALM_LOGS_TEST_ADDR, NAPALM_LOGS_TEST_PORT))
157    zmq_msg = TEST_CLIENT.recv()
158    deserialised_zmq_msg = napalm_logs.utils.unserialize(zmq_msg)
159    log.debug('Received from the napalm-logs daemon:')
160    log.debug(deserialised_zmq_msg)
161    returned_yang = json.loads(json.dumps(deserialised_zmq_msg))
162    if not struct_yang_message:
163        # First run, the expected document is empty still empty, so we can
164        # provide the document napalm-logs expects (returns based on the raw
165        # syslog message)
166        assert False, json.dumps(deserialised_zmq_msg, indent=2)
167    # Pop the timestamp from both as most syslog messages do not specify year
168    # which means that once a year we will have to update all tests if we
169    # check the timestamp.
170    # We still expect both to contain a timestamp though.
171    assert struct_yang_message.pop('timestamp', False),\
172        'Yang test file does not contain a timestamp key for {} under {}'.format(error_name, os_name)
173    assert returned_yang.pop('timestamp', False),\
174        'The returned yang does not contain a timestamp key for {} under {}'.format(error_name, os_name)
175    assert struct_yang_message == returned_yang
176
177
178def test_napalm_logs_shut():
179    '''
180    Shutdown the napalm-logs engine.
181    '''
182    NL_BASE.stop_engine()
183    assert NL_PROC.is_alive()
184    NL_PROC.terminate()
185    NL_PROC.join()
186