1from __future__ import absolute_import, unicode_literals 2 3from time import time 4 5from case import Mock, patch 6 7from celery.events.dumper import Dumper, evdump, humanize_type 8from celery.five import WhateverIO 9 10 11class test_Dumper: 12 13 def setup(self): 14 self.out = WhateverIO() 15 self.dumper = Dumper(out=self.out) 16 17 def test_humanize_type(self): 18 assert humanize_type('worker-offline') == 'shutdown' 19 assert humanize_type('task-started') == 'task started' 20 21 def test_format_task_event(self): 22 self.dumper.format_task_event( 23 'worker@example.com', time(), 'task-started', 'tasks.add', {}) 24 assert self.out.getvalue() 25 26 def test_on_event(self): 27 event = { 28 'hostname': 'worker@example.com', 29 'timestamp': time(), 30 'uuid': '1ef', 31 'name': 'tasks.add', 32 'args': '(2, 2)', 33 'kwargs': '{}', 34 } 35 self.dumper.on_event(dict(event, type='task-received')) 36 assert self.out.getvalue() 37 self.dumper.on_event(dict(event, type='task-revoked')) 38 self.dumper.on_event(dict(event, type='worker-online')) 39 40 @patch('celery.events.EventReceiver.capture') 41 def test_evdump(self, capture): 42 capture.side_effect = KeyboardInterrupt() 43 evdump(app=self.app) 44 45 def test_evdump_error_handler(self): 46 app = Mock(name='app') 47 with patch('celery.events.dumper.Dumper') as Dumper: 48 Dumper.return_value = Mock(name='dumper') 49 recv = app.events.Receiver.return_value = Mock() 50 51 def se(*_a, **_k): 52 recv.capture.side_effect = SystemExit() 53 raise KeyError() 54 recv.capture.side_effect = se 55 56 Conn = app.connection_for_read.return_value = Mock(name='conn') 57 conn = Conn.clone.return_value = Mock(name='cloned_conn') 58 conn.connection_errors = (KeyError,) 59 conn.channel_errors = () 60 61 evdump(app) 62 conn.ensure_connection.assert_called() 63 errback = conn.ensure_connection.call_args[0][0] 64 errback(KeyError(), 1) 65 conn.as_uri.assert_called() 66