# Copyright 2015 NetEase Corp.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import uuid

import testscenarios

import oslo_messaging
from oslo_messaging.tests.functional import utils

load_tests = testscenarios.load_tests_apply_scenarios


class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
    """Test case for `oslo_messaging.LoggingNotificationHandler`

    Build up a logger using this handler, then test logging under the
    messaging and messagingv2 drivers. Make sure the expected logging
    notifications are received.
    """

    # Logging methods to exercise; each name is both the ``Logger`` method
    # looked up in ``test_logging`` and the priority expected on the event.
    _priority = [
        ('debug', dict(priority='debug')),
        ('info', dict(priority='info')),
        ('warn', dict(priority='warn')),
        ('error', dict(priority='error')),
        ('critical', dict(priority='critical')),
    ]

    # Notification drivers to run every priority against.
    _driver = [
        ('messaging', dict(driver='messaging')),
        ('messagingv2', dict(driver='messagingv2')),
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to every priority/driver combination."""
        cls.scenarios = testscenarios.multiply_scenarios(cls._priority,
                                                         cls._driver)

    def test_logging(self):
        """Log one record and check that one matching notification arrives.

        The record is emitted through a logger that has a
        `LoggingNotificationHandler` attached; the test then asserts that
        exactly one event is received, that its priority and event type are
        as expected, and that its payload carries the log record fields.
        """
        # NOTE(gtt): Using different topic to make tests run in parallel
        topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver)

        if self.notify_url.startswith("kafka://"):
            # A random consumer group per run -- presumably so this listener
            # does not share consumption state with other runs (TODO confirm).
            self.conf.set_override('consumer_group', str(uuid.uuid4()),
                                   group='oslo_messaging_kafka')

        self.config(driver=[self.driver],
                    topics=[topic],
                    group='oslo_messaging_notifications')

        listener = self.useFixture(
            utils.NotificationFixture(self.conf, self.notify_url, [topic]))

        log_notify = oslo_messaging.LoggingNotificationHandler(self.notify_url)

        log = logging.getLogger(topic)
        log.setLevel(logging.DEBUG)
        log.addHandler(log_notify)
        # logging.getLogger() returns a process-wide logger object, so the
        # handler would otherwise stay attached after this test finishes.
        self.addCleanup(log.removeHandler, log_notify)

        # self.priority doubles as the name of the Logger method to call.
        log_method = getattr(log, self.priority)
        log_method('Test logging at priority: %s' % self.priority)

        events = listener.get_events(timeout=15)
        self.assertEqual(1, len(events))

        info_event = events[0]

        # Event layout as used here: [0] priority, [1] event type,
        # [2] payload built from the log record.
        self.assertEqual(self.priority, info_event[0])
        self.assertEqual('logrecord', info_event[1])

        for key in ['name', 'thread', 'extra', 'process', 'funcName',
                    'levelno', 'processName', 'pathname', 'lineno',
                    'msg', 'exc_info', 'levelname']:
            self.assertIn(key, info_event[2])


# Scenarios must exist on the class before load_tests expands them.
LoggingNotificationHandlerTestCase.generate_scenarios()