1# Copyright 2015 NetEase Corp.
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15import logging
16import uuid
17
18import testscenarios
19
20import oslo_messaging
21from oslo_messaging.tests.functional import utils
22
# Module-level ``load_tests`` hook: hand test loading to testscenarios so that
# each entry in a test class's ``scenarios`` attribute is run as its own test.
load_tests = testscenarios.load_tests_apply_scenarios
24
25
class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
    """Functional test for `oslo_messaging.LoggingNotificationHandler`.

    Build a logger that uses this handler, then log one message at each
    priority under both the ``messaging`` and ``messagingv2`` notification
    drivers, and make sure the expected logging notification is received.
    """

    # Scenario axis: the logging priority exercised by the test.
    _priority = [
        ('debug', dict(priority='debug')),
        ('info', dict(priority='info')),
        ('warn', dict(priority='warn')),
        ('error', dict(priority='error')),
        ('critical', dict(priority='critical')),
    ]

    # Scenario axis: the notification driver configured for the test.
    _driver = [
        ('messaging', dict(driver='messaging')),
        ('messagingv2', dict(driver='messagingv2')),
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to every priority/driver combination."""
        cls.scenarios = testscenarios.multiply_scenarios(cls._priority,
                                                         cls._driver)

    def test_logging(self):
        """Log one message through the handler and check its notification.

        The test passes when exactly one event arrives on the scenario's
        topic, its priority matches the scenario priority, its event type is
        ``logrecord`` and its payload carries the expected log-record keys.
        """
        # NOTE(gtt): Using different topic to make tests run in parallel
        topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver)

        if self.notify_url.startswith("kafka://"):
            # Use a fresh, random consumer group for each Kafka run.
            self.conf.set_override('consumer_group', str(uuid.uuid4()),
                                   group='oslo_messaging_kafka')

        self.config(driver=[self.driver],
                    topics=[topic],
                    group='oslo_messaging_notifications')

        listener = self.useFixture(
            utils.NotificationFixture(self.conf, self.notify_url, [topic]))

        log_notify = oslo_messaging.LoggingNotificationHandler(self.notify_url)

        log = logging.getLogger(topic)
        log.setLevel(logging.DEBUG)
        log.addHandler(log_notify)
        # Loggers returned by logging.getLogger() are process-global, so
        # detach the handler once the test finishes instead of leaking it
        # (and the notifier it holds) into the rest of the test run.
        self.addCleanup(log.removeHandler, log_notify)

        # Logger.warn() is deprecated in favour of Logger.warning(), so emit
        # through Logger.log() with the numeric level rather than looking the
        # method up by priority name. The resulting record level (and hence
        # the notification priority) is the same as before.
        level_name = ('WARNING' if self.priority == 'warn'
                      else self.priority.upper())
        log.log(getattr(logging, level_name),
                'Test logging at priority: %s' % self.priority)

        events = listener.get_events(timeout=15)
        self.assertEqual(1, len(events))

        info_event = events[0]

        # Each event is indexed as (priority, event type, payload).
        self.assertEqual(self.priority, info_event[0])
        self.assertEqual('logrecord', info_event[1])

        for key in ['name', 'thread', 'extra', 'process', 'funcName',
                    'levelno', 'processName', 'pathname', 'lineno',
                    'msg', 'exc_info', 'levelname']:
            self.assertIn(key, info_event[2])
87
88
# Populate ``scenarios`` (the priority x driver combinations) at import time,
# so the attribute is already set on the class when the tests are loaded.
LoggingNotificationHandlerTestCase.generate_scenarios()
90