1"""
2Test of `pika.diagnostic_utils`
3"""
4
5import unittest
6
7import logging
8
9import pika.compat
10from pika import diagnostic_utils
11
12
13# Suppress invalid-name, since our test names are descriptive and quite long
14# pylint: disable=C0103
15
16# Suppress missing-docstring to allow test method names to be printed by our
17# test runner
18# pylint: disable=C0111
19
20
class DiagnosticUtilsTest(unittest.TestCase):
    """Tests for `diagnostic_utils.create_log_exception_decorator`.

    NOTE: the test methods are documented with comments instead of
    docstrings on purpose, so that the test runner keeps printing the test
    method names.
    """

    def test_args_and_return_value_propagation(self):
        # Verifies that a function wrapped by the decorator receives exactly
        # the positional and keyword arguments supplied by the caller (the
        # very same instances) and that its return value is handed back
        # unchanged.

        # Each call of `my_func` records the (args, kwargs) it was given here
        bucket = []

        log_exception = diagnostic_utils.create_log_exception_decorator(
            logging.getLogger(__name__))

        return_value = (1, 2, 3)

        @log_exception
        def my_func(*args, **kwargs):
            bucket.append((args, kwargs))
            return return_value

        # Test with args and kwargs
        expected_args = ('a', 2, 'B', Exception('oh-oh'))
        expected_kwargs = dict(hello='world', bye='hello', error=RuntimeError())

        result = my_func(*expected_args, **expected_kwargs)

        self.assertIs(result, return_value)
        self.assertEqual(bucket, [(expected_args, expected_kwargs)])

        # Make sure that the original instances were passed through, not copies
        actual_args, actual_kwargs = bucket[0]

        # Guard the pairwise comparison below against silent truncation
        self.assertEqual(len(actual_args), len(expected_args))
        for actual, expected in zip(actual_args, expected_args):
            self.assertIs(actual, expected)

        for key, expected in expected_kwargs.items():
            self.assertIs(actual_kwargs[key], expected)

        # Now, repeat without any args/kwargs
        expected_args = tuple()
        expected_kwargs = dict()
        del bucket[:]  # .clear() doesn't exist in python 2.7

        result = my_func()

        self.assertIs(result, return_value)
        self.assertEqual(bucket, [(expected_args, expected_kwargs)])

    def test_exception_propagation(self):
        # Verifies that an exception raised inside a decorated function
        # reaches the caller as the same instance, and that exactly one log
        # record is emitted whose message ends with the exception summary.
        logger = logging.getLogger(__name__)
        log_exception = diagnostic_utils.create_log_exception_decorator(logger)

        # Suppress log output and capture LogRecord
        log_record_bucket = []
        logger.handle = log_record_bucket.append
        # The logger is shared process-wide; drop the instance-level override
        # after the test so `Logger.handle` works again for everybody else.
        self.addCleanup(delattr, logger, 'handle')

        exception = Exception('Oops!')

        @log_exception
        def my_func_that_raises():
            raise exception

        with self.assertRaises(Exception) as ctx:
            my_func_that_raises()

        # Make sure the expected exception was raised
        self.assertIs(ctx.exception, exception)

        # Check log message
        self.assertEqual(len(log_record_bucket), 1)
        log_record = log_record_bucket[0]  # type: logging.LogRecord
        message = log_record.getMessage()
        expected_ending = 'Exception: Oops!\n'
        # Compare the tail via assertEqual (rather than endswith) so that a
        # failure shows the actual ending of the logged message
        self.assertEqual(message[-len(expected_ending):], expected_ending)
90