1""" 2Test of `pika.diagnostic_utils` 3""" 4 5import unittest 6 7import logging 8 9import pika.compat 10from pika import diagnostic_utils 11 12 13# Suppress invalid-name, since our test names are descriptive and quite long 14# pylint: disable=C0103 15 16# Suppress missing-docstring to allow test method names to be printed by our 17# test runner 18# pylint: disable=C0111 19 20 21class DiagnosticUtilsTest(unittest.TestCase): 22 23 def test_args_and_return_value_propagation(self): 24 25 bucket = [] 26 27 log_exception = diagnostic_utils.create_log_exception_decorator( 28 logging.getLogger(__name__)) 29 30 return_value = (1, 2, 3) 31 32 @log_exception 33 def my_func(*args, **kwargs): 34 bucket.append((args, kwargs)) 35 return return_value 36 37 # Test with args and kwargs 38 expected_args = ('a', 2, 'B', Exception('oh-oh')) 39 expected_kwargs = dict(hello='world', bye='hello', error=RuntimeError()) 40 41 result = my_func(*expected_args, **expected_kwargs) 42 43 self.assertIs(result, return_value) 44 self.assertEqual(bucket, [(expected_args, expected_kwargs)]) 45 46 # Make sure that the original instances were passed through, not copies 47 for i in pika.compat.xrange(len(expected_args)): 48 self.assertIs(bucket[0][0][i], expected_args[i]) 49 50 for key in pika.compat.dictkeys(expected_kwargs): 51 self.assertIs(bucket[0][1][key], expected_kwargs[key]) 52 53 # Now, repeat without any args/kwargs 54 expected_args = tuple() 55 expected_kwargs = dict() 56 del bucket[:] # .clear() doesn't exist in python 2.7 57 58 result = my_func() 59 60 self.assertIs(result, return_value) 61 self.assertEqual(bucket, [(expected_args, expected_kwargs)]) 62 63 def test_exception_propagation(self): 64 logger = logging.getLogger(__name__) 65 log_exception = diagnostic_utils.create_log_exception_decorator(logger) 66 67 # Suppress log output and capture LogRecord 68 log_record_bucket = [] 69 logger.handle = log_record_bucket.append 70 71 exception = Exception('Oops!') 72 73 @log_exception 74 def my_func_that_raises(): 75 raise exception 76 77 with self.assertRaises(Exception) as ctx: 78 my_func_that_raises() 79 80 # Make sure the expected exception was raised 81 self.assertIs(ctx.exception, exception) 82 83 # Check log message 84 self.assertEqual(len(log_record_bucket), 1) 85 log_record = log_record_bucket[0] # type: logging.LogRecord 86 print(log_record.getMessage()) 87 expected_ending = 'Exception: Oops!\n' 88 self.assertEqual(log_record.getMessage()[-len(expected_ending):], 89 expected_ending) 90