1#!/usr/bin/env python
2#############################################################################
3# Copyright (c) 2020 One Identity
4#
5# This program is free software; you can redistribute it and/or modify it
6# under the terms of the GNU General Public License version 2 as published
7# by the Free Software Foundation, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17#
18# As an additional exemption you are allowed to compile & link against the
19# OpenSSL libraries as published by the OpenSSL project. See the file
20# COPYING for details.
21#
22#############################################################################
23import os
24import re
25
26import pytest
27from pathlib2 import Path
28from psutil import TimeoutExpired
29
30import src.testcase_parameters.testcase_parameters as tc_parameters
31from src.common.blocking import wait_until_true
32from src.common.file import File
33from src.executors.process_executor import ProcessExecutor
34
35
class SNMPtrapd(object):
    """Run a foreground ``snmptrapd`` process and read back the traps it logs.

    The daemon is started with a custom trap format: each received trap is
    logged as ``TRAP_LOG_PREFIX`` followed by the ``%v`` format expansion.
    ``get_traps`` looks for that prefix and splits the remainder on tabs.
    """

    # Marker put in front of every logged trap so trap lines can be told apart
    # from the other output snmptrapd writes into the same log file.
    TRAP_LOG_PREFIX = 'LIGHT_TEST_SNMP_TRAP_RECEIVED:'

    def __init__(self, port):
        """Store ``port`` and derive the log/stdout/stderr paths.

        ``port`` is handed to the snmptrapd command line as a positional
        argument by ``start()``. All three files live under
        ``tc_parameters.WORKING_DIR``.
        """
        self.snmptrapd_proc = None
        self.port = port

        self.snmptrapd_log = Path(tc_parameters.WORKING_DIR, "snmptrapd_log")
        self.snmptrapd_stdout_path = Path(tc_parameters.WORKING_DIR, "snmptrapd_stdout")
        self.snmptrapd_stderr_path = Path(tc_parameters.WORKING_DIR, "snmptrapd_stderr")

    def wait_for_snmptrapd_log_creation(self):
        """Return True once the snmptrapd log file exists (polling predicate)."""
        return self.snmptrapd_log.exists()

    def wait_for_snmptrapd_startup(self):
        """Return True once the log contains the "NET-SNMP version" banner."""
        return "NET-SNMP version" in self.snmptrapd_log.read_text()

    def start(self):
        """Start snmptrapd and wait for its log file and startup banner.

        Returns None when a process was already started through this object;
        otherwise returns ``is_running()`` of the newly started process.
        """
        if self.snmptrapd_proc is not None:
            return

        self.snmptrapd_proc = ProcessExecutor().start(
            [
                "snmptrapd", "-f",
                "--disableAuthorization=yes",
                "-C",
                "-m ALL",
                "-A",
                "-Ddump",
                "-On",
                "--doNotLogTraps=no",
                "--authCommunity=log public",
                self.port,
                "-d",
                "-Lf", os.path.relpath(str(self.snmptrapd_log)),
                "-F", "{}%v\n".format(self.TRAP_LOG_PREFIX),
            ],
            self.snmptrapd_stdout_path,
            self.snmptrapd_stderr_path,
        )
        # The log file has to exist before its content can be inspected.
        wait_until_true(self.wait_for_snmptrapd_log_creation)
        wait_until_true(self.wait_for_snmptrapd_startup)
        return self.snmptrapd_proc.is_running()

    def stop(self):
        """Terminate snmptrapd, killing it if it does not exit in time.

        Does nothing when no process is tracked. The tracked process is
        forgotten afterwards, so ``start()`` may be called again.
        """
        if self.snmptrapd_proc is None:
            return

        self.snmptrapd_proc.terminate()
        try:
            # Grace period before escalating (presumably seconds -- confirm
            # against the process object returned by ProcessExecutor).
            self.snmptrapd_proc.wait(4)
        except TimeoutExpired:
            self.snmptrapd_proc.kill()

        self.snmptrapd_proc = None

    def get_port(self):
        """Return the port this object was created with."""
        return self.port

    def get_traps(self, counter):
        """Read trap entries from the log until at least ``counter`` were seen.

        Each log line starting with ``TRAP_LOG_PREFIX`` is stripped of the
        prefix and trailing whitespace and split on tab characters; the
        pieces are accumulated across lines.

        Returns the collected entries as a sorted list. The list can be
        longer than ``counter`` when the last line read carried more entries
        than were still missing. ``counter <= 0`` yields an empty list
        without waiting for any line.

        Whatever ``File.wait_for_lines`` raises propagates to the caller; the
        log file is closed in that case as well.
        """
        # Escape the prefix so it is always matched literally.
        trap_pattern = re.compile('({})(.*)'.format(re.escape(self.TRAP_LOG_PREFIX)))
        trap_list = []

        f = File(self.snmptrapd_log)
        f.open("r")
        try:
            # "<" instead of an exact "==" check: one line may add several
            # entries at once, so the total can step over ``counter``; an
            # equality test would then never be satisfied.
            while len(trap_list) < counter:
                trap_line = f.wait_for_lines([self.TRAP_LOG_PREFIX])[0]
                res = trap_pattern.match(trap_line)
                if res:
                    trap_list.extend(res.group(2).rstrip().split("\t"))
        finally:
            # Release the handle even if waiting for a line failed.
            f.close()

        return sorted(trap_list)

    def get_log(self):
        """Return the full content of the snmptrapd log file."""
        f = File(self.snmptrapd_log)
        f.open("r")
        try:
            return f.read()
        finally:
            # Close on both the success and the error path.
            f.close()
122
123
@pytest.fixture
def snmptrapd(port_allocator):
    """Provide a started ``SNMPtrapd`` and stop it when the test is over.

    ``stop()`` runs from a ``finally`` block so that a failure inside
    ``start()`` after the process was spawned does not leave the daemon
    running. ``stop()`` returns immediately when no process was started.
    """
    server = SNMPtrapd(port_allocator())
    try:
        server.start()
        yield server
    finally:
        server.stop()
130
131
class SNMPTestParams(object):
    """Fixed SNMP option strings and the trap lines expected for them."""

    # Trap OID line that is part of every expected-trap list below.
    _EXPECTED_TRAP_OID_LINE = '.1.3.6.1.6.3.1.1.4.1.0 = OID: .1.3.6.1.4.1.18372.3.1.1.1.2.1'

    # Objects handed out by get_cisco_snmp_obj(); one string per object.
    _CISCO_SNMP_OBJECTS = (
        '"1.3.6.1.4.1.9.9.41.1.2.3.1.2.55", "Octetstring", "SYS"',
        '"1.3.6.1.4.1.9.9.41.1.2.3.1.3.55", "Integer", "6"',
        '"1.3.6.1.4.1.9.9.41.1.2.3.1.4.55", "Octetstring", "CONFIG_I"',
        '"1.3.6.1.4.1.9.9.41.1.2.3.1.5.55", "Octetstring", "Configured from console by vty1 (10.30.0.32)"',
        '"1.3.6.1.4.1.9.9.41.1.2.3.1.6.55", "Timeticks", "97881"',
    )

    def __init__(self):
        """Nothing to initialise: every getter returns constant data."""

    def get_ip_address(self):
        """Return the loopback address, including its surrounding double quotes."""
        address = '"127.0.0.1"'
        return address

    def get_default_community(self):
        """Return the community string ``public``."""
        community = 'public'
        return community

    def get_basic_snmp_obj(self):
        """Return the basic SNMP object as one ``"oid", "type", "value"`` string."""
        snmp_obj = '".1.3.6.1.4.1.18372.3.1.1.1.1.1.0", "Octetstring", "admin"'
        return snmp_obj

    def get_basic_trap_obj(self):
        """Return the basic trap object as one ``"oid", "type", "value"`` string."""
        trap_obj = '".1.3.6.1.6.3.1.1.4.1.0", "Objectid", ".1.3.6.1.4.1.18372.3.1.1.1.2.1"'
        return trap_obj

    def get_cisco_trap_obj(self):
        """Return the Cisco trap object string (written without spaces after commas)."""
        trap_obj = '".1.3.6.1.6.3.1.1.4.1.0","Objectid",".1.3.6.1.4.1.9.9.41.2.0.1"'
        return trap_obj

    def get_cisco_snmp_obj(self):
        """Return the tuple of Cisco SNMP object strings."""
        return self._CISCO_SNMP_OBJECTS

    def get_expected_cisco_trap(self):
        """Return a new, sorted list of the lines expected for the Cisco objects."""
        expected = [
            '.1.3.6.1.4.1.9.9.41.1.2.3.1.2.55 = STRING: "SYS"',
            '.1.3.6.1.4.1.9.9.41.1.2.3.1.3.55 = INTEGER: 6',
            '.1.3.6.1.4.1.9.9.41.1.2.3.1.4.55 = STRING: "CONFIG_I"',
            '.1.3.6.1.4.1.9.9.41.1.2.3.1.5.55 = STRING: "Configured from console by vty1 (10.30.0.32)"',
            '.1.3.6.1.4.1.9.9.41.1.2.3.1.6.55 = Timeticks: (97881) 0:16:18.81',
            self._EXPECTED_TRAP_OID_LINE,
        ]
        expected.sort()
        return expected

    def get_expected_basic_trap(self):
        """Return a new, sorted list of the lines expected for the basic objects."""
        expected = [
            '.1.3.6.1.4.1.18372.3.1.1.1.1.1.0 = STRING: "admin"',
            self._EXPECTED_TRAP_OID_LINE,
        ]
        expected.sort()
        return expected

    def get_expected_empty_trap(self):
        """Return a new list holding only the trap OID line."""
        return [self._EXPECTED_TRAP_OID_LINE]
181
182
@pytest.fixture
def snmp_test_params():
    """Provide a fresh ``SNMPTestParams`` instance to the requesting test."""
    params = SNMPTestParams()
    return params
186