1"""``tornado.gen`` implements generator-based coroutines. 2 3.. note:: 4 5 The "decorator and generator" approach in this module is a 6 precursor to native coroutines (using ``async def`` and ``await``) 7 which were introduced in Python 3.5. Applications that do not 8 require compatibility with older versions of Python should use 9 native coroutines instead. Some parts of this module are still 10 useful with native coroutines, notably `multi`, `sleep`, 11 `WaitIterator`, and `with_timeout`. Some of these functions have 12 counterparts in the `asyncio` module which may be used as well, 13 although the two may not necessarily be 100% compatible. 14 15Coroutines provide an easier way to work in an asynchronous 16environment than chaining callbacks. Code using coroutines is 17technically asynchronous, but it is written as a single generator 18instead of a collection of separate functions. 19 20For example, the following callback-based asynchronous handler: 21 22.. testcode:: 23 24 class AsyncHandler(RequestHandler): 25 @asynchronous 26 def get(self): 27 http_client = AsyncHTTPClient() 28 http_client.fetch("http://example.com", 29 callback=self.on_fetch) 30 31 def on_fetch(self, response): 32 do_something_with_response(response) 33 self.render("template.html") 34 35.. testoutput:: 36 :hide: 37 38could be written with ``gen`` as: 39 40.. testcode:: 41 42 class GenAsyncHandler(RequestHandler): 43 @gen.coroutine 44 def get(self): 45 http_client = AsyncHTTPClient() 46 response = yield http_client.fetch("http://example.com") 47 do_something_with_response(response) 48 self.render("template.html") 49 50.. testoutput:: 51 :hide: 52 53Most asynchronous functions in Tornado return a `.Future`; 54yielding this object returns its ``Future.result``. 55 56You can also yield a list or dict of ``Futures``, which will be 57started at the same time and run in parallel; a list or dict of results will 58be returned when they are all finished: 59 60.. 
testcode:: 61 62 @gen.coroutine 63 def get(self): 64 http_client = AsyncHTTPClient() 65 response1, response2 = yield [http_client.fetch(url1), 66 http_client.fetch(url2)] 67 response_dict = yield dict(response3=http_client.fetch(url3), 68 response4=http_client.fetch(url4)) 69 response3 = response_dict['response3'] 70 response4 = response_dict['response4'] 71 72.. testoutput:: 73 :hide: 74 75If the `~functools.singledispatch` library is available (standard in 76Python 3.4, available via the `singledispatch 77<https://pypi.python.org/pypi/singledispatch>`_ package on older 78versions), additional types of objects may be yielded. Tornado includes 79support for ``asyncio.Future`` and Twisted's ``Deferred`` class when 80``tornado.platform.asyncio`` and ``tornado.platform.twisted`` are imported. 81See the `convert_yielded` function to extend this mechanism. 82 83.. versionchanged:: 3.2 84 Dict support added. 85 86.. versionchanged:: 4.1 87 Support added for yielding ``asyncio`` Futures and Twisted Deferreds 88 via ``singledispatch``. 89 90""" 91from __future__ import absolute_import, division, print_function 92 93import collections 94import functools 95import itertools 96import os 97import sys 98import types 99import warnings 100 101from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info, 102 future_add_done_callback, future_set_result_unless_cancelled) 103from tornado.ioloop import IOLoop 104from tornado.log import app_log 105from tornado import stack_context 106from tornado.util import PY3, raise_exc_info, TimeoutError 107 108try: 109 try: 110 # py34+ 111 from functools import singledispatch # type: ignore 112 except ImportError: 113 from singledispatch import singledispatch # backport 114except ImportError: 115 # In most cases, singledispatch is required (to avoid 116 # difficult-to-diagnose problems in which the functionality 117 # available differs depending on which invisble packages are 118 # installed). 
However, in Google App Engine third-party 119 # dependencies are more trouble so we allow this module to be 120 # imported without it. 121 if 'APPENGINE_RUNTIME' not in os.environ: 122 raise 123 singledispatch = None 124 125try: 126 try: 127 # py35+ 128 from collections.abc import Generator as GeneratorType # type: ignore 129 except ImportError: 130 from backports_abc import Generator as GeneratorType # type: ignore 131 132 try: 133 # py35+ 134 from inspect import isawaitable # type: ignore 135 except ImportError: 136 from backports_abc import isawaitable 137except ImportError: 138 if 'APPENGINE_RUNTIME' not in os.environ: 139 raise 140 from types import GeneratorType 141 142 def isawaitable(x): # type: ignore 143 return False 144 145if PY3: 146 import builtins 147else: 148 import __builtin__ as builtins 149 150 151class KeyReuseError(Exception): 152 pass 153 154 155class UnknownKeyError(Exception): 156 pass 157 158 159class LeakedCallbackError(Exception): 160 pass 161 162 163class BadYieldError(Exception): 164 pass 165 166 167class ReturnValueIgnoredError(Exception): 168 pass 169 170 171def _value_from_stopiteration(e): 172 try: 173 # StopIteration has a value attribute beginning in py33. 174 # So does our Return class. 175 return e.value 176 except AttributeError: 177 pass 178 try: 179 # Cython backports coroutine functionality by putting the value in 180 # e.args[0]. 
181 return e.args[0] 182 except (AttributeError, IndexError): 183 return None 184 185 186def _create_future(): 187 future = Future() 188 # Fixup asyncio debug info by removing extraneous stack entries 189 source_traceback = getattr(future, "_source_traceback", ()) 190 while source_traceback: 191 # Each traceback entry is equivalent to a 192 # (filename, self.lineno, self.name, self.line) tuple 193 filename = source_traceback[-1][0] 194 if filename == __file__: 195 del source_traceback[-1] 196 else: 197 break 198 return future 199 200 201def engine(func): 202 """Callback-oriented decorator for asynchronous generators. 203 204 This is an older interface; for new code that does not need to be 205 compatible with versions of Tornado older than 3.0 the 206 `coroutine` decorator is recommended instead. 207 208 This decorator is similar to `coroutine`, except it does not 209 return a `.Future` and the ``callback`` argument is not treated 210 specially. 211 212 In most cases, functions decorated with `engine` should take 213 a ``callback`` argument and invoke it with their result when 214 they are finished. One notable exception is the 215 `~tornado.web.RequestHandler` :ref:`HTTP verb methods <verbs>`, 216 which use ``self.finish()`` in place of a callback argument. 217 218 .. deprecated:: 5.1 219 220 This decorator will be removed in 6.0. Use `coroutine` or 221 ``async def`` instead. 222 """ 223 warnings.warn("gen.engine is deprecated, use gen.coroutine or async def instead", 224 DeprecationWarning) 225 func = _make_coroutine_wrapper(func, replace_callback=False) 226 227 @functools.wraps(func) 228 def wrapper(*args, **kwargs): 229 future = func(*args, **kwargs) 230 231 def final_callback(future): 232 if future.result() is not None: 233 raise ReturnValueIgnoredError( 234 "@gen.engine functions cannot return values: %r" % 235 (future.result(),)) 236 # The engine interface doesn't give us any way to return 237 # errors but to raise them into the stack context. 
238 # Save the stack context here to use when the Future has resolved. 239 future_add_done_callback(future, stack_context.wrap(final_callback)) 240 return wrapper 241 242 243def coroutine(func): 244 """Decorator for asynchronous generators. 245 246 Any generator that yields objects from this module must be wrapped 247 in either this decorator or `engine`. 248 249 Coroutines may "return" by raising the special exception 250 `Return(value) <Return>`. In Python 3.3+, it is also possible for 251 the function to simply use the ``return value`` statement (prior to 252 Python 3.3 generators were not allowed to also return values). 253 In all versions of Python a coroutine that simply wishes to exit 254 early may use the ``return`` statement without a value. 255 256 Functions with this decorator return a `.Future`. Additionally, 257 they may be called with a ``callback`` keyword argument, which 258 will be invoked with the future's result when it resolves. If the 259 coroutine fails, the callback will not be run and an exception 260 will be raised into the surrounding `.StackContext`. The 261 ``callback`` argument is not visible inside the decorated 262 function; it is handled by the decorator itself. 263 264 .. warning:: 265 266 When exceptions occur inside a coroutine, the exception 267 information will be stored in the `.Future` object. You must 268 examine the result of the `.Future` object, or the exception 269 may go unnoticed by your code. This means yielding the function 270 if called from another coroutine, using something like 271 `.IOLoop.run_sync` for top-level calls, or passing the `.Future` 272 to `.IOLoop.add_future`. 273 274 .. deprecated:: 5.1 275 276 The ``callback`` argument is deprecated and will be removed in 6.0. 277 Use the returned awaitable object instead. 
def _make_coroutine_wrapper(func, replace_callback):
    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

    The two decorators differ in their treatment of the ``callback``
    argument, so we cannot simply implement ``@engine`` in terms of
    ``@coroutine``.

    ``func`` is the function being decorated.  When ``replace_callback``
    is true and the wrapper is called with a ``callback`` keyword
    argument, that argument is removed before ``func`` is called (with
    a ``DeprecationWarning``) and is registered through
    `.IOLoop.add_future` to be called with ``future.result()``.

    Returns a wrapper function that returns a `.Future`.  The wrapper
    exposes the undecorated function as ``__wrapped__`` and is tagged
    with ``__tornado_coroutine__ = True``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    # Keep a reference to the undecorated function: it is what
    # functools.wraps copies metadata from and what __wrapped__ exposes.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # The Future handed back to the caller on every return path below.
        future = _create_future()

        if replace_callback and 'callback' in kwargs:
            warnings.warn("callback arguments are deprecated, use the returned Future instead",
                          DeprecationWarning, stacklevel=2)
            # Remove the callback so that it is not passed on to func.
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # func raised Return/StopIteration directly from the call;
            # use the carried value as the result (it is stored on the
            # future at the bottom of this function).
            result = _value_from_stopiteration(e)
        except Exception:
            future_set_exc_info(future, sys.exc_info())
            try:
                return future
            finally:
                # Avoid circular references
                future = None
        else:
            if isinstance(result, GeneratorType):
                # Inline the first iteration of Runner.run. This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    # If the active stack contexts changed during the first
                    # step, replace the yielded object with a future that
                    # carries a StackContextInconsistentError.
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = _create_future()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator finished without yielding anything.
                    future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
                except Exception:
                    future_set_exc_info(future, sys.exc_info())
                else:
                    # Provide strong references to Runner objects as long
                    # as their result future objects also have strong
                    # references (typically from the parent coroutine's
                    # Runner). This keeps the coroutine's Runner alive.
                    # We do this by exploiting the public API
                    # add_done_callback() instead of putting a private
                    # attribute on the Future.
                    # (Github issues #1769, #2229).
                    runner = Runner(result, future, yielded)
                    future.add_done_callback(lambda _: runner)
                # Drop this frame's reference to the yielded object.
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame. This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles). We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        # Reached when func returned something that is not a generator, or
        # raised Return/StopIteration when called: resolve the future with
        # that value.
        future_set_result_unless_cancelled(future, result)
        return future

    wrapper.__wrapped__ = wrapped
    # Marker attribute read by is_coroutine_function().
    wrapper.__tornado_coroutine__ = True
    return wrapper
class WaitIterator(object):
    """Iterates over a group of futures in the order in which they finish.

    Yielding a list of futures, as in:

    ``results = yield [future1, future2]``

    suspends the coroutine until both ``future1`` and ``future2``
    have finished and then resumes it with both results. If either
    future holds an exception, the expression raises that exception
    and all of the results are lost.

    Use ``WaitIterator`` when each result is wanted as soon as it is
    ready, or when the results of some futures are still needed even
    though others fail::

      wait_iterator = gen.WaitIterator(future1, future2)
      while not wait_iterator.done():
          try:
              result = yield wait_iterator.next()
          except Exception as e:
              print("Error {} from {}".format(e, wait_iterator.current_future))
          else:
              print("Result {} received from {} at {}".format(
                  result, wait_iterator.current_future,
                  wait_iterator.current_index))

    Results are handed out as they become available, so the output
    *will not be in the same order as the input arguments*. To find
    out which future produced the current result, read
    ``WaitIterator.current_future``, or ``WaitIterator.current_index``
    for the position of that future in the input list (when the
    `WaitIterator` was built from keyword arguments,
    ``current_index`` is the corresponding keyword instead).

    On Python 3.5, `WaitIterator` implements the async iterator
    protocol, so it can be used with the ``async for`` statement (note
    that in this form the whole iteration is aborted as soon as any
    value raises an exception, whereas the loop above can continue
    past individual errors)::

      wait_iterator = gen.WaitIterator(future1, future2)
      async for result in wait_iterator:
          print("Result {} received from {} at {}".format(
              result, wait_iterator.current_future,
              wait_iterator.current_index))

    .. versionadded:: 4.1

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    """
    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError(
                "You must provide args or kwargs, not both")

        # Pair every future with the label that will later be reported
        # through ``current_index``: the keyword for keyword arguments,
        # the position for positional ones.
        if kwargs:
            labelled = kwargs.items()
            futures = list(kwargs.values())
        else:
            labelled = enumerate(args)
            futures = args
        self._unfinished = {fut: label for (label, fut) in labelled}

        # Futures that finished while nobody was waiting in next().
        self._finished = collections.deque()
        self.current_index = self.current_future = None
        self._running_future = None

        # Subscribe last: the callback uses the attributes set above.
        for fut in futures:
            future_add_done_callback(fut, self._done_callback)

    def done(self):
        """Returns True if this iterator has no more results."""
        if not (self._finished or self._unfinished):
            # Clear the 'current' values when iteration is done.
            self.current_index = self.current_future = None
            return True
        return False

    def next(self):
        """Returns a `.Future` that will yield the next available result.

        The returned `.Future` is a new object, never one of the inputs.
        """
        self._running_future = Future()

        # Hand over a result that is already queued, if there is one.
        if self._finished:
            oldest = self._finished.popleft()
            self._return_result(oldest)

        return self._running_future

    def _done_callback(self, done):
        waiter = self._running_future
        if not waiter or waiter.done():
            # Nobody is waiting right now; queue it for a later next().
            self._finished.append(done)
        else:
            self._return_result(done)

    def _return_result(self, done):
        """Copies the state of ``done`` onto the future returned by
        `next` and records ``done`` as the iterator's current future.
        """
        chain_future(done, self._running_future)

        self.current_future = done
        self.current_index = self._unfinished.pop(done)

    def __aiter__(self):
        return self

    def __anext__(self):
        if not self.done():
            return self.next()
        # Lookup by name to silence pyflakes on older versions.
        raise getattr(builtins, 'StopAsyncIteration')()
def Task(func, *args, **kwargs):
    """Wraps a callback-style asynchronous function so it can be yielded.

    ``func`` is called with the given positional and keyword arguments
    plus a ``callback`` keyword argument. Whatever is passed to that
    callback becomes the result of the returned `.Future`, and
    therefore the value of the ``yield`` expression.

    .. versionchanged:: 4.0
       ``gen.Task`` is now a function that returns a `.Future`, instead of
       a subclass of `YieldPoint`. It still behaves the same way when
       yielded.

    .. deprecated:: 5.1
       This function is deprecated and will be removed in 6.0.
    """
    warnings.warn("gen.Task is deprecated, use Futures instead",
                  DeprecationWarning)
    future = _create_future()

    def _on_error(typ, value, tb):
        # Store the exception on the future unless it is already done;
        # the return value reports whether it was stored.
        if not future.done():
            future_set_exc_info(future, (typ, value, tb))
            return True
        return False

    def _on_result(result):
        # Results arriving after the future is done are ignored.
        if not future.done():
            future_set_result_unless_cancelled(future, result)

    with stack_context.ExceptionStackContext(_on_error):
        func(*args, callback=_argument_adapter(_on_result), **kwargs)
    return future
def multi(children, quiet_exceptions=()):
    """Runs multiple asynchronous operations in parallel.

    ``children`` is either a list of yieldable objects or a dict whose
    values are yieldable objects. The return value is a new yieldable
    that resolves to a structure of the same shape holding the
    results: a list in the same order for a list, a dict with the
    same keys for a dict.

    In other words, ``results = yield multi(list_of_futures)`` is
    equivalent to::

        results = []
        for future in list_of_futures:
            results.append(yield future)

    If any children raise exceptions, ``multi()`` raises the first
    one. The others are logged, unless they are of types contained in
    the ``quiet_exceptions`` argument.

    If any of the inputs are `YieldPoints <YieldPoint>`, the returned
    yieldable is a `YieldPoint`; otherwise it is a `.Future`. The
    result of `multi` can therefore be used in a native coroutine if
    and only if all of its children can be.

    In a ``yield``-based coroutine it is not normally necessary to
    call this function directly, because the coroutine runner does it
    automatically when a list or dict is yielded. It is needed in
    ``await``-based coroutines, or in order to pass the
    ``quiet_exceptions`` argument.

    This function is available under the names ``multi()`` and ``Multi()``
    for historical reasons.

    Cancelling a `.Future` returned by ``multi()`` does not cancel its
    children. `asyncio.gather` is similar to ``multi()``, but it does
    cancel its children.

    .. versionchanged:: 4.2
       If multiple yieldables fail, any exceptions after the first
       (which is raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. versionchanged:: 4.3
       Replaced the class ``Multi`` and the function ``multi_future``
       with a unified function ``multi``. Added support for yieldables
       other than `YieldPoint` and `.Future`.

    """
    # YieldPoints need the YieldPoint-based implementation; everything
    # else goes through the Future-based one.
    combine = MultiYieldPoint if _contains_yieldpoint(children) else multi_future
    return combine(children, quiet_exceptions=quiet_exceptions)
def multi_future(children, quiet_exceptions=()):
    """Wait for multiple asynchronous futures in parallel.

    This function is similar to `multi`, but does not support
    `YieldPoints <YieldPoint>`.

    ``children`` is a list of yieldable objects or a dict whose values
    are yieldable objects; each one is passed through
    `convert_yielded`. The returned `.Future` resolves to a list of
    results in the same order, or to a dict with the same keys. An
    empty ``children`` resolves immediately to ``[]`` or ``{}``.

    If any child fails, the first exception (in ``children`` order) is
    set on the returned `.Future`; later ones are logged unless they
    are instances of ``quiet_exceptions``, which may be an exception
    type, a tuple of types, or a list/set of types.

    .. versionadded:: 4.0

    .. versionchanged:: 4.2
       If multiple ``Futures`` fail, any exceptions after the first (which is
       raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. deprecated:: 4.3
       Use `multi` instead.
    """
    # isinstance() rejects lists and sets with a TypeError, which would be
    # raised inside the done-callback below and leave the returned future
    # unresolved.  Convert those containers to a tuple up front.
    if isinstance(quiet_exceptions, (list, set, frozenset)):
        quiet_exceptions = tuple(quiet_exceptions)

    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = list(map(convert_yielded, children))
    assert all(is_future(i) or isinstance(i, _NullFuture) for i in children)
    unfinished_children = set(children)

    future = _create_future()
    if not children:
        # Nothing to wait for; no callback will ever fire.
        future_set_result_unless_cancelled(future,
                                           {} if keys is not None else [])

    def callback(f):
        unfinished_children.remove(f)
        if unfinished_children:
            return
        # The last child has finished: gather every result in input order.
        result_list = []
        for child in children:
            try:
                result_list.append(child.result())
            except Exception as e:
                if future.done():
                    # An earlier failure (or a cancellation) already
                    # settled the future, so this one can only be logged.
                    if not isinstance(e, quiet_exceptions):
                        app_log.error("Multiple exceptions in yield list",
                                      exc_info=True)
                else:
                    future_set_exc_info(future, sys.exc_info())
        if not future.done():
            if keys is not None:
                future_set_result_unless_cancelled(future,
                                                   dict(zip(keys, result_list)))
            else:
                future_set_result_unless_cancelled(future, result_list)

    # A future that appears more than once is only subscribed once, since
    # ``unfinished_children`` holds a single entry for it.
    listening = set()
    for f in children:
        if f not in listening:
            listening.add(f)
            future_add_done_callback(f, callback)
    return future
def with_timeout(timeout, future, quiet_exceptions=()):
    """Wraps a `.Future` (or other yieldable object) in a timeout.

    Raises `tornado.util.TimeoutError` if the input future does not
    complete before ``timeout``, which may be specified in any form
    allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
    an absolute time relative to `.IOLoop.time`)

    Returns a new `.Future` that receives the outcome of the wrapped
    one, or the timeout error if the timeout fires first.

    If the wrapped `.Future` fails after it has timed out, the exception
    will be logged unless it is of a type contained in ``quiet_exceptions``
    (which may be an exception type or a sequence of types; lists and
    sets are accepted as well as tuples).

    Does not support `YieldPoint` subclasses.

    The wrapped `.Future` is not canceled when the timeout expires,
    permitting it to be reused. `asyncio.wait_for` is similar to this
    function but it does cancel the wrapped `.Future` on timeout.

    .. versionadded:: 4.0

    .. versionchanged:: 4.1
       Added the ``quiet_exceptions`` argument and the logging of unhandled
       exceptions.

    .. versionchanged:: 4.4
       Added support for yieldable objects other than `.Future`.

    """
    # isinstance() only accepts a type or a tuple of types; a list or set
    # would raise TypeError inside error_callback instead of filtering the
    # log message.  Convert those containers to a tuple up front.
    if isinstance(quiet_exceptions, (list, set, frozenset)):
        quiet_exceptions = tuple(quiet_exceptions)

    # TODO: allow YieldPoints in addition to other yieldables?
    # Tricky to do with stack_context semantics.
    #
    # It's tempting to optimize this by cancelling the input future on timeout
    # instead of creating a new one, but A) we can't know if we are the only
    # one waiting on the input future, so cancelling it might disrupt other
    # callers and B) concurrent futures can only be cancelled while they are
    # in the queue, so cancellation cannot reliably bound our waiting time.
    future = convert_yielded(future)
    result = _create_future()
    chain_future(future, result)
    io_loop = IOLoop.current()

    def error_callback(future):
        # Only attached after a timeout: report a late failure of the
        # wrapped future unless its type was declared quiet.
        try:
            future.result()
        except Exception as e:
            if not isinstance(e, quiet_exceptions):
                app_log.error("Exception in Future %r after timeout",
                              future, exc_info=True)

    def timeout_callback():
        if not result.done():
            result.set_exception(TimeoutError("Timeout"))
        # In case the wrapped future goes on to fail, log it.
        future_add_done_callback(future, error_callback)
    timeout_handle = io_loop.add_timeout(
        timeout, timeout_callback)
    # Whichever branch is taken, the pending timeout is removed once the
    # wrapped future finishes.
    if isinstance(future, Future):
        # We know this future will resolve on the IOLoop, so we don't
        # need the extra thread-safety of IOLoop.add_future (and we also
        # don't care about StackContext here).
        future_add_done_callback(
            future, lambda future: io_loop.remove_timeout(timeout_handle))
    else:
        # concurrent.futures.Futures may resolve on any thread, so we
        # need to route them back to the IOLoop.
        io_loop.add_future(
            future, lambda future: io_loop.remove_timeout(timeout_handle))
    return result
1019 1020 It's not actually a `Future` to avoid depending on a particular event loop. 1021 Handled as a special case in the coroutine runner. 1022 """ 1023 def result(self): 1024 return None 1025 1026 def done(self): 1027 return True 1028 1029 1030# _null_future is used as a dummy value in the coroutine runner. It differs 1031# from moment in that moment always adds a delay of one IOLoop iteration 1032# while _null_future is processed as soon as possible. 1033_null_future = _NullFuture() 1034 1035moment = _NullFuture() 1036moment.__doc__ = \ 1037 """A special object which may be yielded to allow the IOLoop to run for 1038one iteration. 1039 1040This is not needed in normal use but it can be helpful in long-running 1041coroutines that are likely to yield Futures that are ready instantly. 1042 1043Usage: ``yield gen.moment`` 1044 1045.. versionadded:: 4.0 1046 1047.. deprecated:: 4.5 1048 ``yield None`` (or ``yield`` with no argument) is now equivalent to 1049 ``yield gen.moment``. 1050""" 1051 1052 1053class Runner(object): 1054 """Internal implementation of `tornado.gen.engine`. 1055 1056 Maintains information about pending callbacks and their results. 1057 1058 The results of the generator are stored in ``result_future`` (a 1059 `.Future`) 1060 """ 1061 def __init__(self, gen, result_future, first_yielded): 1062 self.gen = gen 1063 self.result_future = result_future 1064 self.future = _null_future 1065 self.yield_point = None 1066 self.pending_callbacks = None 1067 self.results = None 1068 self.running = False 1069 self.finished = False 1070 self.had_exception = False 1071 self.io_loop = IOLoop.current() 1072 # For efficiency, we do not create a stack context until we 1073 # reach a YieldPoint (stack contexts are required for the historical 1074 # semantics of YieldPoints, but not for Futures). When we have 1075 # done so, this field will be set and must be called at the end 1076 # of the coroutine. 
1077 self.stack_context_deactivate = None 1078 if self.handle_yield(first_yielded): 1079 gen = result_future = first_yielded = None 1080 self.run() 1081 1082 def register_callback(self, key): 1083 """Adds ``key`` to the list of callbacks.""" 1084 if self.pending_callbacks is None: 1085 # Lazily initialize the old-style YieldPoint data structures. 1086 self.pending_callbacks = set() 1087 self.results = {} 1088 if key in self.pending_callbacks: 1089 raise KeyReuseError("key %r is already pending" % (key,)) 1090 self.pending_callbacks.add(key) 1091 1092 def is_ready(self, key): 1093 """Returns true if a result is available for ``key``.""" 1094 if self.pending_callbacks is None or key not in self.pending_callbacks: 1095 raise UnknownKeyError("key %r is not pending" % (key,)) 1096 return key in self.results 1097 1098 def set_result(self, key, result): 1099 """Sets the result for ``key`` and attempts to resume the generator.""" 1100 self.results[key] = result 1101 if self.yield_point is not None and self.yield_point.is_ready(): 1102 try: 1103 future_set_result_unless_cancelled(self.future, 1104 self.yield_point.get_result()) 1105 except: 1106 future_set_exc_info(self.future, sys.exc_info()) 1107 self.yield_point = None 1108 self.run() 1109 1110 def pop_result(self, key): 1111 """Returns the result for ``key`` and unregisters it.""" 1112 self.pending_callbacks.remove(key) 1113 return self.results.pop(key) 1114 1115 def run(self): 1116 """Starts or resumes the generator, running until it reaches a 1117 yield point that is not ready. 
1118 """ 1119 if self.running or self.finished: 1120 return 1121 try: 1122 self.running = True 1123 while True: 1124 future = self.future 1125 if not future.done(): 1126 return 1127 self.future = None 1128 try: 1129 orig_stack_contexts = stack_context._state.contexts 1130 exc_info = None 1131 1132 try: 1133 value = future.result() 1134 except Exception: 1135 self.had_exception = True 1136 exc_info = sys.exc_info() 1137 future = None 1138 1139 if exc_info is not None: 1140 try: 1141 yielded = self.gen.throw(*exc_info) 1142 finally: 1143 # Break up a reference to itself 1144 # for faster GC on CPython. 1145 exc_info = None 1146 else: 1147 yielded = self.gen.send(value) 1148 1149 if stack_context._state.contexts is not orig_stack_contexts: 1150 self.gen.throw( 1151 stack_context.StackContextInconsistentError( 1152 'stack_context inconsistency (probably caused ' 1153 'by yield within a "with StackContext" block)')) 1154 except (StopIteration, Return) as e: 1155 self.finished = True 1156 self.future = _null_future 1157 if self.pending_callbacks and not self.had_exception: 1158 # If we ran cleanly without waiting on all callbacks 1159 # raise an error (really more of a warning). If we 1160 # had an exception then some callbacks may have been 1161 # orphaned, so skip the check in that case. 
1162 raise LeakedCallbackError( 1163 "finished without waiting for callbacks %r" % 1164 self.pending_callbacks) 1165 future_set_result_unless_cancelled(self.result_future, 1166 _value_from_stopiteration(e)) 1167 self.result_future = None 1168 self._deactivate_stack_context() 1169 return 1170 except Exception: 1171 self.finished = True 1172 self.future = _null_future 1173 future_set_exc_info(self.result_future, sys.exc_info()) 1174 self.result_future = None 1175 self._deactivate_stack_context() 1176 return 1177 if not self.handle_yield(yielded): 1178 return 1179 yielded = None 1180 finally: 1181 self.running = False 1182 1183 def handle_yield(self, yielded): 1184 # Lists containing YieldPoints require stack contexts; 1185 # other lists are handled in convert_yielded. 1186 if _contains_yieldpoint(yielded): 1187 yielded = multi(yielded) 1188 1189 if isinstance(yielded, YieldPoint): 1190 # YieldPoints are too closely coupled to the Runner to go 1191 # through the generic convert_yielded mechanism. 1192 self.future = Future() 1193 1194 def start_yield_point(): 1195 try: 1196 yielded.start(self) 1197 if yielded.is_ready(): 1198 future_set_result_unless_cancelled(self.future, yielded.get_result()) 1199 else: 1200 self.yield_point = yielded 1201 except Exception: 1202 self.future = Future() 1203 future_set_exc_info(self.future, sys.exc_info()) 1204 1205 if self.stack_context_deactivate is None: 1206 # Start a stack context if this is the first 1207 # YieldPoint we've seen. 
1208 with stack_context.ExceptionStackContext( 1209 self.handle_exception) as deactivate: 1210 self.stack_context_deactivate = deactivate 1211 1212 def cb(): 1213 start_yield_point() 1214 self.run() 1215 self.io_loop.add_callback(cb) 1216 return False 1217 else: 1218 start_yield_point() 1219 else: 1220 try: 1221 self.future = convert_yielded(yielded) 1222 except BadYieldError: 1223 self.future = Future() 1224 future_set_exc_info(self.future, sys.exc_info()) 1225 1226 if self.future is moment: 1227 self.io_loop.add_callback(self.run) 1228 return False 1229 elif not self.future.done(): 1230 def inner(f): 1231 # Break a reference cycle to speed GC. 1232 f = None # noqa 1233 self.run() 1234 self.io_loop.add_future( 1235 self.future, inner) 1236 return False 1237 return True 1238 1239 def result_callback(self, key): 1240 return stack_context.wrap(_argument_adapter( 1241 functools.partial(self.set_result, key))) 1242 1243 def handle_exception(self, typ, value, tb): 1244 if not self.running and not self.finished: 1245 self.future = Future() 1246 future_set_exc_info(self.future, (typ, value, tb)) 1247 self.run() 1248 return True 1249 else: 1250 return False 1251 1252 def _deactivate_stack_context(self): 1253 if self.stack_context_deactivate is not None: 1254 self.stack_context_deactivate() 1255 self.stack_context_deactivate = None 1256 1257 1258Arguments = collections.namedtuple('Arguments', ['args', 'kwargs']) 1259 1260 1261def _argument_adapter(callback): 1262 """Returns a function that when invoked runs ``callback`` with one arg. 1263 1264 If the function returned by this function is called with exactly 1265 one argument, that argument is passed to ``callback``. Otherwise 1266 the args tuple and kwargs dict are wrapped in an `Arguments` object. 
1267 """ 1268 def wrapper(*args, **kwargs): 1269 if kwargs or len(args) > 1: 1270 callback(Arguments(args, kwargs)) 1271 elif args: 1272 callback(args[0]) 1273 else: 1274 callback(None) 1275 return wrapper 1276 1277 1278# Convert Awaitables into Futures. 1279try: 1280 import asyncio 1281except ImportError: 1282 # Py2-compatible version for use with Cython. 1283 # Copied from PEP 380. 1284 @coroutine 1285 def _wrap_awaitable(x): 1286 if hasattr(x, '__await__'): 1287 _i = x.__await__() 1288 else: 1289 _i = iter(x) 1290 try: 1291 _y = next(_i) 1292 except StopIteration as _e: 1293 _r = _value_from_stopiteration(_e) 1294 else: 1295 while 1: 1296 try: 1297 _s = yield _y 1298 except GeneratorExit as _e: 1299 try: 1300 _m = _i.close 1301 except AttributeError: 1302 pass 1303 else: 1304 _m() 1305 raise _e 1306 except BaseException as _e: 1307 _x = sys.exc_info() 1308 try: 1309 _m = _i.throw 1310 except AttributeError: 1311 raise _e 1312 else: 1313 try: 1314 _y = _m(*_x) 1315 except StopIteration as _e: 1316 _r = _value_from_stopiteration(_e) 1317 break 1318 else: 1319 try: 1320 if _s is None: 1321 _y = next(_i) 1322 else: 1323 _y = _i.send(_s) 1324 except StopIteration as _e: 1325 _r = _value_from_stopiteration(_e) 1326 break 1327 raise Return(_r) 1328else: 1329 try: 1330 _wrap_awaitable = asyncio.ensure_future 1331 except AttributeError: 1332 # asyncio.ensure_future was introduced in Python 3.4.4, but 1333 # Debian jessie still ships with 3.4.2 so try the old name. 1334 _wrap_awaitable = getattr(asyncio, 'async') 1335 1336 1337def convert_yielded(yielded): 1338 """Convert a yielded object into a `.Future`. 1339 1340 The default implementation accepts lists, dictionaries, and Futures. 1341 1342 If the `~functools.singledispatch` library is available, this function 1343 may be extended to support additional types. 
For example:: 1344 1345 @convert_yielded.register(asyncio.Future) 1346 def _(asyncio_future): 1347 return tornado.platform.asyncio.to_tornado_future(asyncio_future) 1348 1349 .. versionadded:: 4.1 1350 """ 1351 # Lists and dicts containing YieldPoints were handled earlier. 1352 if yielded is None or yielded is moment: 1353 return moment 1354 elif yielded is _null_future: 1355 return _null_future 1356 elif isinstance(yielded, (list, dict)): 1357 return multi(yielded) 1358 elif is_future(yielded): 1359 return yielded 1360 elif isawaitable(yielded): 1361 return _wrap_awaitable(yielded) 1362 else: 1363 raise BadYieldError("yielded unknown object %r" % (yielded,)) 1364 1365 1366if singledispatch is not None: 1367 convert_yielded = singledispatch(convert_yielded) 1368