1#-----------------------------------------------------------------------------
2# Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors.
3# All rights reserved.
4#
5# The full license is in the file LICENSE.txt, distributed with this software.
6#-----------------------------------------------------------------------------
7
8#-----------------------------------------------------------------------------
9# Boilerplate
10#-----------------------------------------------------------------------------
11import pytest ; pytest
12
13#-----------------------------------------------------------------------------
14# Imports
15#-----------------------------------------------------------------------------
16
17# Standard library imports
18from concurrent.futures import ThreadPoolExecutor
19from itertools import repeat
20
21# External imports
22from flaky import flaky
23from tornado.ioloop import IOLoop
24
25# Module under test
26from bokeh.util.tornado import _CallbackGroup # isort:skip
27
28#-----------------------------------------------------------------------------
29# Setup
30#-----------------------------------------------------------------------------
31
def _make_invocation_counter(loop, stop_after=1):
    """Build a zero-argument callback that counts how often it is called.

    Each call increments a shared tally. Once the tally reaches
    ``stop_after`` (and on every later call), ``loop.stop()`` is invoked.
    Passing ``stop_after=None`` disables stopping entirely.

    The returned function carries two extra attributes:

    * ``counter`` -- the dict holding the tally under the ``'count'`` key
    * ``count()`` -- a bound helper returning the current tally

    """
    from types import MethodType

    tally = { 'count' : 0 }

    def func():
        tally['count'] = tally['count'] + 1
        if stop_after is None:
            return
        if tally['count'] >= stop_after:
            loop.stop()

    def count(fn):
        # ``fn`` is ``func`` itself, bound below via MethodType
        return fn.counter['count']

    func.counter = tally
    func.count = MethodType(count, func)
    return func
44
45# this is so ctrl+c out of the tests will show the actual
46# error, which pytest otherwise won't do by default
def run(loop):
    """Start ``loop`` and report a Ctrl+C instead of propagating it.

    A ``KeyboardInterrupt`` raised out of ``loop.start()`` is caught and
    a short notice is printed; any other exception propagates unchanged.

    """
    try:
        loop.start()
    except KeyboardInterrupt:
        interrupted = True
    else:
        interrupted = False

    if interrupted:
        print("Keyboard interrupt")
52
class LoopAndGroup:
    """Context manager pairing a fresh ``IOLoop`` with a ``_CallbackGroup``.

    The loop is created and made current on construction. Callbacks are
    registered inside the ``with`` body; the loop is only started when the
    body exits, and it is closed afterwards.

    Args:
        quit_after (number, optional) :
            Delay in milliseconds after which the loop is stopped. When
            ``None`` (the default) no stop is scheduled, so a callback
            must stop the loop itself.

    Attributes:
        io_loop : the ``IOLoop`` created for this context
        group : the ``_CallbackGroup`` attached to ``io_loop``

    """

    def __init__(self, quit_after=None):
        self.io_loop = IOLoop()
        self.io_loop.make_current()
        self.group = _CallbackGroup(self.io_loop)

        if quit_after is not None:
            # call_later takes seconds, quit_after is given in milliseconds
            self.io_loop.call_later(quit_after / 1000.0, self.io_loop.stop)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        try:
            # Only spin the loop when the body finished cleanly. If the body
            # raised (e.g. a failed assertion before any callback was added)
            # nothing may ever stop the loop, and starting it would hang the
            # test instead of reporting the original error.
            if type is None:
                run(self.io_loop)
        finally:
            # always release the loop's resources, even if running it failed
            self.io_loop.close()
69
70#-----------------------------------------------------------------------------
71# General API
72#-----------------------------------------------------------------------------
73
74
class TestCallbackGroup:
    """Tests for adding and removing callbacks on a ``_CallbackGroup``.

    Every test registers callbacks inside a ``LoopAndGroup`` block. The IO
    loop itself only runs when that block exits, so assertions placed after
    the block observe the state once the loop has stopped. All tests are
    timing dependent and are therefore retried via ``flaky``.

    """

    @flaky(max_runs=10)
    def test_next_tick_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            assert 0 == len(ctx.group._next_tick_callback_removers)
            ctx.group.add_next_tick_callback(func)
            assert 1 == len(ctx.group._next_tick_callback_removers)
        assert 1 == func.count()
        # check for leaks
        assert 0 == len(ctx.group._next_tick_callback_removers)

    @flaky(max_runs=10)
    def test_timeout_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            assert 0 == len(ctx.group._timeout_callback_removers)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            assert 1 == len(ctx.group._timeout_callback_removers)
        assert 1 == func.count()
        # check for leaks
        assert 0 == len(ctx.group._timeout_callback_removers)

    @flaky(max_runs=10)
    def test_periodic_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            assert 0 == len(ctx.group._periodic_callback_removers)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            assert 1 == len(ctx.group._periodic_callback_removers)
        assert 5 == func.count()
        # check for leaks... periodic doesn't self-remove though
        assert 1 == len(ctx.group._periodic_callback_removers)
        ctx.group.remove_periodic_callback(cb_id)
        assert 0 == len(ctx.group._periodic_callback_removers)

    @flaky(max_runs=10)
    def test_next_tick_does_not_run_if_removed_immediately(self) -> None:
        # quit_after stops the loop, since the removed callback never will
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_next_tick_callback(func)
            ctx.group.remove_next_tick_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_timeout_does_not_run_if_removed_immediately(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.remove_timeout_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_periodic_does_not_run_if_removed_immediately(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            ctx.group.remove_periodic_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_same_callback_as_all_three_types(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            # we want the timeout and next_tick to run before the periodic
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_next_tick_callback(func)
        assert 5 == func.count()

    @flaky(max_runs=10)
    def test_adding_next_tick_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_next_tick_callback(func)
            ctx.group.add_next_tick_callback(func)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_adding_timeout_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=2)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_adding_periodic_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_periodic_callback(func, period_milliseconds=3)
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_remove_all_callbacks(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            # add a callback that will remove all the others
            def remove_all():
                ctx.group.remove_all_callbacks()
            ctx.group.add_next_tick_callback(remove_all)
            # none of these should run
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_next_tick_callback(func)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_removing_next_tick_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_next_tick_callback(func)
            ctx.group.remove_next_tick_callback(cb_id)
            # a second removal of the same id must be rejected
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_next_tick_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_removing_timeout_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.remove_timeout_callback(cb_id)
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_timeout_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_removing_periodic_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            ctx.group.remove_periodic_callback(cb_id)
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_periodic_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_adding_next_tick_from_another_thread(self) -> None:
        # The test has probabilistic nature - there's a slight chance it'll give a false negative
        with LoopAndGroup(quit_after=15) as ctx:
            n = 1000
            func = _make_invocation_counter(ctx.io_loop, stop_after=n)
            # use the executor as a context manager so that its worker
            # threads are shut down instead of leaking past the test
            with ThreadPoolExecutor(n) as tpe:
                # list() drains the map so every add has completed
                list(tpe.map(ctx.group.add_next_tick_callback, repeat(func, n)))
        assert n == func.count()
225
226#-----------------------------------------------------------------------------
227# Dev API
228#-----------------------------------------------------------------------------
229
230#-----------------------------------------------------------------------------
231# Private API
232#-----------------------------------------------------------------------------
233
234#-----------------------------------------------------------------------------
235# Code
236#-----------------------------------------------------------------------------
237