1# 2# Copyright 2009 Facebook 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15 16"""An I/O event loop for non-blocking sockets. 17 18In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event 19loop, with a slightly different interface for historical reasons. 20Applications can use either the `.IOLoop` interface or the underlying 21`asyncio` event loop directly (unless compatibility with older 22versions of Tornado is desired, in which case `.IOLoop` must be used). 23 24Typical applications will use a single `IOLoop` object, accessed via 25`IOLoop.current` class method. The `IOLoop.start` method (or 26equivalently, `asyncio.AbstractEventLoop.run_forever`) should usually 27be called at the end of the ``main()`` function. Atypical applications 28may use more than one `IOLoop`, such as one `IOLoop` per thread, or 29per `unittest` case. 30 31""" 32 33import asyncio 34import concurrent.futures 35import datetime 36import functools 37import logging 38import numbers 39import os 40import sys 41import time 42import math 43import random 44 45from tornado.concurrent import ( 46 Future, 47 is_future, 48 chain_future, 49 future_set_exc_info, 50 future_add_done_callback, 51) 52from tornado.log import app_log 53from tornado.util import Configurable, TimeoutError, import_object 54 55import typing 56from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable 57 58if typing.TYPE_CHECKING: 59 from typing import Dict, List # noqa: F401 60 61 from typing_extensions import Protocol 62else: 63 Protocol = object 64 65 66class _Selectable(Protocol): 67 def fileno(self) -> int: 68 pass 69 70 def close(self) -> None: 71 pass 72 73 74_T = TypeVar("_T") 75_S = TypeVar("_S", bound=_Selectable) 76 77 78class IOLoop(Configurable): 79 """An I/O event loop. 80 81 As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event 82 loop. 83 84 Example usage for a simple TCP server: 85 86 .. testcode:: 87 88 import errno 89 import functools 90 import socket 91 92 import tornado.ioloop 93 from tornado.iostream import IOStream 94 95 async def handle_connection(connection, address): 96 stream = IOStream(connection) 97 message = await stream.read_until_close() 98 print("message from client:", message.decode().strip()) 99 100 def connection_ready(sock, fd, events): 101 while True: 102 try: 103 connection, address = sock.accept() 104 except BlockingIOError: 105 return 106 connection.setblocking(0) 107 io_loop = tornado.ioloop.IOLoop.current() 108 io_loop.spawn_callback(handle_connection, connection, address) 109 110 if __name__ == '__main__': 111 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 112 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 113 sock.setblocking(0) 114 sock.bind(("", 8888)) 115 sock.listen(128) 116 117 io_loop = tornado.ioloop.IOLoop.current() 118 callback = functools.partial(connection_ready, sock) 119 io_loop.add_handler(sock.fileno(), callback, io_loop.READ) 120 io_loop.start() 121 122 .. testoutput:: 123 :hide: 124 125 By default, a newly-constructed `IOLoop` becomes the thread's current 126 `IOLoop`, unless there already is a current `IOLoop`. This behavior 127 can be controlled with the ``make_current`` argument to the `IOLoop` 128 constructor: if ``make_current=True``, the new `IOLoop` will always 129 try to become current and it raises an error if there is already a 130 current instance. If ``make_current=False``, the new `IOLoop` will 131 not try to become current. 132 133 In general, an `IOLoop` cannot survive a fork or be shared across 134 processes in any way. When multiple processes are being used, each 135 process should create its own `IOLoop`, which also implies that 136 any objects which depend on the `IOLoop` (such as 137 `.AsyncHTTPClient`) must also be created in the child processes. 138 As a guideline, anything that starts processes (including the 139 `tornado.process` and `multiprocessing` modules) should do so as 140 early as possible, ideally the first thing the application does 141 after loading its configuration in ``main()``. 142 143 .. versionchanged:: 4.2 144 Added the ``make_current`` keyword argument to the `IOLoop` 145 constructor. 146 147 .. versionchanged:: 5.0 148 149 Uses the `asyncio` event loop by default. The 150 ``IOLoop.configure`` method cannot be used on Python 3 except 151 to redundantly specify the `asyncio` event loop. 152 153 """ 154 155 # These constants were originally based on constants from the epoll module. 156 NONE = 0 157 READ = 0x001 158 WRITE = 0x004 159 ERROR = 0x018 160 161 # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops. 162 _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop] 163 164 @classmethod 165 def configure( 166 cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any 167 ) -> None: 168 if asyncio is not None: 169 from tornado.platform.asyncio import BaseAsyncIOLoop 170 171 if isinstance(impl, str): 172 impl = import_object(impl) 173 if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop): 174 raise RuntimeError( 175 "only AsyncIOLoop is allowed when asyncio is available" 176 ) 177 super(IOLoop, cls).configure(impl, **kwargs) 178 179 @staticmethod 180 def instance() -> "IOLoop": 181 """Deprecated alias for `IOLoop.current()`. 182 183 .. versionchanged:: 5.0 184 185 Previously, this method returned a global singleton 186 `IOLoop`, in contrast with the per-thread `IOLoop` returned 187 by `current()`. In nearly all cases the two were the same 188 (when they differed, it was generally used from non-Tornado 189 threads to communicate back to the main thread's `IOLoop`). 190 This distinction is not present in `asyncio`, so in order 191 to facilitate integration with that package `instance()` 192 was changed to be an alias to `current()`. Applications 193 using the cross-thread communications aspect of 194 `instance()` should instead set their own global variable 195 to point to the `IOLoop` they want to use. 196 197 .. deprecated:: 5.0 198 """ 199 return IOLoop.current() 200 201 def install(self) -> None: 202 """Deprecated alias for `make_current()`. 203 204 .. versionchanged:: 5.0 205 206 Previously, this method would set this `IOLoop` as the 207 global singleton used by `IOLoop.instance()`. Now that 208 `instance()` is an alias for `current()`, `install()` 209 is an alias for `make_current()`. 210 211 .. deprecated:: 5.0 212 """ 213 self.make_current() 214 215 @staticmethod 216 def clear_instance() -> None: 217 """Deprecated alias for `clear_current()`. 218 219 .. versionchanged:: 5.0 220 221 Previously, this method would clear the `IOLoop` used as 222 the global singleton by `IOLoop.instance()`. Now that 223 `instance()` is an alias for `current()`, 224 `clear_instance()` is an alias for `clear_current()`. 225 226 .. deprecated:: 5.0 227 228 """ 229 IOLoop.clear_current() 230 231 @typing.overload 232 @staticmethod 233 def current() -> "IOLoop": 234 pass 235 236 @typing.overload 237 @staticmethod 238 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 239 pass 240 241 @staticmethod 242 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 243 """Returns the current thread's `IOLoop`. 244 245 If an `IOLoop` is currently running or has been marked as 246 current by `make_current`, returns that instance. If there is 247 no current `IOLoop` and ``instance`` is true, creates one. 248 249 .. versionchanged:: 4.1 250 Added ``instance`` argument to control the fallback to 251 `IOLoop.instance()`. 252 .. versionchanged:: 5.0 253 On Python 3, control of the current `IOLoop` is delegated 254 to `asyncio`, with this and other methods as pass-through accessors. 255 The ``instance`` argument now controls whether an `IOLoop` 256 is created automatically when there is none, instead of 257 whether we fall back to `IOLoop.instance()` (which is now 258 an alias for this method). ``instance=False`` is deprecated, 259 since even if we do not create an `IOLoop`, this method 260 may initialize the asyncio loop. 261 """ 262 try: 263 loop = asyncio.get_event_loop() 264 except (RuntimeError, AssertionError): 265 if not instance: 266 return None 267 raise 268 try: 269 return IOLoop._ioloop_for_asyncio[loop] 270 except KeyError: 271 if instance: 272 from tornado.platform.asyncio import AsyncIOMainLoop 273 274 current = AsyncIOMainLoop(make_current=True) # type: Optional[IOLoop] 275 else: 276 current = None 277 return current 278 279 def make_current(self) -> None: 280 """Makes this the `IOLoop` for the current thread. 281 282 An `IOLoop` automatically becomes current for its thread 283 when it is started, but it is sometimes useful to call 284 `make_current` explicitly before starting the `IOLoop`, 285 so that code run at startup time can find the right 286 instance. 287 288 .. versionchanged:: 4.1 289 An `IOLoop` created while there is no current `IOLoop` 290 will automatically become current. 291 292 .. versionchanged:: 5.0 293 This method also sets the current `asyncio` event loop. 294 """ 295 # The asyncio event loops override this method. 296 raise NotImplementedError() 297 298 @staticmethod 299 def clear_current() -> None: 300 """Clears the `IOLoop` for the current thread. 301 302 Intended primarily for use by test frameworks in between tests. 303 304 .. versionchanged:: 5.0 305 This method also clears the current `asyncio` event loop. 306 """ 307 old = IOLoop.current(instance=False) 308 if old is not None: 309 old._clear_current_hook() 310 if asyncio is None: 311 IOLoop._current.instance = None 312 313 def _clear_current_hook(self) -> None: 314 """Instance method called when an IOLoop ceases to be current. 315 316 May be overridden by subclasses as a counterpart to make_current. 317 """ 318 pass 319 320 @classmethod 321 def configurable_base(cls) -> Type[Configurable]: 322 return IOLoop 323 324 @classmethod 325 def configurable_default(cls) -> Type[Configurable]: 326 from tornado.platform.asyncio import AsyncIOLoop 327 328 return AsyncIOLoop 329 330 def initialize(self, make_current: Optional[bool] = None) -> None: 331 if make_current is None: 332 if IOLoop.current(instance=False) is None: 333 self.make_current() 334 elif make_current: 335 current = IOLoop.current(instance=False) 336 # AsyncIO loops can already be current by this point. 337 if current is not None and current is not self: 338 raise RuntimeError("current IOLoop already exists") 339 self.make_current() 340 341 def close(self, all_fds: bool = False) -> None: 342 """Closes the `IOLoop`, freeing any resources used. 343 344 If ``all_fds`` is true, all file descriptors registered on the 345 IOLoop will be closed (not just the ones created by the 346 `IOLoop` itself). 347 348 Many applications will only use a single `IOLoop` that runs for the 349 entire lifetime of the process. In that case closing the `IOLoop` 350 is not necessary since everything will be cleaned up when the 351 process exits. `IOLoop.close` is provided mainly for scenarios 352 such as unit tests, which create and destroy a large number of 353 ``IOLoops``. 354 355 An `IOLoop` must be completely stopped before it can be closed. This 356 means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must 357 be allowed to return before attempting to call `IOLoop.close()`. 358 Therefore the call to `close` will usually appear just after 359 the call to `start` rather than near the call to `stop`. 360 361 .. versionchanged:: 3.1 362 If the `IOLoop` implementation supports non-integer objects 363 for "file descriptors", those objects will have their 364 ``close`` method when ``all_fds`` is true. 365 """ 366 raise NotImplementedError() 367 368 @typing.overload 369 def add_handler( 370 self, fd: int, handler: Callable[[int, int], None], events: int 371 ) -> None: 372 pass 373 374 @typing.overload # noqa: F811 375 def add_handler( 376 self, fd: _S, handler: Callable[[_S, int], None], events: int 377 ) -> None: 378 pass 379 380 def add_handler( # noqa: F811 381 self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int 382 ) -> None: 383 """Registers the given handler to receive the given events for ``fd``. 384 385 The ``fd`` argument may either be an integer file descriptor or 386 a file-like object with a ``fileno()`` and ``close()`` method. 387 388 The ``events`` argument is a bitwise or of the constants 389 ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``. 390 391 When an event occurs, ``handler(fd, events)`` will be run. 392 393 .. versionchanged:: 4.0 394 Added the ability to pass file-like objects in addition to 395 raw file descriptors. 396 """ 397 raise NotImplementedError() 398 399 def update_handler(self, fd: Union[int, _Selectable], events: int) -> None: 400 """Changes the events we listen for ``fd``. 401 402 .. versionchanged:: 4.0 403 Added the ability to pass file-like objects in addition to 404 raw file descriptors. 405 """ 406 raise NotImplementedError() 407 408 def remove_handler(self, fd: Union[int, _Selectable]) -> None: 409 """Stop listening for events on ``fd``. 410 411 .. versionchanged:: 4.0 412 Added the ability to pass file-like objects in addition to 413 raw file descriptors. 414 """ 415 raise NotImplementedError() 416 417 def start(self) -> None: 418 """Starts the I/O loop. 419 420 The loop will run until one of the callbacks calls `stop()`, which 421 will make the loop stop after the current event iteration completes. 422 """ 423 raise NotImplementedError() 424 425 def _setup_logging(self) -> None: 426 """The IOLoop catches and logs exceptions, so it's 427 important that log output be visible. However, python's 428 default behavior for non-root loggers (prior to python 429 3.2) is to print an unhelpful "no handlers could be 430 found" message rather than the actual log entry, so we 431 must explicitly configure logging if we've made it this 432 far without anything. 433 434 This method should be called from start() in subclasses. 435 """ 436 if not any( 437 [ 438 logging.getLogger().handlers, 439 logging.getLogger("tornado").handlers, 440 logging.getLogger("tornado.application").handlers, 441 ] 442 ): 443 logging.basicConfig() 444 445 def stop(self) -> None: 446 """Stop the I/O loop. 447 448 If the event loop is not currently running, the next call to `start()` 449 will return immediately. 450 451 Note that even after `stop` has been called, the `IOLoop` is not 452 completely stopped until `IOLoop.start` has also returned. 453 Some work that was scheduled before the call to `stop` may still 454 be run before the `IOLoop` shuts down. 455 """ 456 raise NotImplementedError() 457 458 def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any: 459 """Starts the `IOLoop`, runs the given function, and stops the loop. 460 461 The function must return either an awaitable object or 462 ``None``. If the function returns an awaitable object, the 463 `IOLoop` will run until the awaitable is resolved (and 464 `run_sync()` will return the awaitable's result). If it raises 465 an exception, the `IOLoop` will stop and the exception will be 466 re-raised to the caller. 467 468 The keyword-only argument ``timeout`` may be used to set 469 a maximum duration for the function. If the timeout expires, 470 a `tornado.util.TimeoutError` is raised. 471 472 This method is useful to allow asynchronous calls in a 473 ``main()`` function:: 474 475 async def main(): 476 # do stuff... 477 478 if __name__ == '__main__': 479 IOLoop.current().run_sync(main) 480 481 .. versionchanged:: 4.3 482 Returning a non-``None``, non-awaitable value is now an error. 483 484 .. versionchanged:: 5.0 485 If a timeout occurs, the ``func`` coroutine will be cancelled. 486 487 """ 488 future_cell = [None] # type: List[Optional[Future]] 489 490 def run() -> None: 491 try: 492 result = func() 493 if result is not None: 494 from tornado.gen import convert_yielded 495 496 result = convert_yielded(result) 497 except Exception: 498 fut = Future() # type: Future[Any] 499 future_cell[0] = fut 500 future_set_exc_info(fut, sys.exc_info()) 501 else: 502 if is_future(result): 503 future_cell[0] = result 504 else: 505 fut = Future() 506 future_cell[0] = fut 507 fut.set_result(result) 508 assert future_cell[0] is not None 509 self.add_future(future_cell[0], lambda future: self.stop()) 510 511 self.add_callback(run) 512 if timeout is not None: 513 514 def timeout_callback() -> None: 515 # If we can cancel the future, do so and wait on it. If not, 516 # Just stop the loop and return with the task still pending. 517 # (If we neither cancel nor wait for the task, a warning 518 # will be logged). 519 assert future_cell[0] is not None 520 if not future_cell[0].cancel(): 521 self.stop() 522 523 timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback) 524 self.start() 525 if timeout is not None: 526 self.remove_timeout(timeout_handle) 527 assert future_cell[0] is not None 528 if future_cell[0].cancelled() or not future_cell[0].done(): 529 raise TimeoutError("Operation timed out after %s seconds" % timeout) 530 return future_cell[0].result() 531 532 def time(self) -> float: 533 """Returns the current time according to the `IOLoop`'s clock. 534 535 The return value is a floating-point number relative to an 536 unspecified time in the past. 537 538 Historically, the IOLoop could be customized to use e.g. 539 `time.monotonic` instead of `time.time`, but this is not 540 currently supported and so this method is equivalent to 541 `time.time`. 542 543 """ 544 return time.time() 545 546 def add_timeout( 547 self, 548 deadline: Union[float, datetime.timedelta], 549 callback: Callable[..., None], 550 *args: Any, 551 **kwargs: Any 552 ) -> object: 553 """Runs the ``callback`` at the time ``deadline`` from the I/O loop. 554 555 Returns an opaque handle that may be passed to 556 `remove_timeout` to cancel. 557 558 ``deadline`` may be a number denoting a time (on the same 559 scale as `IOLoop.time`, normally `time.time`), or a 560 `datetime.timedelta` object for a deadline relative to the 561 current time. Since Tornado 4.0, `call_later` is a more 562 convenient alternative for the relative case since it does not 563 require a timedelta object. 564 565 Note that it is not safe to call `add_timeout` from other threads. 566 Instead, you must use `add_callback` to transfer control to the 567 `IOLoop`'s thread, and then call `add_timeout` from there. 568 569 Subclasses of IOLoop must implement either `add_timeout` or 570 `call_at`; the default implementations of each will call 571 the other. `call_at` is usually easier to implement, but 572 subclasses that wish to maintain compatibility with Tornado 573 versions prior to 4.0 must use `add_timeout` instead. 574 575 .. versionchanged:: 4.0 576 Now passes through ``*args`` and ``**kwargs`` to the callback. 577 """ 578 if isinstance(deadline, numbers.Real): 579 return self.call_at(deadline, callback, *args, **kwargs) 580 elif isinstance(deadline, datetime.timedelta): 581 return self.call_at( 582 self.time() + deadline.total_seconds(), callback, *args, **kwargs 583 ) 584 else: 585 raise TypeError("Unsupported deadline %r" % deadline) 586 587 def call_later( 588 self, delay: float, callback: Callable[..., None], *args: Any, **kwargs: Any 589 ) -> object: 590 """Runs the ``callback`` after ``delay`` seconds have passed. 591 592 Returns an opaque handle that may be passed to `remove_timeout` 593 to cancel. Note that unlike the `asyncio` method of the same 594 name, the returned object does not have a ``cancel()`` method. 595 596 See `add_timeout` for comments on thread-safety and subclassing. 597 598 .. versionadded:: 4.0 599 """ 600 return self.call_at(self.time() + delay, callback, *args, **kwargs) 601 602 def call_at( 603 self, when: float, callback: Callable[..., None], *args: Any, **kwargs: Any 604 ) -> object: 605 """Runs the ``callback`` at the absolute time designated by ``when``. 606 607 ``when`` must be a number using the same reference point as 608 `IOLoop.time`. 609 610 Returns an opaque handle that may be passed to `remove_timeout` 611 to cancel. Note that unlike the `asyncio` method of the same 612 name, the returned object does not have a ``cancel()`` method. 613 614 See `add_timeout` for comments on thread-safety and subclassing. 615 616 .. versionadded:: 4.0 617 """ 618 return self.add_timeout(when, callback, *args, **kwargs) 619 620 def remove_timeout(self, timeout: object) -> None: 621 """Cancels a pending timeout. 622 623 The argument is a handle as returned by `add_timeout`. It is 624 safe to call `remove_timeout` even if the callback has already 625 been run. 626 """ 627 raise NotImplementedError() 628 629 def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 630 """Calls the given callback on the next I/O loop iteration. 631 632 It is safe to call this method from any thread at any time, 633 except from a signal handler. Note that this is the **only** 634 method in `IOLoop` that makes this thread-safety guarantee; all 635 other interaction with the `IOLoop` must be done from that 636 `IOLoop`'s thread. `add_callback()` may be used to transfer 637 control from other threads to the `IOLoop`'s thread. 638 639 To add a callback from a signal handler, see 640 `add_callback_from_signal`. 641 """ 642 raise NotImplementedError() 643 644 def add_callback_from_signal( 645 self, callback: Callable, *args: Any, **kwargs: Any 646 ) -> None: 647 """Calls the given callback on the next I/O loop iteration. 648 649 Safe for use from a Python signal handler; should not be used 650 otherwise. 651 """ 652 raise NotImplementedError() 653 654 def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 655 """Calls the given callback on the next IOLoop iteration. 656 657 As of Tornado 6.0, this method is equivalent to `add_callback`. 658 659 .. versionadded:: 4.0 660 """ 661 self.add_callback(callback, *args, **kwargs) 662 663 def add_future( 664 self, 665 future: "Union[Future[_T], concurrent.futures.Future[_T]]", 666 callback: Callable[["Future[_T]"], None], 667 ) -> None: 668 """Schedules a callback on the ``IOLoop`` when the given 669 `.Future` is finished. 670 671 The callback is invoked with one argument, the 672 `.Future`. 673 674 This method only accepts `.Future` objects and not other 675 awaitables (unlike most of Tornado where the two are 676 interchangeable). 677 """ 678 if isinstance(future, Future): 679 # Note that we specifically do not want the inline behavior of 680 # tornado.concurrent.future_add_done_callback. We always want 681 # this callback scheduled on the next IOLoop iteration (which 682 # asyncio.Future always does). 683 # 684 # Wrap the callback in self._run_callback so we control 685 # the error logging (i.e. it goes to tornado.log.app_log 686 # instead of asyncio's log). 687 future.add_done_callback( 688 lambda f: self._run_callback(functools.partial(callback, future)) 689 ) 690 else: 691 assert is_future(future) 692 # For concurrent futures, we use self.add_callback, so 693 # it's fine if future_add_done_callback inlines that call. 694 future_add_done_callback( 695 future, lambda f: self.add_callback(callback, future) 696 ) 697 698 def run_in_executor( 699 self, 700 executor: Optional[concurrent.futures.Executor], 701 func: Callable[..., _T], 702 *args: Any 703 ) -> Awaitable[_T]: 704 """Runs a function in a ``concurrent.futures.Executor``. If 705 ``executor`` is ``None``, the IO loop's default executor will be used. 706 707 Use `functools.partial` to pass keyword arguments to ``func``. 708 709 .. versionadded:: 5.0 710 """ 711 if executor is None: 712 if not hasattr(self, "_executor"): 713 from tornado.process import cpu_count 714 715 self._executor = concurrent.futures.ThreadPoolExecutor( 716 max_workers=(cpu_count() * 5) 717 ) # type: concurrent.futures.Executor 718 executor = self._executor 719 c_future = executor.submit(func, *args) 720 # Concurrent Futures are not usable with await. Wrap this in a 721 # Tornado Future instead, using self.add_future for thread-safety. 722 t_future = Future() # type: Future[_T] 723 self.add_future(c_future, lambda f: chain_future(f, t_future)) 724 return t_future 725 726 def set_default_executor(self, executor: concurrent.futures.Executor) -> None: 727 """Sets the default executor to use with :meth:`run_in_executor`. 728 729 .. versionadded:: 5.0 730 """ 731 self._executor = executor 732 733 def _run_callback(self, callback: Callable[[], Any]) -> None: 734 """Runs a callback with error handling. 735 736 .. versionchanged:: 6.0 737 738 CancelledErrors are no longer logged. 739 """ 740 try: 741 ret = callback() 742 if ret is not None: 743 from tornado import gen 744 745 # Functions that return Futures typically swallow all 746 # exceptions and store them in the Future. If a Future 747 # makes it out to the IOLoop, ensure its exception (if any) 748 # gets logged too. 749 try: 750 ret = gen.convert_yielded(ret) 751 except gen.BadYieldError: 752 # It's not unusual for add_callback to be used with 753 # methods returning a non-None and non-yieldable 754 # result, which should just be ignored. 755 pass 756 else: 757 self.add_future(ret, self._discard_future_result) 758 except asyncio.CancelledError: 759 pass 760 except Exception: 761 app_log.error("Exception in callback %r", callback, exc_info=True) 762 763 def _discard_future_result(self, future: Future) -> None: 764 """Avoid unhandled-exception warnings from spawned coroutines.""" 765 future.result() 766 767 def split_fd( 768 self, fd: Union[int, _Selectable] 769 ) -> Tuple[int, Union[int, _Selectable]]: 770 # """Returns an (fd, obj) pair from an ``fd`` parameter. 771 772 # We accept both raw file descriptors and file-like objects as 773 # input to `add_handler` and related methods. When a file-like 774 # object is passed, we must retain the object itself so we can 775 # close it correctly when the `IOLoop` shuts down, but the 776 # poller interfaces favor file descriptors (they will accept 777 # file-like objects and call ``fileno()`` for you, but they 778 # always return the descriptor itself). 779 780 # This method is provided for use by `IOLoop` subclasses and should 781 # not generally be used by application code. 782 783 # .. versionadded:: 4.0 784 # """ 785 if isinstance(fd, int): 786 return fd, fd 787 return fd.fileno(), fd 788 789 def close_fd(self, fd: Union[int, _Selectable]) -> None: 790 # """Utility method to close an ``fd``. 791 792 # If ``fd`` is a file-like object, we close it directly; otherwise 793 # we use `os.close`. 794 795 # This method is provided for use by `IOLoop` subclasses (in 796 # implementations of ``IOLoop.close(all_fds=True)`` and should 797 # not generally be used by application code. 798 799 # .. versionadded:: 4.0 800 # """ 801 try: 802 if isinstance(fd, int): 803 os.close(fd) 804 else: 805 fd.close() 806 except OSError: 807 pass 808 809 810class _Timeout(object): 811 """An IOLoop timeout, a UNIX timestamp and a callback""" 812 813 # Reduce memory overhead when there are lots of pending callbacks 814 __slots__ = ["deadline", "callback", "tdeadline"] 815 816 def __init__( 817 self, deadline: float, callback: Callable[[], None], io_loop: IOLoop 818 ) -> None: 819 if not isinstance(deadline, numbers.Real): 820 raise TypeError("Unsupported deadline %r" % deadline) 821 self.deadline = deadline 822 self.callback = callback 823 self.tdeadline = ( 824 deadline, 825 next(io_loop._timeout_counter), 826 ) # type: Tuple[float, int] 827 828 # Comparison methods to sort by deadline, with object id as a tiebreaker 829 # to guarantee a consistent ordering. The heapq module uses __le__ 830 # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons 831 # use __lt__). 832 def __lt__(self, other: "_Timeout") -> bool: 833 return self.tdeadline < other.tdeadline 834 835 def __le__(self, other: "_Timeout") -> bool: 836 return self.tdeadline <= other.tdeadline 837 838 839class PeriodicCallback(object): 840 """Schedules the given callback to be called periodically. 841 842 The callback is called every ``callback_time`` milliseconds. 843 Note that the timeout is given in milliseconds, while most other 844 time-related functions in Tornado use seconds. 845 846 If ``jitter`` is specified, each callback time will be randomly selected 847 within a window of ``jitter * callback_time`` milliseconds. 848 Jitter can be used to reduce alignment of events with similar periods. 849 A jitter of 0.1 means allowing a 10% variation in callback time. 850 The window is centered on ``callback_time`` so the total number of calls 851 within a given interval should not be significantly affected by adding 852 jitter. 853 854 If the callback runs for longer than ``callback_time`` milliseconds, 855 subsequent invocations will be skipped to get back on schedule. 856 857 `start` must be called after the `PeriodicCallback` is created. 858 859 .. versionchanged:: 5.0 860 The ``io_loop`` argument (deprecated since version 4.1) has been removed. 861 862 .. versionchanged:: 5.1 863 The ``jitter`` argument is added. 864 """ 865 866 def __init__( 867 self, callback: Callable[[], None], callback_time: float, jitter: float = 0 868 ) -> None: 869 self.callback = callback 870 if callback_time <= 0: 871 raise ValueError("Periodic callback must have a positive callback_time") 872 self.callback_time = callback_time 873 self.jitter = jitter 874 self._running = False 875 self._timeout = None # type: object 876 877 def start(self) -> None: 878 """Starts the timer.""" 879 # Looking up the IOLoop here allows to first instantiate the 880 # PeriodicCallback in another thread, then start it using 881 # IOLoop.add_callback(). 882 self.io_loop = IOLoop.current() 883 self._running = True 884 self._next_timeout = self.io_loop.time() 885 self._schedule_next() 886 887 def stop(self) -> None: 888 """Stops the timer.""" 889 self._running = False 890 if self._timeout is not None: 891 self.io_loop.remove_timeout(self._timeout) 892 self._timeout = None 893 894 def is_running(self) -> bool: 895 """Returns ``True`` if this `.PeriodicCallback` has been started. 896 897 .. versionadded:: 4.1 898 """ 899 return self._running 900 901 def _run(self) -> None: 902 if not self._running: 903 return 904 try: 905 return self.callback() 906 except Exception: 907 app_log.error("Exception in callback %r", self.callback, exc_info=True) 908 finally: 909 self._schedule_next() 910 911 def _schedule_next(self) -> None: 912 if self._running: 913 self._update_next(self.io_loop.time()) 914 self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) 915 916 def _update_next(self, current_time: float) -> None: 917 callback_time_sec = self.callback_time / 1000.0 918 if self.jitter: 919 # apply jitter fraction 920 callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5)) 921 if self._next_timeout <= current_time: 922 # The period should be measured from the start of one call 923 # to the start of the next. If one call takes too long, 924 # skip cycles to get back to a multiple of the original 925 # schedule. 926 self._next_timeout += ( 927 math.floor((current_time - self._next_timeout) / callback_time_sec) + 1 928 ) * callback_time_sec 929 else: 930 # If the clock moved backwards, ensure we advance the next 931 # timeout instead of recomputing the same value again. 932 # This may result in long gaps between callbacks if the 933 # clock jumps backwards by a lot, but the far more common 934 # scenario is a small NTP adjustment that should just be 935 # ignored. 936 # 937 # Note that on some systems if time.time() runs slower 938 # than time.monotonic() (most common on windows), we 939 # effectively experience a small backwards time jump on 940 # every iteration because PeriodicCallback uses 941 # time.time() while asyncio schedules callbacks using 942 # time.monotonic(). 943 # https://github.com/tornadoweb/tornado/issues/2333 944 self._next_timeout += callback_time_sec 945