1# -*- coding: utf-8 -*- 2''' 3Test fixtures for the napalm-logs profiles. 4''' 5from __future__ import absolute_import 6 7# Import python std lib 8import os 9import json 10import time 11import socket 12import logging 13from multiprocessing import Process 14 15# Import third party lib 16import zmq 17import pytest 18 19# Import napalm-logs pkgs 20import napalm_logs.config 21from napalm_logs.base import NapalmLogs 22 23log = logging.getLogger(__name__) 24 25NL_BASE = None 26NL_PROC = None 27TEST_SKT = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 28TEST_CLIENT = None 29 30NAPALM_LOGS_TEST_LOG_LEVEL = os.getenv('NAPALM_LOGS_TEST_LOG_LEVEL', default='warning') 31NAPALM_LOGS_TEST_ADDR = os.getenv('NAPALM_LOGS_TEST_ADDR', default='0.0.0.0') 32NAPALM_LOGS_TEST_PORT = os.getenv('NAPALM_LOGS_TEST_PORT', default=17191) 33NAPALM_LOGS_TEST_PUB_ADDR = os.getenv('NAPALM_LOGS_TEST_PUB_ADDR', default='0.0.0.0') 34NAPALM_LOGS_TEST_PUB_PORT = os.getenv('NAPALM_LOGS_TEST_PUB_PORT', default=17193) 35 36logging_level = napalm_logs.config.LOGGING_LEVEL.get(NAPALM_LOGS_TEST_LOG_LEVEL.lower()) 37logging.basicConfig(level=logging_level, format=napalm_logs.config.LOG_FORMAT) 38 39 40def startup_proc(): 41 ''' 42 Startup the napalm-logs process. 43 ''' 44 global NL_BASE 45 global NL_PROC 46 log.debug('Starting up the napalm-logs process') 47 NL_BASE = NapalmLogs(disable_security=True, 48 address=NAPALM_LOGS_TEST_ADDR, 49 port=NAPALM_LOGS_TEST_PORT, 50 publisher=[{'zmq': {'send_unknown': True}}], 51 listener=[{'udp': {}}], 52 publish_address=NAPALM_LOGS_TEST_PUB_ADDR, 53 publish_port=NAPALM_LOGS_TEST_PUB_PORT, 54 log_level=NAPALM_LOGS_TEST_LOG_LEVEL) 55 NL_PROC = Process(target=NL_BASE.start_engine) 56 NL_PROC.start() 57 58 59# Startup the napalm-logs process 60startup_proc() 61 62 63def startup_local_client(): 64 ''' 65 Startup a local ZMQ client to receive the published messages. 
66 ''' 67 time.sleep(2) 68 global TEST_CLIENT 69 context = zmq.Context() 70 TEST_CLIENT = context.socket(zmq.SUB) 71 TEST_CLIENT.connect('tcp://{addr}:{port}'.format( 72 addr=NAPALM_LOGS_TEST_PUB_ADDR, 73 port=NAPALM_LOGS_TEST_PUB_PORT) 74 ) 75 TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'') 76 77 78# Startup the local ZMQ client. 79startup_local_client() 80 81 82def generate_tests(): 83 ''' 84 Generate the list of tests. 85 ''' 86 expected_os_errors = {} 87 for os_name, os_cfg in NL_BASE.config_dict.items(): 88 expected_os_errors[os_name] = [] 89 for message in os_cfg['messages']: 90 expected_os_errors[os_name].append(message['error']) 91 test_cases = [] 92 cwd = os.path.dirname(__file__) 93 test_path = os.path.join(cwd, 'config') 94 os_dir_list = [name for name in os.listdir(test_path) if os.path.isdir(os.path.join(test_path, name))] 95 expected_oss = set(expected_os_errors.keys()) 96 tested_oss = set(os_dir_list) 97 missing_oss = expected_oss - tested_oss 98 for missing_os in missing_oss: 99 test_cases.append(('__missing__{}'.format(missing_os), '', '')) 100 for os_name in os_dir_list: 101 # Subdir is the OS name 102 os_path = os.path.join(test_path, os_name) 103 errors = [name for name in os.listdir(os_path) if os.path.isdir(os.path.join(os_path, name))] 104 expected_errors = set(expected_os_errors[os_name]) 105 defined_errors = set(errors) 106 missing_errors = expected_errors - defined_errors 107 for mising_err in missing_errors: 108 test_cases.append((os_name, '__missing__{}'.format(mising_err), '')) 109 for error_name in errors: 110 error_path = os.path.join(os_path, error_name) 111 cases = [name for name in os.listdir(error_path) if os.path.isdir(os.path.join(error_path, name))] 112 if not cases: 113 test_cases.append((os_name, error_name, '__missing__')) 114 for test_case in cases: 115 test_cases.append((os_name, error_name, test_case)) 116 return test_cases 117 118 119# Determine the test cases. 
tests = generate_tests()

# Upper bound, in milliseconds, on how long a test waits for the published
# message. Without it, a lost UDP datagram would block the test run forever.
ZMQ_RECV_TIMEOUT_MS = 30000


@pytest.mark.parametrize("os_name,error_name,test_case", tests)
def test_config(os_name, error_name, test_case):
    '''
    Check one test case of one error, for one OS.

    Fails straight away for the ``__missing__`` placeholder entries.
    Otherwise, reads ``syslog.msg`` from the test case directory, sends its
    content over UDP to the napalm-logs address, waits for the message
    published over ZMQ and compares it with the document from ``yang.json``,
    ignoring the value of the ``timestamp`` key.

    When ``yang.json`` is missing or empty, the test fails and the failure
    message is the JSON dump of the received document.
    '''
    assert not os_name.startswith('__missing__'), 'No tests defined for {}'.format(os_name.replace('__missing__', ''))
    assert not error_name.startswith('__missing__'),\
        'No tests defined for {}, under {}'.format(error_name.replace('__missing__', ''), os_name)
    assert test_case != '__missing__', 'No test cases defined for {}, under {}'.format(error_name, os_name)
    print('Testing {} for {}, under the test case "{}"'.format(
          error_name, os_name, test_case))
    cwd = os.path.dirname(__file__)
    test_path = os.path.join(cwd, 'config', os_name, error_name, test_case)
    raw_message_filepath = os.path.join(test_path, 'syslog.msg')
    log.debug('Looking for %s', raw_message_filepath)
    assert os.path.isfile(raw_message_filepath), \
        'Unable to find the raw message file {}'.format(raw_message_filepath)
    with open(raw_message_filepath, 'r') as raw_message_fh:
        raw_message = raw_message_fh.read()
    log.debug('Read raw message:')
    log.debug(raw_message)
    yang_message_filepath = os.path.join(test_path, 'yang.json')
    log.debug('Looking for %s', yang_message_filepath)
    try:
        with open(yang_message_filepath, 'r') as yang_message_fh:
            yang_message = yang_message_fh.read()
    except IOError:
        # No expected document yet: handled below, after receiving the
        # document napalm-logs generates for this raw message.
        yang_message = ''
    log.debug('Read YANG text:')
    log.debug(yang_message)
    if yang_message:
        struct_yang_message = json.loads(yang_message)
    else:
        struct_yang_message = {}
    log.debug('Struct YANG message:')
    log.debug(struct_yang_message)
    log.debug('Sending the raw message to the napalm-logs daemon')
    TEST_SKT.sendto(raw_message.strip().encode('utf-8'), (NAPALM_LOGS_TEST_ADDR, NAPALM_LOGS_TEST_PORT))
    # poll() returns 0 when nothing arrives within the timeout, so the test
    # fails instead of blocking forever in recv().
    assert TEST_CLIENT.poll(ZMQ_RECV_TIMEOUT_MS), \
        'No message received from the napalm-logs daemon within {} ms, for {} under {}'.format(
            ZMQ_RECV_TIMEOUT_MS, error_name, os_name)
    zmq_msg = TEST_CLIENT.recv()
    deserialised_zmq_msg = napalm_logs.utils.unserialize(zmq_msg)
    log.debug('Received from the napalm-logs daemon:')
    log.debug(deserialised_zmq_msg)
    # Round trip through JSON so both documents are compared using the same
    # plain JSON types.
    returned_yang = json.loads(json.dumps(deserialised_zmq_msg))
    if not struct_yang_message:
        # First run, the expected document is still empty, so we can
        # provide the document napalm-logs expects (returns based on the raw
        # syslog message)
        raise AssertionError(json.dumps(deserialised_zmq_msg, indent=2))
    # Pop the timestamp from both as most syslog messages do not specify year
    # which means that once a year we will have to update all tests if we
    # check the timestamp.
    # We still expect both to contain a timestamp though.
    assert struct_yang_message.pop('timestamp', False),\
        'Yang test file does not contain a timestamp key for {} under {}'.format(error_name, os_name)
    assert returned_yang.pop('timestamp', False),\
        'The returned yang does not contain a timestamp key for {} under {}'.format(error_name, os_name)
    assert struct_yang_message == returned_yang


def test_napalm_logs_shut():
    '''
    Shutdown the napalm-logs engine.

    Calls ``stop_engine`` on the local ``NapalmLogs`` instance, checks that
    the process started by ``startup_proc`` is still alive, then terminates
    it and waits for it to exit.
    '''
    NL_BASE.stop_engine()
    # NOTE(review): stop_engine is invoked on this process' copy of NL_BASE,
    # so the child process is presumably expected to keep running until it is
    # terminated below -- confirm.
    assert NL_PROC.is_alive()
    NL_PROC.terminate()
    NL_PROC.join()