1from __future__ import absolute_import, unicode_literals
2
3from time import time
4
5from case import Mock, patch
6
7from celery.events.dumper import Dumper, evdump, humanize_type
8from celery.five import WhateverIO
9
10
class test_Dumper:
    """Tests for ``Dumper``, ``humanize_type`` and ``evdump``."""

    def setup(self):
        # Every test gets its own output buffer and a Dumper writing to it.
        buf = WhateverIO()
        self.out = buf
        self.dumper = Dumper(out=buf)

    def test_humanize_type(self):
        expectations = (
            ('worker-offline', 'shutdown'),
            ('task-started', 'task started'),
        )
        for event_type, label in expectations:
            assert humanize_type(event_type) == label

    def test_format_task_event(self):
        now = time()
        self.dumper.format_task_event(
            'worker@example.com', now, 'task-started', 'tasks.add', {},
        )
        # Only checks that something was written, not the exact text.
        written = self.out.getvalue()
        assert written

    def test_on_event(self):
        base = {
            'hostname': 'worker@example.com',
            'timestamp': time(),
            'uuid': '1ef',
            'name': 'tasks.add',
            'args': '(2, 2)',
            'kwargs': '{}',
        }

        def emit(event_type):
            # Same base fields each time; only the event type varies.
            self.dumper.on_event(dict(base, type=event_type))

        emit('task-received')
        assert self.out.getvalue()
        # The remaining event types are only exercised, not asserted on.
        emit('task-revoked')
        emit('worker-online')

    def test_evdump(self):
        # With capture raising KeyboardInterrupt, the call is expected to
        # complete without the interrupt propagating out of the test.
        with patch('celery.events.EventReceiver.capture') as capture:
            capture.side_effect = KeyboardInterrupt()
            evdump(app=self.app)

    def test_evdump_error_handler(self):
        app = Mock(name='app')
        receiver = Mock()
        app.events.Receiver.return_value = receiver

        def fail_then_exit(*_args, **_kwargs):
            # First capture() call raises KeyError (declared below as a
            # connection error); any later call raises SystemExit instead.
            receiver.capture.side_effect = SystemExit()
            raise KeyError()
        receiver.capture.side_effect = fail_then_exit

        # app.connection_for_read().clone() yields the connection under test.
        cloned = Mock(name='cloned_conn')
        cloned.connection_errors = (KeyError,)
        cloned.channel_errors = ()
        origin = Mock(name='conn')
        origin.clone.return_value = cloned
        app.connection_for_read.return_value = origin

        with patch('celery.events.dumper.Dumper') as dumper_cls:
            dumper_cls.return_value = Mock(name='dumper')

            evdump(app)
            cloned.ensure_connection.assert_called()

            # The first positional argument handed to ensure_connection is
            # the error callback; invoking it must consult conn.as_uri().
            on_error = cloned.ensure_connection.call_args[0][0]
            on_error(KeyError(), 1)
            cloned.as_uri.assert_called()
66