#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
""" Internal utils related to Tornado

"""

#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
import logging # isort:skip
log = logging.getLogger(__name__)

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
import sys
import threading
from collections import defaultdict
from traceback import format_exception

# External imports
from tornado import gen

# Bokeh imports
from ..util.serialization import make_id

#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------

__all__ = ()

#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------

# See https://github.com/bokeh/bokeh/issues/9507
def fixup_windows_event_loop_policy() -> None:
    """ Switch asyncio to the selector event loop policy on Windows.

    Does nothing unless running on ``win32`` with Python 3.8 or newer and the
    currently installed policy is exactly ``WindowsProactorEventLoopPolicy``.

    """
    if sys.platform == 'win32' and sys.version_info[:3] >= (3, 8, 0):
        # imported lazily: only needed on the Windows / 3.8+ path
        import asyncio
        if type(asyncio.get_event_loop_policy()) is asyncio.WindowsProactorEventLoopPolicy:
            # WindowsProactorEventLoopPolicy is not compatible with tornado 6
            # fallback to the pre-3.8 default of WindowsSelectorEventLoopPolicy
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------

class _AsyncPeriodic:
    """Like ioloop.PeriodicCallback except the 'func' can be async and
    return a Future, and we wait for func to finish each time
    before we call it again. Plain ioloop.PeriodicCallback
    can "pile up" invocations if they are taking too long.

    Args:
        func : callable invoked with no arguments on every period. It may
            return ``None`` or any value accepted by ``gen.convert_yielded``.
        period : delay between invocations, in milliseconds.
        io_loop : loop used for all scheduling (``call_later``/``add_future``).

    """

    def __init__(self, func, period, io_loop) -> None:
        self._func = func
        self._loop = io_loop
        self._period = period
        self._started = False
        self._stopped = False

    # this is like gen.sleep but uses our IOLoop instead of the
    # current IOLoop
    def sleep(self):
        """ Return a future that our loop resolves to ``None`` after one period. """
        f = gen.Future()
        # the period is stored in milliseconds, call_later expects seconds
        self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
        return f

    def start(self) -> None:
        """ Begin invoking ``func`` once per period until ``stop()`` is called.

        The first invocation happens after one full period has elapsed.

        Raises:
            RuntimeError : if ``start()`` has already been called.

        """
        if self._started:
            raise RuntimeError("called start() twice on _AsyncPeriodic")
        self._started = True

        def invoke():
            # important to start the sleep before starting callback so any initial
            # time spent in callback "counts against" the period.
            sleep_future = self.sleep()

            try:
                result = self._func()
                callback_future = None if result is None else gen.convert_yielded(result)
            except Exception as ex:
                # A callback that fails synchronously must be handled like an
                # awaitable that fails later: hand the error to on_done as a
                # failed future so that it is logged there and the next run is
                # still scheduled. Letting it propagate would abort on_done
                # before add_future() and silently end the periodic schedule.
                callback_future = gen.Future()
                callback_future.set_exception(ex)

            if callback_future is None:
                return sleep_future

            # wait for both the period and the callback, whichever is longer
            return gen.multi([sleep_future, callback_future])

        def on_done(future):
            # schedule the next round first, so that an error in the round
            # that just finished does not stop the periodic callback
            if not self._stopped:
                self._loop.add_future(invoke(), on_done)
            ex = future.exception()
            if ex is not None:
                log.error("Error thrown from periodic callback:")
                lines = format_exception(ex.__class__, ex, ex.__traceback__)
                log.error("".join(lines))

        self._loop.add_future(self.sleep(), on_done)

    def stop(self) -> None:
        """ Prevent any further invocations from being scheduled.

        An invocation that is already scheduled or running is not cancelled.

        """
        self._stopped = True

class _CallbackGroup:
    """ A collection of callbacks added to a Tornado IOLoop that we may
    want to remove as a group.

    Every ``add_*`` method returns a callback ID; the matching ``remove_*``
    method takes that ID. One-shot callbacks (next tick, timeout) unregister
    themselves right before they run.

    """

    def __init__(self, io_loop=None) -> None:
        """
        Args:
            io_loop : the loop that callbacks are scheduled on (required).

        Raises:
            ValueError : if ``io_loop`` is ``None``.

        """
        if io_loop is None:
            raise ValueError("must provide an io loop")
        self._loop = io_loop
        # dicts from callback to remove callable. These are
        # separate only because it's allowed to add the same
        # callback as multiple kinds of callback at once.
        self._next_tick_callback_removers = {}
        self._timeout_callback_removers = {}
        self._periodic_callback_removers = {}
        # guards the remover dicts and the by-callable maps below
        self._removers_lock = threading.Lock()

        # callable -> set of callback IDs. Nothing in this module adds
        # entries to these maps; _execute_remover only prunes them.
        self._next_tick_removers_by_callable = defaultdict(set)
        self._timeout_removers_by_callable = defaultdict(set)
        self._periodic_removers_by_callable = defaultdict(set)

    def remove_all_callbacks(self) -> None:
        """ Removes all registered callbacks."""
        # iterate over snapshots, since removing mutates the dicts
        for cb_id in list(self._next_tick_callback_removers):
            self.remove_next_tick_callback(cb_id)
        for cb_id in list(self._timeout_callback_removers):
            self.remove_timeout_callback(cb_id)
        for cb_id in list(self._periodic_callback_removers):
            self.remove_periodic_callback(cb_id)

    def _get_removers_ids_by_callable(self, removers):
        """ Return the by-callable map that belongs to one of our remover dicts.

        Raises:
            RuntimeError : if ``removers`` is not one of this group's dicts.

        """
        if removers is self._next_tick_callback_removers:
            return self._next_tick_removers_by_callable
        elif removers is self._timeout_callback_removers:
            return self._timeout_removers_by_callable
        elif removers is self._periodic_callback_removers:
            return self._periodic_removers_by_callable
        else:
            raise RuntimeError('Unhandled removers', removers)

    def _assign_remover(self, callback, callback_id, removers, remover):
        """ Register ``remover`` under ``callback_id`` and return the ID used.

        Args:
            callback : the user callback (accepted but not used here).
            callback_id : ID to register under, or ``None`` to generate one
                with ``make_id()``.
            removers : the remover dict to register in.
            remover : zero-argument callable that cancels the callback.

        Raises:
            ValueError : if ``callback_id`` is already present in ``removers``.

        """
        with self._removers_lock:
            if callback_id is None:
                callback_id = make_id()
            elif callback_id in removers:
                raise ValueError("A callback of the same type has already been added with this ID")
            removers[callback_id] = remover
            return callback_id

    def _execute_remover(self, callback_id, removers) -> None:
        """ Unregister ``callback_id`` from ``removers`` and run its remover.

        Raises:
            ValueError : if ``callback_id`` is not registered in ``removers``.

        """
        with self._removers_lock:
            try:
                remover = removers.pop(callback_id)
            except KeyError:
                # the KeyError is an implementation detail, don't chain it
                raise ValueError("Removing a callback twice (or after it's already been run)") from None

            ids_by_callable = self._get_removers_ids_by_callable(removers)
            # iterate over a snapshot, since emptied entries are deleted
            for cb, cb_ids in list(ids_by_callable.items()):
                if callback_id in cb_ids:
                    cb_ids.remove(callback_id)
                    if not cb_ids:
                        del ids_by_callable[cb]

        # run the remover outside of the lock
        remover()

    def add_next_tick_callback(self, callback, callback_id=None):
        """ Adds a callback to be run on the next tick.

        Args:
            callback : callable to run.
            callback_id : optional ID to register under; generated if ``None``.

        Returns:
            an ID that can be used with remove_next_tick_callback.

        Raises:
            ValueError : if ``callback_id`` is already used for a next tick
                callback.

        """
        def wrapper(*args, **kwargs):
            # this 'removed' flag is a hack because Tornado has no way
            # to remove a "next tick" callback added with
            # IOLoop.add_callback. So instead we make our wrapper skip
            # invoking the callback.
            if not wrapper.removed:
                # unregister before running: the callback only runs once
                self.remove_next_tick_callback(callback_id)
                return callback(*args, **kwargs)
            else:
                return None

        wrapper.removed = False

        def remover():
            wrapper.removed = True

        # wrapper reads callback_id when it runs, i.e. after this assignment
        callback_id = self._assign_remover(callback, callback_id, self._next_tick_callback_removers, remover)
        self._loop.add_callback(wrapper)
        return callback_id

    def remove_next_tick_callback(self, callback_id) -> None:
        """ Removes a callback added with add_next_tick_callback.

        Raises:
            ValueError : if the ID is not (or no longer) registered.

        """
        self._execute_remover(callback_id, self._next_tick_callback_removers)

    def add_timeout_callback(self, callback, timeout_milliseconds, callback_id=None):
        """ Adds a callback to be run once after timeout_milliseconds.

        Args:
            callback : callable to run.
            timeout_milliseconds : delay before the callback runs.
            callback_id : optional ID to register under; generated if ``None``.

        Returns:
            an ID that can be used with remove_timeout_callback.

        Raises:
            ValueError : if ``callback_id`` is already used for a timeout
                callback.

        """
        def wrapper(*args, **kwargs):
            # unregister before running: the callback only runs once
            self.remove_timeout_callback(callback_id)
            return callback(*args, **kwargs)

        # rebound below once the timeout is scheduled; remover() sees the
        # value that is current at the time it is called
        handle = None

        def remover():
            if handle is not None:
                self._loop.remove_timeout(handle)

        callback_id = self._assign_remover(callback, callback_id, self._timeout_callback_removers, remover)
        # call_later expects seconds
        handle = self._loop.call_later(timeout_milliseconds / 1000.0, wrapper)
        return callback_id

    def remove_timeout_callback(self, callback_id) -> None:
        """ Removes a callback added with add_timeout_callback, before it runs.

        Raises:
            ValueError : if the ID is not (or no longer) registered.

        """
        self._execute_remover(callback_id, self._timeout_callback_removers)

    def add_periodic_callback(self, callback, period_milliseconds, callback_id=None):
        """ Adds a callback to be run every period_milliseconds until it is removed.

        Args:
            callback : callable to run; may be async (see ``_AsyncPeriodic``).
            period_milliseconds : delay between invocations.
            callback_id : optional ID to register under; generated if ``None``.

        Returns:
            an ID that can be used with remove_periodic_callback.

        Raises:
            ValueError : if ``callback_id`` is already used for a periodic
                callback.

        """
        cb = _AsyncPeriodic(callback, period_milliseconds, io_loop=self._loop)
        # register first: a duplicate ID must fail before anything is started
        callback_id = self._assign_remover(callback, callback_id, self._periodic_callback_removers, cb.stop)
        cb.start()
        return callback_id

    def remove_periodic_callback(self, callback_id) -> None:
        """ Removes a callback added with add_periodic_callback.

        Raises:
            ValueError : if the ID is not (or no longer) registered.

        """
        self._execute_remover(callback_id, self._periodic_callback_removers)

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------