1from __future__ import absolute_import, unicode_literals
2
3import pytest
4from case import Mock, patch, skip
5
6from celery.exceptions import ImproperlyConfigured
7from celery.worker.components import Beat, Hub, Pool, Timer
8
9# some of these are tested in test_worker, so I've only written tests
10# here to complete coverage.  Should move everything to this module at some
11# point [-ask]
12
13
14class test_Timer:
15
16    def test_create__eventloop(self):
17        w = Mock(name='w')
18        w.use_eventloop = True
19        Timer(w).create(w)
20        assert not w.timer.queue
21
22
23class test_Hub:
24
25    def setup(self):
26        self.w = Mock(name='w')
27        self.hub = Hub(self.w)
28        self.w.hub = Mock(name='w.hub')
29
30    @patch('celery.worker.components.set_event_loop')
31    @patch('celery.worker.components.get_event_loop')
32    def test_create(self, get_event_loop, set_event_loop):
33        self.hub._patch_thread_primitives = Mock(name='ptp')
34        assert self.hub.create(self.w) is self.hub
35        self.hub._patch_thread_primitives.assert_called_with(self.w)
36
37    def test_start(self):
38        self.hub.start(self.w)
39
40    def test_stop(self):
41        self.hub.stop(self.w)
42        self.w.hub.close.assert_called_with()
43
44    def test_terminate(self):
45        self.hub.terminate(self.w)
46        self.w.hub.close.assert_called_with()
47
48
49class test_Pool:
50
51    def test_close_terminate(self):
52        w = Mock()
53        comp = Pool(w)
54        pool = w.pool = Mock()
55        comp.close(w)
56        pool.close.assert_called_with()
57        comp.terminate(w)
58        pool.terminate.assert_called_with()
59
60        w.pool = None
61        comp.close(w)
62        comp.terminate(w)
63
64    @skip.if_win32()
65    def test_create_when_eventloop(self):
66        w = Mock()
67        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
68        comp = Pool(w)
69        w.pool = Mock()
70        comp.create(w)
71        assert w.process_task is w._process_task_sem
72
73    def test_create_calls_instantiate_with_max_memory(self):
74        w = Mock()
75        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
76        comp = Pool(w)
77        comp.instantiate = Mock()
78        w.max_memory_per_child = 32
79
80        comp.create(w)
81
82        assert comp.instantiate.call_args[1]['max_memory_per_child'] == 32
83
84
85class test_Beat:
86
87    def test_create__green(self):
88        w = Mock(name='w')
89        w.pool_cls.__module__ = 'foo_gevent'
90        with pytest.raises(ImproperlyConfigured):
91            Beat(w).create(w)
92