1#!/usr/local/bin/python3.8
2#
3# Copyright 2015-2016 Ettus Research LLC
4# Copyright 2018 Ettus Research, a National Instruments Company
5# Copyright 2019 Ettus Research, a National Instruments Brand
6#
7# SPDX-License-Identifier: GPL-3.0-or-later
8#
9"""
10Devtest: Base module. Provides classes for running devtest tests.
11"""
12
13import os
14import sys
15import unittest
16import re
17import time
18import logging
19from subprocess import Popen, PIPE
20# For what we're doing here, ruamel.yaml and yaml are copatible, and we'll use
21# whatever we can find
22try:
23    from ruamel import yaml
24except:
25    import yaml
26from usrp_probe import get_usrp_list
27
28#--------------------------------------------------------------------------
29# Helpers
30#--------------------------------------------------------------------------
31def filter_warnings(errstr):
32    """
33    Searches errstr for UHD warnings, removes them, and puts them into a
34    separate string.
35    Returns (errstr, warnstr), where errstr no longer has warnings. """
36    warn_re = re.compile("UHD Warning:\n(?:    .*\n)+")
37    warnstr = "\n".join(warn_re.findall(errstr)).strip()
38    errstr = warn_re.sub('', errstr).strip()
39    return (errstr, warnstr)
40
41def filter_stderr(stderr, run_results=None):
42    """
43    Filters the output to stderr. run_results[] is a dictionary.
44    This function will:
45    - Remove warnings and put them in run_results['warnings']
46    - Put the filtered error string into run_results['errors'] and returns the dictionary
47    """
48    run_results = run_results or {}
49    errstr, run_results['warnings'] = filter_warnings(stderr)
50    # Scan for underruns and sequence errors / dropped packets  not detected in the counter
51    errstr = re.sub("\n\n+", "\n", errstr)
52    run_results['errors'] = errstr.strip()
53    return run_results
54
55#--------------------------------------------------------------------------
56# Application
57#--------------------------------------------------------------------------
58class shell_application(object):
59    """
60    Wrapper for applications that are in $PATH.
61    Note: The CMake infrastructure makes sure all examples and utils are in $PATH.
62    """
63    def __init__(self, name):
64        self.name = name
65        self.stdout = ''
66        self.stderr = ''
67        self.returncode = None
68        self.exec_time = None
69
70    def run(self, args=None):
71        """Test executor."""
72        args = args or []
73        cmd_line = [self.name]
74        cmd_line.extend(args)
75        start_time = time.time()
76        env = os.environ
77        env["UHD_LOG_FASTPATH_DISABLE"] = "1"
78        try:
79            proc = Popen(
80                cmd_line,
81                stdout=PIPE,
82                stderr=PIPE,
83                close_fds=True,
84                env=env,
85                universal_newlines=True
86            )
87            self.stdout, self.stderr = proc.communicate()
88            self.returncode = proc.returncode
89            self.exec_time = time.time() - start_time
90        except OSError as ex:
91            raise RuntimeError("Failed to execute command: `{}'\n{}"
92                               .format(cmd_line, str(ex)))
93
94
95#--------------------------------------------------------------------------
96# Test case base
97#--------------------------------------------------------------------------
98class uhd_test_case(unittest.TestCase):
99    """
100    Base class for UHD test cases.
101    """
102    test_name = '--TEST--'
103
104    def set_up(self):
105        """
106        Override this to add own setup code per test.
107        """
108        pass
109
110    def setUp(self):
111        self.name = self.__class__.__name__
112        self.test_id = self.id().split('.')[-1]
113        self.results = {}
114        self.results_file = os.getenv('_UHD_TEST_RESULTSFILE', "")
115        if self.results_file and os.path.isfile(self.results_file):
116            with open(self.results_file) as res_file:
117                self.results = yaml.safe_load(res_file.read()) or {}
118        self.args_str = os.getenv('_UHD_TEST_ARGS_STR', "")
119        self.usrp_info = get_usrp_list(self.args_str)[0]
120        if self.usrp_info['serial'] not in self.results:
121            self.results[self.usrp_info['serial']] = {}
122        if self.name not in self.results[self.usrp_info['serial']]:
123            self.results[self.usrp_info['serial']][self.name] = {}
124        self.setup_logger()
125        self.set_up()
126
127    def setup_logger(self):
128        " Add logging infrastructure "
129        self.log = logging.getLogger("devtest.{name}".format(name=self.name))
130        self.log_file = os.getenv('_UHD_TEST_LOGFILE', "devtest.log")
131        #self.log_level = int(os.getenv('_UHD_TEST_LOG_LEVEL', logging.DEBUG))
132        #self.print_level = int(os.getenv('_UHD_TEST_PRINT_LEVEL', logging.WARNING))
133        self.log_level = logging.DEBUG
134        self.print_level = logging.WARNING
135        file_handler = logging.FileHandler(self.log_file)
136        file_handler.setLevel(self.log_level)
137        console_handler = logging.StreamHandler()
138        console_handler.setLevel(self.print_level)
139        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
140        file_handler.setFormatter(formatter)
141        console_handler.setFormatter(formatter)
142        self.log.setLevel(logging.DEBUG)
143        self.log.addHandler(file_handler)
144        self.log.addHandler(console_handler)
145        self.log.info("Starting test with device: %s", str(self.args_str))
146
147    def tear_down(self):
148        """Nothing to do."""
149        pass
150
151    def tearDown(self):
152        self.tear_down()
153        if self.results_file:
154            with open(self.results_file, 'w') as res_file:
155                res_file.write(yaml.dump(self.results, default_flow_style=False))
156        time.sleep(15)
157
158    def report_result(self, testname, key, value):
159        """ Store a result as a key/value pair.
160        After completion, all results for one test are written to the results file.
161        """
162        if not testname in self.results[self.usrp_info['serial']][self.name]:
163            self.results[self.usrp_info['serial']][self.name][testname] = {}
164        self.results[self.usrp_info['serial']][self.name][testname][key] = value
165
166    def create_addr_args_str(self, argname="args"):
167        """ Returns an args string, usually '--args "type=XXX,serial=YYY" """
168        if not self.args_str:
169            return ''
170        return '--{}={}'.format(argname, self.args_str)
171
172class uhd_example_test_case(uhd_test_case):
173    """
174    A test case that runs an example.
175    """
176
177    def setup_example(self):
178        """
179        Override this to add specific setup code.
180        """
181        pass
182
183    def set_up(self):
184        """Called by the unit testing framework on tests. """
185        self.setup_example()
186
187    def run_test(self, test_name, test_args):
188        """
189        Override this to run the actual example.
190
191        Needs to return either a boolean or a dict with key 'passed' to determine
192        pass/fail.
193        """
194        raise NotImplementedError
195
196    def run_example(self, example, args):
197        """
198        Run `example' (which has to be a UHD example or utility) with `args'.
199        Return results and the app object.
200
201        Note: UHD_LOG_FASTPATH_DISABLE will be set to 1.
202        """
203        self.log.info("Running example: `%s %s'", example, " ".join(args))
204        app = shell_application(example)
205        app.run(args)
206        run_results = {
207            'return_code': app.returncode,
208            'passed': False,
209        }
210        run_results = filter_stderr(app.stderr, run_results)
211        self.log.info('STDERR Output:')
212        self.log.info(str(app.stderr))
213        return (app, run_results)
214
215
216    def report_example_results(self, test_name, run_results):
217        """
218        Helper function for report_result() when running examples.
219        """
220        for key in sorted(run_results):
221            self.log.info('%s = %s', str(key), str(run_results[key]))
222            self.report_result(
223                test_name,
224                key, run_results[key]
225            )
226        if 'passed' in run_results:
227            self.report_result(
228                test_name,
229                'status',
230                'Passed' if run_results['passed'] else 'Failed',
231            )
232        if 'errors' in run_results:
233            self.report_result(
234                test_name,
235                'errors',
236                'Yes' if run_results['errors'] else 'No',
237            )
238
239    def test_all(self):
240        """
241        Hook for test runner. Needs to be a class method that starts with 'test'.
242        Calls run_test().
243        """
244        test_params = getattr(self, 'test_params', {})
245        for test_name, test_args in test_params.items():
246            time.sleep(15) # Wait for X300 devices to reclaim them
247            if not 'products' in test_args \
248                    or (self.usrp_info['product'] in test_args.get('products', [])):
249                run_results = self.run_test(test_name, test_args)
250                passed = bool(run_results)
251                if isinstance(run_results, dict):
252                    passed = run_results['passed']
253                errors = run_results.pop("errors", None)
254                if not passed:
255                    print("Error log:", file=sys.stderr)
256                    print(errors)
257                self.assertTrue(
258                    passed,
259                    msg="Errors occurred during test `{t}'. "
260                        "Check log file for details.\n"
261                        "Run results:\n{r}".format(
262                            t=test_name,
263                            r=yaml.dump(run_results, default_flow_style=False)
264                        )
265                )
266