1#!/usr/bin/env python 2############################################################################# 3# Copyright (c) 2020 One Identity 4# 5# This program is free software; you can redistribute it and/or modify it 6# under the terms of the GNU General Public License version 2 as published 7# by the Free Software Foundation, or (at your option) any later version. 8# 9# This program is distributed in the hope that it will be useful, 10# but WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12# GNU General Public License for more details. 13# 14# You should have received a copy of the GNU General Public License 15# along with this program; if not, write to the Free Software 16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 17# 18# As an additional exemption you are allowed to compile & link against the 19# OpenSSL libraries as published by the OpenSSL project. See the file 20# COPYING for details. 21# 22############################################################################# 23import os 24import re 25 26import pytest 27from pathlib2 import Path 28from psutil import TimeoutExpired 29 30import src.testcase_parameters.testcase_parameters as tc_parameters 31from src.common.blocking import wait_until_true 32from src.common.file import File 33from src.executors.process_executor import ProcessExecutor 34 35 36class SNMPtrapd(object): 37 TRAP_LOG_PREFIX = 'LIGHT_TEST_SNMP_TRAP_RECEIVED:' 38 39 def __init__(self, port): 40 self.snmptrapd_proc = None 41 self.port = port 42 43 self.snmptrapd_log = Path(tc_parameters.WORKING_DIR, "snmptrapd_log") 44 self.snmptrapd_stdout_path = Path(tc_parameters.WORKING_DIR, "snmptrapd_stdout") 45 self.snmptrapd_stderr_path = Path(tc_parameters.WORKING_DIR, "snmptrapd_stderr") 46 47 def wait_for_snmptrapd_log_creation(self): 48 return self.snmptrapd_log.exists() 49 50 def wait_for_snmptrapd_startup(self): 51 return "NET-SNMP version" in self.snmptrapd_log.read_text() 52 53 def start(self): 54 if self.snmptrapd_proc is not None: 55 return 56 57 self.snmptrapd_proc = ProcessExecutor().start( 58 [ 59 "snmptrapd", "-f", 60 "--disableAuthorization=yes", 61 "-C", 62 "-m ALL", 63 "-A", 64 "-Ddump", 65 "-On", 66 "--doNotLogTraps=no", 67 "--authCommunity=log public", 68 self.port, 69 "-d", 70 "-Lf", os.path.relpath(str(self.snmptrapd_log)), 71 "-F", "{}%v\n".format(self.TRAP_LOG_PREFIX), 72 ], 73 self.snmptrapd_stdout_path, 74 self.snmptrapd_stderr_path, 75 ) 76 wait_until_true(self.wait_for_snmptrapd_log_creation) 77 wait_until_true(self.wait_for_snmptrapd_startup) 78 return self.snmptrapd_proc.is_running() 79 80 def stop(self): 81 if self.snmptrapd_proc is None: 82 return 83 84 self.snmptrapd_proc.terminate() 85 try: 86 self.snmptrapd_proc.wait(4) 87 except TimeoutExpired: 88 self.snmptrapd_proc.kill() 89 90 self.snmptrapd_proc = None 91 92 def get_port(self): 93 return self.port 94 95 def get_traps(self, counter): 96 trap_list = [] 97 98 f = File(self.snmptrapd_log) 99 f.open("r") 100 101 while True: 102 trap_line = f.wait_for_lines([self.TRAP_LOG_PREFIX])[0] 103 res = re.match('({})(.*)'.format(self.TRAP_LOG_PREFIX), trap_line) 104 if (res): 105 trap_list.extend(res.group(2).rstrip().split("\t")) 106 if len(trap_list) == counter: 107 break 108 109 f.close() 110 111 return sorted(trap_list) 112 113 def get_log(self): 114 f = File(self.snmptrapd_log) 115 f.open("r") 116 117 log = f.read() 118 119 f.close() 120 121 return log 122 123 124@pytest.fixture 125def snmptrapd(port_allocator): 126 server = SNMPtrapd(port_allocator()) 127 server.start() 128 yield server 129 server.stop() 130 131 132class SNMPTestParams(object): 133 def __init__(self): 134 pass 135 136 def get_ip_address(self): 137 return '"127.0.0.1"' 138 139 def get_default_community(self): 140 return 'public' 141 142 def get_basic_snmp_obj(self): 143 return '".1.3.6.1.4.1.18372.3.1.1.1.1.1.0", "Octetstring", "admin"' 144 145 def get_basic_trap_obj(self): 146 return '".1.3.6.1.6.3.1.1.4.1.0", "Objectid", ".1.3.6.1.4.1.18372.3.1.1.1.2.1"' 147 148 def get_cisco_trap_obj(self): 149 return '".1.3.6.1.6.3.1.1.4.1.0","Objectid",".1.3.6.1.4.1.9.9.41.2.0.1"' 150 151 def get_cisco_snmp_obj(self): 152 cisco_snmp_obj = ( 153 '"1.3.6.1.4.1.9.9.41.1.2.3.1.2.55", "Octetstring", "SYS"', 154 '"1.3.6.1.4.1.9.9.41.1.2.3.1.3.55", "Integer", "6"', 155 '"1.3.6.1.4.1.9.9.41.1.2.3.1.4.55", "Octetstring", "CONFIG_I"', 156 '"1.3.6.1.4.1.9.9.41.1.2.3.1.5.55", "Octetstring", "Configured from console by vty1 (10.30.0.32)"', 157 '"1.3.6.1.4.1.9.9.41.1.2.3.1.6.55", "Timeticks", "97881"', 158 ) 159 return cisco_snmp_obj 160 161 def get_expected_cisco_trap(self): 162 return sorted([ 163 '.1.3.6.1.4.1.9.9.41.1.2.3.1.2.55 = STRING: "SYS"', 164 '.1.3.6.1.4.1.9.9.41.1.2.3.1.3.55 = INTEGER: 6', 165 '.1.3.6.1.4.1.9.9.41.1.2.3.1.4.55 = STRING: "CONFIG_I"', 166 '.1.3.6.1.4.1.9.9.41.1.2.3.1.5.55 = STRING: "Configured from console by vty1 (10.30.0.32)"', 167 '.1.3.6.1.4.1.9.9.41.1.2.3.1.6.55 = Timeticks: (97881) 0:16:18.81', 168 '.1.3.6.1.6.3.1.1.4.1.0 = OID: .1.3.6.1.4.1.18372.3.1.1.1.2.1', 169 ]) 170 171 def get_expected_basic_trap(self): 172 return sorted([ 173 '.1.3.6.1.4.1.18372.3.1.1.1.1.1.0 = STRING: "admin"', 174 '.1.3.6.1.6.3.1.1.4.1.0 = OID: .1.3.6.1.4.1.18372.3.1.1.1.2.1', 175 ]) 176 177 def get_expected_empty_trap(self): 178 return [ 179 '.1.3.6.1.6.3.1.1.4.1.0 = OID: .1.3.6.1.4.1.18372.3.1.1.1.2.1', 180 ] 181 182 183@pytest.fixture 184def snmp_test_params(): 185 return SNMPTestParams() 186