#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
import pytest ; pytest

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

# External imports
from flaky import flaky
from tornado.ioloop import IOLoop

# Module under test
from bokeh.util.tornado import _CallbackGroup # isort:skip

#-----------------------------------------------------------------------------
# Setup
#-----------------------------------------------------------------------------

def _make_invocation_counter(loop, stop_after=1):
    """Build a zero-argument callback that counts its own invocations.

    Args:
        loop: object exposing ``stop()``; it is stopped once the callback has
            been invoked ``stop_after`` times.
        stop_after: number of invocations after which ``loop.stop()`` is
            called (and on every later invocation too). ``None`` means the
            callback never stops the loop.

    Returns:
        The callback. ``callback.count()`` returns the number of invocations
        so far and ``callback.counter`` is the backing dict.
    """
    from types import MethodType
    counter = { 'count' : 0 }
    def func():
        counter['count'] += 1
        if stop_after is not None and counter['count'] >= stop_after:
            loop.stop()
    def count(self):
        return self.counter['count']
    # bind ``count`` to the function object so tests can call ``func.count()``
    func.count = MethodType(count, func)
    func.counter = counter
    return func

# this is so ctrl+c out of the tests will show the actual
# error, which pytest otherwise won't do by default
def run(loop):
    """Start ``loop`` and block until it is stopped, reporting ctrl+c."""
    try:
        loop.start()
    except KeyboardInterrupt:
        print("Keyboard interrupt")

class LoopAndGroup:
    """Context manager pairing a fresh ``IOLoop`` with a ``_CallbackGroup``.

    Callbacks are registered inside the ``with`` body; the loop itself is only
    run when the body exits, so assertions about executed callbacks belong
    *after* the ``with`` block.

    Args:
        quit_after: optional number of milliseconds after which the loop is
            stopped unconditionally. ``None`` means the loop only stops when
            a callback stops it.
    """

    def __init__(self, quit_after=None):
        self.io_loop = IOLoop()
        self.io_loop.make_current()
        self.group = _CallbackGroup(self.io_loop)

        if quit_after is not None:
            self.io_loop.call_later(quit_after / 1000.0, self.io_loop.stop)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        try:
            # If the body already raised, running the loop cannot rescue the
            # test and would block forever whenever nothing was scheduled to
            # stop it, so only run on a clean exit.
            if type is None:
                run(self.io_loop)
        finally:
            # always release the loop, even if running it failed
            self.io_loop.close()

#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------


class TestCallbackGroup:
    """Exercise adding, running and removing callbacks on ``_CallbackGroup``."""

    @flaky(max_runs=10)
    def test_next_tick_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            assert 0 == len(ctx.group._next_tick_callback_removers)
            ctx.group.add_next_tick_callback(func)
            assert 1 == len(ctx.group._next_tick_callback_removers)
        assert 1 == func.count()
        # check for leaks
        assert 0 == len(ctx.group._next_tick_callback_removers)

    @flaky(max_runs=10)
    def test_timeout_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            assert 0 == len(ctx.group._timeout_callback_removers)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            assert 1 == len(ctx.group._timeout_callback_removers)
        assert 1 == func.count()
        # check for leaks
        assert 0 == len(ctx.group._timeout_callback_removers)

    @flaky(max_runs=10)
    def test_periodic_runs(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            assert 0 == len(ctx.group._periodic_callback_removers)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            assert 1 == len(ctx.group._periodic_callback_removers)
        assert 5 == func.count()
        # check for leaks... periodic doesn't self-remove though
        assert 1 == len(ctx.group._periodic_callback_removers)
        ctx.group.remove_periodic_callback(cb_id)
        assert 0 == len(ctx.group._periodic_callback_removers)

    @flaky(max_runs=10)
    def test_next_tick_does_not_run_if_removed_immediately(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_next_tick_callback(func)
            ctx.group.remove_next_tick_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_timeout_does_not_run_if_removed_immediately(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.remove_timeout_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_periodic_does_not_run_if_removed_immediately(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            ctx.group.remove_periodic_callback(cb_id)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_same_callback_as_all_three_types(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            # we want the timeout and next_tick to run before the periodic
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_next_tick_callback(func)
        assert 5 == func.count()

    @flaky(max_runs=10)
    def test_adding_next_tick_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_next_tick_callback(func)
            ctx.group.add_next_tick_callback(func)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_adding_timeout_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=2)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_adding_periodic_twice(self) -> None:
        with LoopAndGroup() as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=2)
            ctx.group.add_periodic_callback(func, period_milliseconds=3)
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
        assert 2 == func.count()

    @flaky(max_runs=10)
    def test_remove_all_callbacks(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            # add a callback that will remove all the others
            def remove_all():
                ctx.group.remove_all_callbacks()
            ctx.group.add_next_tick_callback(remove_all)
            # none of these should run
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            ctx.group.add_periodic_callback(func, period_milliseconds=2)
            ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.add_next_tick_callback(func)
        assert 0 == func.count()

    @flaky(max_runs=10)
    def test_removing_next_tick_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_next_tick_callback(func)
            ctx.group.remove_next_tick_callback(cb_id)
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_next_tick_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_removing_timeout_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop)
            cb_id = ctx.group.add_timeout_callback(func, timeout_milliseconds=1)
            ctx.group.remove_timeout_callback(cb_id)
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_timeout_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_removing_periodic_twice(self) -> None:
        with LoopAndGroup(quit_after=15) as ctx:
            func = _make_invocation_counter(ctx.io_loop, stop_after=5)
            cb_id = ctx.group.add_periodic_callback(func, period_milliseconds=1)
            ctx.group.remove_periodic_callback(cb_id)
            with pytest.raises(ValueError) as exc:
                ctx.group.remove_periodic_callback(cb_id)
        assert 0 == func.count()
        assert "twice" in repr(exc.value)

    @flaky(max_runs=10)
    def test_adding_next_tick_from_another_thread(self) -> None:
        # The test has probabilistic nature - there's a slight chance it'll give a false negative
        with LoopAndGroup(quit_after=15) as ctx:
            n = 1000
            func = _make_invocation_counter(ctx.io_loop, stop_after=n)
            # the context manager shuts the pool down so its worker threads
            # do not outlive the test
            with ThreadPoolExecutor(n) as tpe:
                list(tpe.map(ctx.group.add_next_tick_callback, repeat(func, n)))
        assert n == func.count()

#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------