1#-----------------------------------------------------------------------------
2# Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors.
3# All rights reserved.
4#
5# The full license is in the file LICENSE.txt, distributed with this software.
6#-----------------------------------------------------------------------------
7""" Internal utils related to Tornado
8
9"""
10
11#-----------------------------------------------------------------------------
12# Boilerplate
13#-----------------------------------------------------------------------------
14import logging # isort:skip
15log = logging.getLogger(__name__)
16
17#-----------------------------------------------------------------------------
18# Imports
19#-----------------------------------------------------------------------------
20
21# Standard library imports
22import sys
23import threading
24from collections import defaultdict
25from traceback import format_exception
26
27# External imports
28from tornado import gen
29
30# Bokeh imports
31from ..util.serialization import make_id
32
33#-----------------------------------------------------------------------------
34# Globals and constants
35#-----------------------------------------------------------------------------
36
37__all__ = ()
38
39#-----------------------------------------------------------------------------
40# General API
41#-----------------------------------------------------------------------------
42
43#-----------------------------------------------------------------------------
44# Dev API
45#-----------------------------------------------------------------------------
46
47# See https://github.com/bokeh/bokeh/issues/9507
48def fixup_windows_event_loop_policy() -> None:
49    if sys.platform == 'win32' and sys.version_info[:3] >= (3, 8, 0):
50        import asyncio
51        if type(asyncio.get_event_loop_policy()) is asyncio.WindowsProactorEventLoopPolicy:
52            # WindowsProactorEventLoopPolicy is not compatible with tornado 6
53            # fallback to the pre-3.8 default of WindowsSelectorEventLoopPolicy
54            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
55
56#-----------------------------------------------------------------------------
57# Private API
58#-----------------------------------------------------------------------------
59
60class _AsyncPeriodic:
61    """Like ioloop.PeriodicCallback except the 'func' can be async and
62        return a Future, and we wait for func to finish each time
63        before we call it again.  Plain ioloop.PeriodicCallback
64        can "pile up" invocations if they are taking too long.
65
66    """
67
68    def __init__(self, func, period, io_loop):
69        self._func = func
70        self._loop = io_loop
71        self._period = period
72        self._started = False
73        self._stopped = False
74
75    # this is like gen.sleep but uses our IOLoop instead of the
76    # current IOLoop
77    def sleep(self):
78        f = gen.Future()
79        self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
80        return f
81
82    def start(self):
83        if self._started:
84            raise RuntimeError("called start() twice on _AsyncPeriodic")
85        self._started = True
86
87        def invoke():
88            # important to start the sleep before starting callback so any initial
89            # time spent in callback "counts against" the period.
90            sleep_future = self.sleep()
91            result = self._func()
92
93            if result is None:
94                return sleep_future
95
96            callback_future = gen.convert_yielded(result)
97            return gen.multi([sleep_future, callback_future])
98
99        def on_done(future):
100            if not self._stopped:
101                self._loop.add_future(invoke(), on_done)
102            ex = future.exception()
103            if ex is not None:
104                log.error("Error thrown from periodic callback:")
105                lines = format_exception(ex.__class__, ex, ex.__traceback__)
106                log.error("".join(lines))
107
108        self._loop.add_future(self.sleep(), on_done)
109
110    def stop(self):
111        self._stopped = True
112
113class _CallbackGroup:
114    """ A collection of callbacks added to a Tornado IOLoop that we may
115    want to remove as a group. """
116
117    def __init__(self, io_loop=None):
118        if io_loop is None:
119            raise ValueError("must provide an io loop")
120        self._loop = io_loop
121        # dicts from callback to remove callable. These are
122        # separate only because it's allowed to add the same
123        # callback as multiple kinds of callback at once.
124        self._next_tick_callback_removers = {}
125        self._timeout_callback_removers = {}
126        self._periodic_callback_removers = {}
127        self._removers_lock = threading.Lock()
128
129        self._next_tick_removers_by_callable = defaultdict(set)
130        self._timeout_removers_by_callable = defaultdict(set)
131        self._periodic_removers_by_callable = defaultdict(set)
132
133    def remove_all_callbacks(self):
134        """ Removes all registered callbacks."""
135        for cb_id in list(self._next_tick_callback_removers.keys()):
136            self.remove_next_tick_callback(cb_id)
137        for cb_id in list(self._timeout_callback_removers.keys()):
138            self.remove_timeout_callback(cb_id)
139        for cb_id in list(self._periodic_callback_removers.keys()):
140            self.remove_periodic_callback(cb_id)
141
142    def _get_removers_ids_by_callable(self, removers):
143        if removers is self._next_tick_callback_removers:
144            return self._next_tick_removers_by_callable
145        elif removers is self._timeout_callback_removers:
146            return self._timeout_removers_by_callable
147        elif removers is self._periodic_callback_removers:
148            return self._periodic_removers_by_callable
149        else:
150            raise RuntimeError('Unhandled removers', removers)
151
152    def _assign_remover(self, callback, callback_id, removers, remover):
153        with self._removers_lock:
154            if callback_id is None:
155                callback_id = make_id()
156            elif callback_id in removers:
157                raise ValueError("A callback of the same type has already been added with this ID")
158            removers[callback_id] = remover
159            return callback_id
160
161    def _execute_remover(self, callback_id, removers):
162        try:
163            with self._removers_lock:
164                remover = removers.pop(callback_id)
165                for cb, cb_ids in list(self._get_removers_ids_by_callable(removers).items()):
166                    try:
167                        cb_ids.remove(callback_id)
168                        if not cb_ids:
169                            del self._get_removers_ids_by_callable(removers)[cb]
170                    except KeyError:
171                        pass
172        except KeyError:
173            raise ValueError("Removing a callback twice (or after it's already been run)")
174        remover()
175
176    def add_next_tick_callback(self, callback, callback_id=None):
177        """ Adds a callback to be run on the next tick.
178        Returns an ID that can be used with remove_next_tick_callback."""
179        def wrapper(*args, **kwargs):
180            # this 'removed' flag is a hack because Tornado has no way
181            # to remove a "next tick" callback added with
182            # IOLoop.add_callback. So instead we make our wrapper skip
183            # invoking the callback.
184            if not wrapper.removed:
185                self.remove_next_tick_callback(callback_id)
186                return callback(*args, **kwargs)
187            else:
188                return None
189
190        wrapper.removed = False
191
192        def remover():
193            wrapper.removed = True
194
195        callback_id = self._assign_remover(callback, callback_id, self._next_tick_callback_removers, remover)
196        self._loop.add_callback(wrapper)
197        return callback_id
198
199    def remove_next_tick_callback(self, callback_id):
200        """ Removes a callback added with add_next_tick_callback."""
201        self._execute_remover(callback_id, self._next_tick_callback_removers)
202
203    def add_timeout_callback(self, callback, timeout_milliseconds, callback_id=None):
204        """ Adds a callback to be run once after timeout_milliseconds.
205        Returns an ID that can be used with remove_timeout_callback."""
206        def wrapper(*args, **kwargs):
207            self.remove_timeout_callback(callback_id)
208            return callback(*args, **kwargs)
209
210        handle = None
211
212        def remover():
213            if handle is not None:
214                self._loop.remove_timeout(handle)
215
216        callback_id = self._assign_remover(callback, callback_id, self._timeout_callback_removers, remover)
217        handle = self._loop.call_later(timeout_milliseconds / 1000.0, wrapper)
218        return callback_id
219
220    def remove_timeout_callback(self, callback_id):
221        """ Removes a callback added with add_timeout_callback, before it runs."""
222        self._execute_remover(callback_id, self._timeout_callback_removers)
223
224    def add_periodic_callback(self, callback, period_milliseconds, callback_id=None):
225        """ Adds a callback to be run every period_milliseconds until it is removed.
226        Returns an ID that can be used with remove_periodic_callback."""
227
228        cb = _AsyncPeriodic(callback, period_milliseconds, io_loop=self._loop)
229        callback_id = self._assign_remover(callback, callback_id, self._periodic_callback_removers, cb.stop)
230        cb.start()
231        return callback_id
232
233    def remove_periodic_callback(self, callback_id):
234        """ Removes a callback added with add_periodic_callback."""
235        self._execute_remover(callback_id, self._periodic_callback_removers)
236
237#-----------------------------------------------------------------------------
238# Code
239#-----------------------------------------------------------------------------
240