1#
2# Copyright 2009 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""An I/O event loop for non-blocking sockets.
17
18In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event
19loop, with a slightly different interface for historical reasons.
20Applications can use either the `.IOLoop` interface or the underlying
21`asyncio` event loop directly (unless compatibility with older
22versions of Tornado is desired, in which case `.IOLoop` must be used).
23
Typical applications will use a single `IOLoop` object, accessed via
the `IOLoop.current` static method. The `IOLoop.start` method (or
26equivalently, `asyncio.AbstractEventLoop.run_forever`) should usually
27be called at the end of the ``main()`` function. Atypical applications
28may use more than one `IOLoop`, such as one `IOLoop` per thread, or
29per `unittest` case.
30
31"""
32
33import asyncio
34import concurrent.futures
35import datetime
36import functools
37import logging
38import numbers
39import os
40import sys
41import time
42import math
43import random
44
45from tornado.concurrent import (
46    Future,
47    is_future,
48    chain_future,
49    future_set_exc_info,
50    future_add_done_callback,
51)
52from tornado.log import app_log
53from tornado.util import Configurable, TimeoutError, import_object
54
55import typing
56from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable
57
58if typing.TYPE_CHECKING:
59    from typing import Dict, List  # noqa: F401
60
61    from typing_extensions import Protocol
62else:
63    Protocol = object
64
65
66class _Selectable(Protocol):
67    def fileno(self) -> int:
68        pass
69
70    def close(self) -> None:
71        pass
72
73
# Result type carried by a future (used by add_future and run_in_executor).
_T = TypeVar("_T")
# Any concrete type satisfying the _Selectable protocol (used by add_handler).
_S = TypeVar("_S", bound=_Selectable)
76
77
78class IOLoop(Configurable):
79    """An I/O event loop.
80
81    As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event
82    loop.
83
84    Example usage for a simple TCP server:
85
86    .. testcode::
87
88        import errno
89        import functools
90        import socket
91
92        import tornado.ioloop
93        from tornado.iostream import IOStream
94
95        async def handle_connection(connection, address):
96            stream = IOStream(connection)
97            message = await stream.read_until_close()
98            print("message from client:", message.decode().strip())
99
100        def connection_ready(sock, fd, events):
101            while True:
102                try:
103                    connection, address = sock.accept()
104                except BlockingIOError:
105                    return
106                connection.setblocking(0)
107                io_loop = tornado.ioloop.IOLoop.current()
108                io_loop.spawn_callback(handle_connection, connection, address)
109
110        if __name__ == '__main__':
111            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
112            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
113            sock.setblocking(0)
114            sock.bind(("", 8888))
115            sock.listen(128)
116
117            io_loop = tornado.ioloop.IOLoop.current()
118            callback = functools.partial(connection_ready, sock)
119            io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
120            io_loop.start()
121
122    .. testoutput::
123       :hide:
124
125    By default, a newly-constructed `IOLoop` becomes the thread's current
126    `IOLoop`, unless there already is a current `IOLoop`. This behavior
127    can be controlled with the ``make_current`` argument to the `IOLoop`
128    constructor: if ``make_current=True``, the new `IOLoop` will always
129    try to become current and it raises an error if there is already a
130    current instance. If ``make_current=False``, the new `IOLoop` will
131    not try to become current.
132
133    In general, an `IOLoop` cannot survive a fork or be shared across
134    processes in any way. When multiple processes are being used, each
135    process should create its own `IOLoop`, which also implies that
136    any objects which depend on the `IOLoop` (such as
137    `.AsyncHTTPClient`) must also be created in the child processes.
138    As a guideline, anything that starts processes (including the
139    `tornado.process` and `multiprocessing` modules) should do so as
140    early as possible, ideally the first thing the application does
141    after loading its configuration in ``main()``.
142
143    .. versionchanged:: 4.2
144       Added the ``make_current`` keyword argument to the `IOLoop`
145       constructor.
146
147    .. versionchanged:: 5.0
148
149       Uses the `asyncio` event loop by default. The
150       ``IOLoop.configure`` method cannot be used on Python 3 except
151       to redundantly specify the `asyncio` event loop.
152
153    """
154
    # Event bitmask constants passed as ``events`` to add_handler and
    # update_handler; they are combined with bitwise OR.
    # These constants were originally based on constants from the epoll module.
    NONE = 0
    READ = 0x001
    WRITE = 0x004
    # 0x018 == 0x008 | 0x010 (presumably EPOLLERR | EPOLLHUP -- TODO confirm).
    ERROR = 0x018

    # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
    # IOLoop.current() looks up the thread's asyncio loop in this mapping.
    _ioloop_for_asyncio = dict()  # type: Dict[asyncio.AbstractEventLoop, IOLoop]
163
164    @classmethod
165    def configure(
166        cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
167    ) -> None:
168        if asyncio is not None:
169            from tornado.platform.asyncio import BaseAsyncIOLoop
170
171            if isinstance(impl, str):
172                impl = import_object(impl)
173            if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop):
174                raise RuntimeError(
175                    "only AsyncIOLoop is allowed when asyncio is available"
176                )
177        super(IOLoop, cls).configure(impl, **kwargs)
178
179    @staticmethod
180    def instance() -> "IOLoop":
181        """Deprecated alias for `IOLoop.current()`.
182
183        .. versionchanged:: 5.0
184
185           Previously, this method returned a global singleton
186           `IOLoop`, in contrast with the per-thread `IOLoop` returned
187           by `current()`. In nearly all cases the two were the same
188           (when they differed, it was generally used from non-Tornado
189           threads to communicate back to the main thread's `IOLoop`).
190           This distinction is not present in `asyncio`, so in order
191           to facilitate integration with that package `instance()`
192           was changed to be an alias to `current()`. Applications
193           using the cross-thread communications aspect of
194           `instance()` should instead set their own global variable
195           to point to the `IOLoop` they want to use.
196
197        .. deprecated:: 5.0
198        """
199        return IOLoop.current()
200
201    def install(self) -> None:
202        """Deprecated alias for `make_current()`.
203
204        .. versionchanged:: 5.0
205
206           Previously, this method would set this `IOLoop` as the
207           global singleton used by `IOLoop.instance()`. Now that
208           `instance()` is an alias for `current()`, `install()`
209           is an alias for `make_current()`.
210
211        .. deprecated:: 5.0
212        """
213        self.make_current()
214
215    @staticmethod
216    def clear_instance() -> None:
217        """Deprecated alias for `clear_current()`.
218
219        .. versionchanged:: 5.0
220
221           Previously, this method would clear the `IOLoop` used as
222           the global singleton by `IOLoop.instance()`. Now that
223           `instance()` is an alias for `current()`,
224           `clear_instance()` is an alias for `clear_current()`.
225
226        .. deprecated:: 5.0
227
228        """
229        IOLoop.clear_current()
230
231    @typing.overload
232    @staticmethod
233    def current() -> "IOLoop":
234        pass
235
236    @typing.overload
237    @staticmethod
238    def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
239        pass
240
241    @staticmethod
242    def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
243        """Returns the current thread's `IOLoop`.
244
245        If an `IOLoop` is currently running or has been marked as
246        current by `make_current`, returns that instance.  If there is
247        no current `IOLoop` and ``instance`` is true, creates one.
248
249        .. versionchanged:: 4.1
250           Added ``instance`` argument to control the fallback to
251           `IOLoop.instance()`.
252        .. versionchanged:: 5.0
253           On Python 3, control of the current `IOLoop` is delegated
254           to `asyncio`, with this and other methods as pass-through accessors.
255           The ``instance`` argument now controls whether an `IOLoop`
256           is created automatically when there is none, instead of
257           whether we fall back to `IOLoop.instance()` (which is now
258           an alias for this method). ``instance=False`` is deprecated,
259           since even if we do not create an `IOLoop`, this method
260           may initialize the asyncio loop.
261        """
262        try:
263            loop = asyncio.get_event_loop()
264        except (RuntimeError, AssertionError):
265            if not instance:
266                return None
267            raise
268        try:
269            return IOLoop._ioloop_for_asyncio[loop]
270        except KeyError:
271            if instance:
272                from tornado.platform.asyncio import AsyncIOMainLoop
273
274                current = AsyncIOMainLoop(make_current=True)  # type: Optional[IOLoop]
275            else:
276                current = None
277        return current
278
279    def make_current(self) -> None:
280        """Makes this the `IOLoop` for the current thread.
281
282        An `IOLoop` automatically becomes current for its thread
283        when it is started, but it is sometimes useful to call
284        `make_current` explicitly before starting the `IOLoop`,
285        so that code run at startup time can find the right
286        instance.
287
288        .. versionchanged:: 4.1
289           An `IOLoop` created while there is no current `IOLoop`
290           will automatically become current.
291
292        .. versionchanged:: 5.0
293           This method also sets the current `asyncio` event loop.
294        """
295        # The asyncio event loops override this method.
296        raise NotImplementedError()
297
298    @staticmethod
299    def clear_current() -> None:
300        """Clears the `IOLoop` for the current thread.
301
302        Intended primarily for use by test frameworks in between tests.
303
304        .. versionchanged:: 5.0
305           This method also clears the current `asyncio` event loop.
306        """
307        old = IOLoop.current(instance=False)
308        if old is not None:
309            old._clear_current_hook()
310        if asyncio is None:
311            IOLoop._current.instance = None
312
313    def _clear_current_hook(self) -> None:
314        """Instance method called when an IOLoop ceases to be current.
315
316        May be overridden by subclasses as a counterpart to make_current.
317        """
318        pass
319
320    @classmethod
321    def configurable_base(cls) -> Type[Configurable]:
322        return IOLoop
323
324    @classmethod
325    def configurable_default(cls) -> Type[Configurable]:
326        from tornado.platform.asyncio import AsyncIOLoop
327
328        return AsyncIOLoop
329
330    def initialize(self, make_current: Optional[bool] = None) -> None:
331        if make_current is None:
332            if IOLoop.current(instance=False) is None:
333                self.make_current()
334        elif make_current:
335            current = IOLoop.current(instance=False)
336            # AsyncIO loops can already be current by this point.
337            if current is not None and current is not self:
338                raise RuntimeError("current IOLoop already exists")
339            self.make_current()
340
341    def close(self, all_fds: bool = False) -> None:
342        """Closes the `IOLoop`, freeing any resources used.
343
344        If ``all_fds`` is true, all file descriptors registered on the
345        IOLoop will be closed (not just the ones created by the
346        `IOLoop` itself).
347
348        Many applications will only use a single `IOLoop` that runs for the
349        entire lifetime of the process.  In that case closing the `IOLoop`
350        is not necessary since everything will be cleaned up when the
351        process exits.  `IOLoop.close` is provided mainly for scenarios
352        such as unit tests, which create and destroy a large number of
353        ``IOLoops``.
354
355        An `IOLoop` must be completely stopped before it can be closed.  This
356        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
357        be allowed to return before attempting to call `IOLoop.close()`.
358        Therefore the call to `close` will usually appear just after
359        the call to `start` rather than near the call to `stop`.
360
361        .. versionchanged:: 3.1
362           If the `IOLoop` implementation supports non-integer objects
363           for "file descriptors", those objects will have their
364           ``close`` method when ``all_fds`` is true.
365        """
366        raise NotImplementedError()
367
368    @typing.overload
369    def add_handler(
370        self, fd: int, handler: Callable[[int, int], None], events: int
371    ) -> None:
372        pass
373
374    @typing.overload  # noqa: F811
375    def add_handler(
376        self, fd: _S, handler: Callable[[_S, int], None], events: int
377    ) -> None:
378        pass
379
380    def add_handler(  # noqa: F811
381        self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
382    ) -> None:
383        """Registers the given handler to receive the given events for ``fd``.
384
385        The ``fd`` argument may either be an integer file descriptor or
386        a file-like object with a ``fileno()`` and ``close()`` method.
387
388        The ``events`` argument is a bitwise or of the constants
389        ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
390
391        When an event occurs, ``handler(fd, events)`` will be run.
392
393        .. versionchanged:: 4.0
394           Added the ability to pass file-like objects in addition to
395           raw file descriptors.
396        """
397        raise NotImplementedError()
398
399    def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
400        """Changes the events we listen for ``fd``.
401
402        .. versionchanged:: 4.0
403           Added the ability to pass file-like objects in addition to
404           raw file descriptors.
405        """
406        raise NotImplementedError()
407
408    def remove_handler(self, fd: Union[int, _Selectable]) -> None:
409        """Stop listening for events on ``fd``.
410
411        .. versionchanged:: 4.0
412           Added the ability to pass file-like objects in addition to
413           raw file descriptors.
414        """
415        raise NotImplementedError()
416
417    def start(self) -> None:
418        """Starts the I/O loop.
419
420        The loop will run until one of the callbacks calls `stop()`, which
421        will make the loop stop after the current event iteration completes.
422        """
423        raise NotImplementedError()
424
425    def _setup_logging(self) -> None:
426        """The IOLoop catches and logs exceptions, so it's
427        important that log output be visible.  However, python's
428        default behavior for non-root loggers (prior to python
429        3.2) is to print an unhelpful "no handlers could be
430        found" message rather than the actual log entry, so we
431        must explicitly configure logging if we've made it this
432        far without anything.
433
434        This method should be called from start() in subclasses.
435        """
436        if not any(
437            [
438                logging.getLogger().handlers,
439                logging.getLogger("tornado").handlers,
440                logging.getLogger("tornado.application").handlers,
441            ]
442        ):
443            logging.basicConfig()
444
445    def stop(self) -> None:
446        """Stop the I/O loop.
447
448        If the event loop is not currently running, the next call to `start()`
449        will return immediately.
450
451        Note that even after `stop` has been called, the `IOLoop` is not
452        completely stopped until `IOLoop.start` has also returned.
453        Some work that was scheduled before the call to `stop` may still
454        be run before the `IOLoop` shuts down.
455        """
456        raise NotImplementedError()
457
    def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any:
        """Starts the `IOLoop`, runs the given function, and stops the loop.

        The function must return either an awaitable object or
        ``None``. If the function returns an awaitable object, the
        `IOLoop` will run until the awaitable is resolved (and
        `run_sync()` will return the awaitable's result). If it raises
        an exception, the `IOLoop` will stop and the exception will be
        re-raised to the caller.

        The optional ``timeout`` argument (in seconds) may be used to
        set a maximum duration for the function.  If the timeout expires,
        a `tornado.util.TimeoutError` is raised.

        This method is useful to allow asynchronous calls in a
        ``main()`` function::

            async def main():
                # do stuff...

            if __name__ == '__main__':
                IOLoop.current().run_sync(main)

        .. versionchanged:: 4.3
           Returning a non-``None``, non-awaitable value is now an error.

        .. versionchanged:: 5.0
           If a timeout occurs, the ``func`` coroutine will be cancelled.

        """
        # One-element list used as a mutable cell so the nested functions
        # can publish the future that is inspected after the loop stops.
        future_cell = [None]  # type: List[Optional[Future]]

        def run() -> None:
            # Runs on the loop: call ``func`` and store a future for its
            # outcome (result, awaitable, or raised exception) in the cell.
            try:
                result = func()
                if result is not None:
                    from tornado.gen import convert_yielded

                    result = convert_yielded(result)
            except Exception:
                fut = Future()  # type: Future[Any]
                future_cell[0] = fut
                future_set_exc_info(fut, sys.exc_info())
            else:
                if is_future(result):
                    future_cell[0] = result
                else:
                    fut = Future()
                    future_cell[0] = fut
                    fut.set_result(result)
            assert future_cell[0] is not None
            # Stop the loop as soon as the outcome is available.
            self.add_future(future_cell[0], lambda future: self.stop())

        self.add_callback(run)
        if timeout is not None:

            def timeout_callback() -> None:
                # If we can cancel the future, do so and wait on it. If not,
                # Just stop the loop and return with the task still pending.
                # (If we neither cancel nor wait for the task, a warning
                # will be logged).
                assert future_cell[0] is not None
                if not future_cell[0].cancel():
                    self.stop()

            timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
        # Returns only after stop() has been called on this loop.
        self.start()
        if timeout is not None:
            self.remove_timeout(timeout_handle)
        assert future_cell[0] is not None
        # A cancelled or still-pending future at this point is reported as
        # a timeout.
        if future_cell[0].cancelled() or not future_cell[0].done():
            raise TimeoutError("Operation timed out after %s seconds" % timeout)
        return future_cell[0].result()
531
532    def time(self) -> float:
533        """Returns the current time according to the `IOLoop`'s clock.
534
535        The return value is a floating-point number relative to an
536        unspecified time in the past.
537
538        Historically, the IOLoop could be customized to use e.g.
539        `time.monotonic` instead of `time.time`, but this is not
540        currently supported and so this method is equivalent to
541        `time.time`.
542
543        """
544        return time.time()
545
546    def add_timeout(
547        self,
548        deadline: Union[float, datetime.timedelta],
549        callback: Callable[..., None],
550        *args: Any,
551        **kwargs: Any
552    ) -> object:
553        """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
554
555        Returns an opaque handle that may be passed to
556        `remove_timeout` to cancel.
557
558        ``deadline`` may be a number denoting a time (on the same
559        scale as `IOLoop.time`, normally `time.time`), or a
560        `datetime.timedelta` object for a deadline relative to the
561        current time.  Since Tornado 4.0, `call_later` is a more
562        convenient alternative for the relative case since it does not
563        require a timedelta object.
564
565        Note that it is not safe to call `add_timeout` from other threads.
566        Instead, you must use `add_callback` to transfer control to the
567        `IOLoop`'s thread, and then call `add_timeout` from there.
568
569        Subclasses of IOLoop must implement either `add_timeout` or
570        `call_at`; the default implementations of each will call
571        the other.  `call_at` is usually easier to implement, but
572        subclasses that wish to maintain compatibility with Tornado
573        versions prior to 4.0 must use `add_timeout` instead.
574
575        .. versionchanged:: 4.0
576           Now passes through ``*args`` and ``**kwargs`` to the callback.
577        """
578        if isinstance(deadline, numbers.Real):
579            return self.call_at(deadline, callback, *args, **kwargs)
580        elif isinstance(deadline, datetime.timedelta):
581            return self.call_at(
582                self.time() + deadline.total_seconds(), callback, *args, **kwargs
583            )
584        else:
585            raise TypeError("Unsupported deadline %r" % deadline)
586
587    def call_later(
588        self, delay: float, callback: Callable[..., None], *args: Any, **kwargs: Any
589    ) -> object:
590        """Runs the ``callback`` after ``delay`` seconds have passed.
591
592        Returns an opaque handle that may be passed to `remove_timeout`
593        to cancel.  Note that unlike the `asyncio` method of the same
594        name, the returned object does not have a ``cancel()`` method.
595
596        See `add_timeout` for comments on thread-safety and subclassing.
597
598        .. versionadded:: 4.0
599        """
600        return self.call_at(self.time() + delay, callback, *args, **kwargs)
601
602    def call_at(
603        self, when: float, callback: Callable[..., None], *args: Any, **kwargs: Any
604    ) -> object:
605        """Runs the ``callback`` at the absolute time designated by ``when``.
606
607        ``when`` must be a number using the same reference point as
608        `IOLoop.time`.
609
610        Returns an opaque handle that may be passed to `remove_timeout`
611        to cancel.  Note that unlike the `asyncio` method of the same
612        name, the returned object does not have a ``cancel()`` method.
613
614        See `add_timeout` for comments on thread-safety and subclassing.
615
616        .. versionadded:: 4.0
617        """
618        return self.add_timeout(when, callback, *args, **kwargs)
619
620    def remove_timeout(self, timeout: object) -> None:
621        """Cancels a pending timeout.
622
623        The argument is a handle as returned by `add_timeout`.  It is
624        safe to call `remove_timeout` even if the callback has already
625        been run.
626        """
627        raise NotImplementedError()
628
629    def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
630        """Calls the given callback on the next I/O loop iteration.
631
632        It is safe to call this method from any thread at any time,
633        except from a signal handler.  Note that this is the **only**
634        method in `IOLoop` that makes this thread-safety guarantee; all
635        other interaction with the `IOLoop` must be done from that
636        `IOLoop`'s thread.  `add_callback()` may be used to transfer
637        control from other threads to the `IOLoop`'s thread.
638
639        To add a callback from a signal handler, see
640        `add_callback_from_signal`.
641        """
642        raise NotImplementedError()
643
644    def add_callback_from_signal(
645        self, callback: Callable, *args: Any, **kwargs: Any
646    ) -> None:
647        """Calls the given callback on the next I/O loop iteration.
648
649        Safe for use from a Python signal handler; should not be used
650        otherwise.
651        """
652        raise NotImplementedError()
653
654    def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
655        """Calls the given callback on the next IOLoop iteration.
656
657        As of Tornado 6.0, this method is equivalent to `add_callback`.
658
659        .. versionadded:: 4.0
660        """
661        self.add_callback(callback, *args, **kwargs)
662
663    def add_future(
664        self,
665        future: "Union[Future[_T], concurrent.futures.Future[_T]]",
666        callback: Callable[["Future[_T]"], None],
667    ) -> None:
668        """Schedules a callback on the ``IOLoop`` when the given
669        `.Future` is finished.
670
671        The callback is invoked with one argument, the
672        `.Future`.
673
674        This method only accepts `.Future` objects and not other
675        awaitables (unlike most of Tornado where the two are
676        interchangeable).
677        """
678        if isinstance(future, Future):
679            # Note that we specifically do not want the inline behavior of
680            # tornado.concurrent.future_add_done_callback. We always want
681            # this callback scheduled on the next IOLoop iteration (which
682            # asyncio.Future always does).
683            #
684            # Wrap the callback in self._run_callback so we control
685            # the error logging (i.e. it goes to tornado.log.app_log
686            # instead of asyncio's log).
687            future.add_done_callback(
688                lambda f: self._run_callback(functools.partial(callback, future))
689            )
690        else:
691            assert is_future(future)
692            # For concurrent futures, we use self.add_callback, so
693            # it's fine if future_add_done_callback inlines that call.
694            future_add_done_callback(
695                future, lambda f: self.add_callback(callback, future)
696            )
697
698    def run_in_executor(
699        self,
700        executor: Optional[concurrent.futures.Executor],
701        func: Callable[..., _T],
702        *args: Any
703    ) -> Awaitable[_T]:
704        """Runs a function in a ``concurrent.futures.Executor``. If
705        ``executor`` is ``None``, the IO loop's default executor will be used.
706
707        Use `functools.partial` to pass keyword arguments to ``func``.
708
709        .. versionadded:: 5.0
710        """
711        if executor is None:
712            if not hasattr(self, "_executor"):
713                from tornado.process import cpu_count
714
715                self._executor = concurrent.futures.ThreadPoolExecutor(
716                    max_workers=(cpu_count() * 5)
717                )  # type: concurrent.futures.Executor
718            executor = self._executor
719        c_future = executor.submit(func, *args)
720        # Concurrent Futures are not usable with await. Wrap this in a
721        # Tornado Future instead, using self.add_future for thread-safety.
722        t_future = Future()  # type: Future[_T]
723        self.add_future(c_future, lambda f: chain_future(f, t_future))
724        return t_future
725
726    def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
727        """Sets the default executor to use with :meth:`run_in_executor`.
728
729        .. versionadded:: 5.0
730        """
731        self._executor = executor
732
733    def _run_callback(self, callback: Callable[[], Any]) -> None:
734        """Runs a callback with error handling.
735
736        .. versionchanged:: 6.0
737
738           CancelledErrors are no longer logged.
739        """
740        try:
741            ret = callback()
742            if ret is not None:
743                from tornado import gen
744
745                # Functions that return Futures typically swallow all
746                # exceptions and store them in the Future.  If a Future
747                # makes it out to the IOLoop, ensure its exception (if any)
748                # gets logged too.
749                try:
750                    ret = gen.convert_yielded(ret)
751                except gen.BadYieldError:
752                    # It's not unusual for add_callback to be used with
753                    # methods returning a non-None and non-yieldable
754                    # result, which should just be ignored.
755                    pass
756                else:
757                    self.add_future(ret, self._discard_future_result)
758        except asyncio.CancelledError:
759            pass
760        except Exception:
761            app_log.error("Exception in callback %r", callback, exc_info=True)
762
763    def _discard_future_result(self, future: Future) -> None:
764        """Avoid unhandled-exception warnings from spawned coroutines."""
765        future.result()
766
767    def split_fd(
768        self, fd: Union[int, _Selectable]
769    ) -> Tuple[int, Union[int, _Selectable]]:
770        # """Returns an (fd, obj) pair from an ``fd`` parameter.
771
772        # We accept both raw file descriptors and file-like objects as
773        # input to `add_handler` and related methods.  When a file-like
774        # object is passed, we must retain the object itself so we can
775        # close it correctly when the `IOLoop` shuts down, but the
776        # poller interfaces favor file descriptors (they will accept
777        # file-like objects and call ``fileno()`` for you, but they
778        # always return the descriptor itself).
779
780        # This method is provided for use by `IOLoop` subclasses and should
781        # not generally be used by application code.
782
783        # .. versionadded:: 4.0
784        # """
785        if isinstance(fd, int):
786            return fd, fd
787        return fd.fileno(), fd
788
789    def close_fd(self, fd: Union[int, _Selectable]) -> None:
790        # """Utility method to close an ``fd``.
791
792        # If ``fd`` is a file-like object, we close it directly; otherwise
793        # we use `os.close`.
794
795        # This method is provided for use by `IOLoop` subclasses (in
796        # implementations of ``IOLoop.close(all_fds=True)`` and should
797        # not generally be used by application code.
798
799        # .. versionadded:: 4.0
800        # """
801        try:
802            if isinstance(fd, int):
803                os.close(fd)
804            else:
805                fd.close()
806        except OSError:
807            pass
808
809
class _Timeout(object):
    """An IOLoop timeout: a deadline timestamp and the callback to run.

    Instances compare by ``tdeadline``, a ``(deadline, sequence)`` tuple
    in which ``sequence`` is the next value taken from the owning loop's
    ``_timeout_counter`` when the timeout is created.
    """

    # Slots avoid a per-instance ``__dict__``, which keeps memory overhead
    # down when there are lots of pending callbacks.
    __slots__ = ["deadline", "callback", "tdeadline"]

    def __init__(
        self, deadline: float, callback: Callable[[], None], io_loop: IOLoop
    ) -> None:
        """Record ``deadline`` and ``callback`` and build the sort key.

        Raises `TypeError` if ``deadline`` is not a real number.
        """
        if not isinstance(deadline, numbers.Real):
            raise TypeError("Unsupported deadline %r" % deadline)
        # Drawn from the loop so that two timeouts with an equal deadline
        # still have distinct, comparable keys.
        sequence = next(io_loop._timeout_counter)
        self.callback = callback
        self.deadline = deadline
        self.tdeadline = (deadline, sequence)  # type: Tuple[float, int]

    # Ordering is by deadline first, with the per-loop counter value as
    # the tiebreaker so that the ordering is consistent.  heapq and
    # sort() rely on __lt__; __le__ is provided alongside it.
    def __lt__(self, other: "_Timeout") -> bool:
        mine, theirs = self.tdeadline, other.tdeadline
        return mine < theirs

    def __le__(self, other: "_Timeout") -> bool:
        mine, theirs = self.tdeadline, other.tdeadline
        return mine <= theirs
837
838
class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is invoked every ``callback_time`` milliseconds.  Be
    aware that this interval is expressed in milliseconds, whereas most
    other time-related functions in Tornado take seconds.

    When ``jitter`` is given, every interval is chosen at random from a
    window ``jitter * callback_time`` milliseconds wide, centered on
    ``callback_time``.  A jitter of 0.1 therefore permits a 10% variation
    in callback time.  Jitter helps keep events with similar periods from
    lining up, and because the window is centered the total number of
    calls within a given interval should not change significantly.

    If the callback runs for longer than ``callback_time`` milliseconds,
    subsequent invocations will be skipped to get back on schedule.

    `start` must be called after the `PeriodicCallback` is created.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.1
       The ``jitter`` argument is added.
    """

    def __init__(
        self, callback: Callable[[], None], callback_time: float, jitter: float = 0
    ) -> None:
        """Store the configuration; nothing is scheduled until `start`.

        Raises `ValueError` if ``callback_time`` is not positive.
        """
        self.callback = callback
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.jitter = jitter
        # Handle returned by ``add_timeout`` for the pending invocation,
        # or None when nothing is scheduled.
        self._timeout = None  # type: object
        self._running = False

    def start(self) -> None:
        """Starts the timer."""
        # The IOLoop is resolved here rather than in __init__ so that a
        # PeriodicCallback can be constructed in one thread and then
        # started from another via IOLoop.add_callback().
        loop = IOLoop.current()
        self.io_loop = loop
        self._running = True
        self._next_timeout = loop.time()
        self._schedule_next()

    def stop(self) -> None:
        """Stops the timer."""
        self._running = False
        if self._timeout is None:
            return
        self.io_loop.remove_timeout(self._timeout)
        self._timeout = None

    def is_running(self) -> bool:
        """Returns ``True`` if this `.PeriodicCallback` has been started.

        .. versionadded:: 4.1
        """
        return self._running

    def _run(self) -> None:
        """Invoke the callback once, then arrange the next invocation."""
        if self._running:
            try:
                # The callback's return value is handed back to whoever
                # invoked this timeout.
                return self.callback()
            except Exception:
                app_log.error("Exception in callback %r", self.callback, exc_info=True)
            finally:
                # Runs whether the callback returned or raised.
                self._schedule_next()

    def _schedule_next(self) -> None:
        """Compute the next deadline and register ``_run`` for it."""
        if not self._running:
            return
        now = self.io_loop.time()
        self._update_next(now)
        self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

    def _update_next(self, current_time: float) -> None:
        """Advance ``_next_timeout`` past ``current_time``."""
        period = self.callback_time / 1000.0  # milliseconds -> seconds
        if self.jitter:
            # Scale the period by a random factor in a window of width
            # ``jitter`` centered on 1.
            period *= 1 + (self.jitter * (random.random() - 0.5))

        if self._next_timeout <= current_time:
            # Periods are measured from the start of one call to the
            # start of the next.  When a call overruns, whole cycles are
            # skipped so that the deadline lands back on a multiple of
            # the original schedule.
            overdue_periods = math.floor((current_time - self._next_timeout) / period)
            self._next_timeout += (overdue_periods + 1) * period
            return

        # Reaching this point means the clock appears to have moved
        # backwards.  Step the deadline forward by one period instead of
        # recomputing the same value again.  A large backwards jump can
        # therefore leave a long gap between callbacks, but the far more
        # common case is a small NTP adjustment that should simply be
        # ignored.
        #
        # Note that on some systems time.time() runs slower than
        # time.monotonic() (most commonly on windows); since
        # PeriodicCallback uses time.time() while asyncio schedules
        # callbacks using time.monotonic(), that looks like a small
        # backwards jump on every iteration.
        # https://github.com/tornadoweb/tornado/issues/2333
        self._next_timeout += period