1from __future__ import absolute_import, unicode_literals
2
3import sys
4import time
5
6from case import Mock, call, patch
7
8import celery.utils.timer2 as timer2
9
10
11class test_Timer:
12
13    def test_enter_after(self):
14        t = timer2.Timer()
15        try:
16            done = [False]
17
18            def set_done():
19                done[0] = True
20
21            t.call_after(0.3, set_done)
22            mss = 0
23            while not done[0]:
24                if mss >= 2.0:
25                    raise Exception('test timed out')
26                time.sleep(0.1)
27                mss += 0.1
28        finally:
29            t.stop()
30
31    def test_exit_after(self):
32        t = timer2.Timer()
33        t.call_after = Mock()
34        t.exit_after(0.3, priority=10)
35        t.call_after.assert_called_with(0.3, sys.exit, 10)
36
37    def test_ensure_started_not_started(self):
38        t = timer2.Timer()
39        t.running = True
40        t.start = Mock()
41        t.ensure_started()
42        t.start.assert_not_called()
43        t.running = False
44        t.on_start = Mock()
45        t.ensure_started()
46        t.on_start.assert_called_with(t)
47        t.start.assert_called_with()
48
49    @patch('celery.utils.timer2.sleep')
50    def test_on_tick(self, sleep):
51        on_tick = Mock(name='on_tick')
52        t = timer2.Timer(on_tick=on_tick)
53        ne = t._next_entry = Mock(name='_next_entry')
54        ne.return_value = 3.33
55        ne.on_nth_call_do(t._is_shutdown.set, 3)
56        t.run()
57        sleep.assert_called_with(3.33)
58        on_tick.assert_has_calls([call(3.33), call(3.33), call(3.33)])
59
60    @patch('os._exit')
61    def test_thread_crash(self, _exit):
62        t = timer2.Timer()
63        t._next_entry = Mock()
64        t._next_entry.side_effect = OSError(131)
65        t.run()
66        _exit.assert_called_with(1)
67
68    def test_gc_race_lost(self):
69        t = timer2.Timer()
70        t._is_stopped.set = Mock()
71        t._is_stopped.set.side_effect = TypeError()
72
73        t._is_shutdown.set()
74        t.run()
75        t._is_stopped.set.assert_called_with()
76
77    def test_test_enter(self):
78        t = timer2.Timer()
79        t._do_enter = Mock()
80        e = Mock()
81        t.enter(e, 13, 0)
82        t._do_enter.assert_called_with('enter_at', e, 13, priority=0)
83
84    def test_test_enter_after(self):
85        t = timer2.Timer()
86        t._do_enter = Mock()
87        t.enter_after()
88        t._do_enter.assert_called_with('enter_after')
89
90    def test_cancel(self):
91        t = timer2.Timer()
92        tref = Mock()
93        t.cancel(tref)
94        tref.cancel.assert_called_with()
95