1"""``tornado.gen`` implements generator-based coroutines.
2
3.. note::
4
5   The "decorator and generator" approach in this module is a
6   precursor to native coroutines (using ``async def`` and ``await``)
7   which were introduced in Python 3.5. Applications that do not
8   require compatibility with older versions of Python should use
9   native coroutines instead. Some parts of this module are still
10   useful with native coroutines, notably `multi`, `sleep`,
11   `WaitIterator`, and `with_timeout`. Some of these functions have
12   counterparts in the `asyncio` module which may be used as well,
13   although the two may not necessarily be 100% compatible.
14
15Coroutines provide an easier way to work in an asynchronous
16environment than chaining callbacks. Code using coroutines is
17technically asynchronous, but it is written as a single generator
18instead of a collection of separate functions.
19
20For example, the following callback-based asynchronous handler:
21
22.. testcode::
23
24    class AsyncHandler(RequestHandler):
25        @asynchronous
26        def get(self):
27            http_client = AsyncHTTPClient()
28            http_client.fetch("http://example.com",
29                              callback=self.on_fetch)
30
31        def on_fetch(self, response):
32            do_something_with_response(response)
33            self.render("template.html")
34
35.. testoutput::
36   :hide:
37
38could be written with ``gen`` as:
39
40.. testcode::
41
42    class GenAsyncHandler(RequestHandler):
43        @gen.coroutine
44        def get(self):
45            http_client = AsyncHTTPClient()
46            response = yield http_client.fetch("http://example.com")
47            do_something_with_response(response)
48            self.render("template.html")
49
50.. testoutput::
51   :hide:
52
53Most asynchronous functions in Tornado return a `.Future`;
54yielding this object returns its ``Future.result``.
55
56You can also yield a list or dict of ``Futures``, which will be
57started at the same time and run in parallel; a list or dict of results will
58be returned when they are all finished:
59
60.. testcode::
61
62    @gen.coroutine
63    def get(self):
64        http_client = AsyncHTTPClient()
65        response1, response2 = yield [http_client.fetch(url1),
66                                      http_client.fetch(url2)]
67        response_dict = yield dict(response3=http_client.fetch(url3),
68                                   response4=http_client.fetch(url4))
69        response3 = response_dict['response3']
70        response4 = response_dict['response4']
71
72.. testoutput::
73   :hide:
74
75If the `~functools.singledispatch` library is available (standard in
76Python 3.4, available via the `singledispatch
77<https://pypi.python.org/pypi/singledispatch>`_ package on older
78versions), additional types of objects may be yielded. Tornado includes
79support for ``asyncio.Future`` and Twisted's ``Deferred`` class when
80``tornado.platform.asyncio`` and ``tornado.platform.twisted`` are imported.
81See the `convert_yielded` function to extend this mechanism.
82
83.. versionchanged:: 3.2
84   Dict support added.
85
86.. versionchanged:: 4.1
87   Support added for yielding ``asyncio`` Futures and Twisted Deferreds
88   via ``singledispatch``.
89
90"""
91from __future__ import absolute_import, division, print_function
92
93import collections
94import functools
95import itertools
96import os
97import sys
98import types
99import warnings
100
101from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info,
102                                future_add_done_callback, future_set_result_unless_cancelled)
103from tornado.ioloop import IOLoop
104from tornado.log import app_log
105from tornado import stack_context
106from tornado.util import PY3, raise_exc_info, TimeoutError
107
108try:
109    try:
110        # py34+
111        from functools import singledispatch  # type: ignore
112    except ImportError:
113        from singledispatch import singledispatch  # backport
114except ImportError:
115    # In most cases, singledispatch is required (to avoid
116    # difficult-to-diagnose problems in which the functionality
117    # available differs depending on which invisble packages are
118    # installed). However, in Google App Engine third-party
119    # dependencies are more trouble so we allow this module to be
120    # imported without it.
121    if 'APPENGINE_RUNTIME' not in os.environ:
122        raise
123    singledispatch = None
124
125try:
126    try:
127        # py35+
128        from collections.abc import Generator as GeneratorType  # type: ignore
129    except ImportError:
130        from backports_abc import Generator as GeneratorType  # type: ignore
131
132    try:
133        # py35+
134        from inspect import isawaitable  # type: ignore
135    except ImportError:
136        from backports_abc import isawaitable
137except ImportError:
138    if 'APPENGINE_RUNTIME' not in os.environ:
139        raise
140    from types import GeneratorType
141
142    def isawaitable(x):  # type: ignore
143        return False
144
145if PY3:
146    import builtins
147else:
148    import __builtin__ as builtins
149
150
151class KeyReuseError(Exception):
152    pass
153
154
155class UnknownKeyError(Exception):
156    pass
157
158
159class LeakedCallbackError(Exception):
160    pass
161
162
163class BadYieldError(Exception):
164    pass
165
166
167class ReturnValueIgnoredError(Exception):
168    pass
169
170
171def _value_from_stopiteration(e):
172    try:
173        # StopIteration has a value attribute beginning in py33.
174        # So does our Return class.
175        return e.value
176    except AttributeError:
177        pass
178    try:
179        # Cython backports coroutine functionality by putting the value in
180        # e.args[0].
181        return e.args[0]
182    except (AttributeError, IndexError):
183        return None
184
185
186def _create_future():
187    future = Future()
188    # Fixup asyncio debug info by removing extraneous stack entries
189    source_traceback = getattr(future, "_source_traceback", ())
190    while source_traceback:
191        # Each traceback entry is equivalent to a
192        # (filename, self.lineno, self.name, self.line) tuple
193        filename = source_traceback[-1][0]
194        if filename == __file__:
195            del source_traceback[-1]
196        else:
197            break
198    return future
199
200
201def engine(func):
202    """Callback-oriented decorator for asynchronous generators.
203
204    This is an older interface; for new code that does not need to be
205    compatible with versions of Tornado older than 3.0 the
206    `coroutine` decorator is recommended instead.
207
208    This decorator is similar to `coroutine`, except it does not
209    return a `.Future` and the ``callback`` argument is not treated
210    specially.
211
212    In most cases, functions decorated with `engine` should take
213    a ``callback`` argument and invoke it with their result when
214    they are finished.  One notable exception is the
215    `~tornado.web.RequestHandler` :ref:`HTTP verb methods <verbs>`,
216    which use ``self.finish()`` in place of a callback argument.
217
218    .. deprecated:: 5.1
219
220       This decorator will be removed in 6.0. Use `coroutine` or
221       ``async def`` instead.
222    """
223    warnings.warn("gen.engine is deprecated, use gen.coroutine or async def instead",
224                  DeprecationWarning)
225    func = _make_coroutine_wrapper(func, replace_callback=False)
226
227    @functools.wraps(func)
228    def wrapper(*args, **kwargs):
229        future = func(*args, **kwargs)
230
231        def final_callback(future):
232            if future.result() is not None:
233                raise ReturnValueIgnoredError(
234                    "@gen.engine functions cannot return values: %r" %
235                    (future.result(),))
236        # The engine interface doesn't give us any way to return
237        # errors but to raise them into the stack context.
238        # Save the stack context here to use when the Future has resolved.
239        future_add_done_callback(future, stack_context.wrap(final_callback))
240    return wrapper
241
242
243def coroutine(func):
244    """Decorator for asynchronous generators.
245
246    Any generator that yields objects from this module must be wrapped
247    in either this decorator or `engine`.
248
249    Coroutines may "return" by raising the special exception
250    `Return(value) <Return>`.  In Python 3.3+, it is also possible for
251    the function to simply use the ``return value`` statement (prior to
252    Python 3.3 generators were not allowed to also return values).
253    In all versions of Python a coroutine that simply wishes to exit
254    early may use the ``return`` statement without a value.
255
256    Functions with this decorator return a `.Future`.  Additionally,
257    they may be called with a ``callback`` keyword argument, which
258    will be invoked with the future's result when it resolves.  If the
259    coroutine fails, the callback will not be run and an exception
260    will be raised into the surrounding `.StackContext`.  The
261    ``callback`` argument is not visible inside the decorated
262    function; it is handled by the decorator itself.
263
264    .. warning::
265
266       When exceptions occur inside a coroutine, the exception
267       information will be stored in the `.Future` object. You must
268       examine the result of the `.Future` object, or the exception
269       may go unnoticed by your code. This means yielding the function
270       if called from another coroutine, using something like
271       `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
272       to `.IOLoop.add_future`.
273
274    .. deprecated:: 5.1
275
276       The ``callback`` argument is deprecated and will be removed in 6.0.
277       Use the returned awaitable object instead.
278    """
279    return _make_coroutine_wrapper(func, replace_callback=True)
280
281
282def _make_coroutine_wrapper(func, replace_callback):
283    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
284
285    The two decorators differ in their treatment of the ``callback``
286    argument, so we cannot simply implement ``@engine`` in terms of
287    ``@coroutine``.
288    """
289    # On Python 3.5, set the coroutine flag on our generator, to allow it
290    # to be used with 'await'.
291    wrapped = func
292    if hasattr(types, 'coroutine'):
293        func = types.coroutine(func)
294
295    @functools.wraps(wrapped)
296    def wrapper(*args, **kwargs):
297        future = _create_future()
298
299        if replace_callback and 'callback' in kwargs:
300            warnings.warn("callback arguments are deprecated, use the returned Future instead",
301                          DeprecationWarning, stacklevel=2)
302            callback = kwargs.pop('callback')
303            IOLoop.current().add_future(
304                future, lambda future: callback(future.result()))
305
306        try:
307            result = func(*args, **kwargs)
308        except (Return, StopIteration) as e:
309            result = _value_from_stopiteration(e)
310        except Exception:
311            future_set_exc_info(future, sys.exc_info())
312            try:
313                return future
314            finally:
315                # Avoid circular references
316                future = None
317        else:
318            if isinstance(result, GeneratorType):
319                # Inline the first iteration of Runner.run.  This lets us
320                # avoid the cost of creating a Runner when the coroutine
321                # never actually yields, which in turn allows us to
322                # use "optional" coroutines in critical path code without
323                # performance penalty for the synchronous case.
324                try:
325                    orig_stack_contexts = stack_context._state.contexts
326                    yielded = next(result)
327                    if stack_context._state.contexts is not orig_stack_contexts:
328                        yielded = _create_future()
329                        yielded.set_exception(
330                            stack_context.StackContextInconsistentError(
331                                'stack_context inconsistency (probably caused '
332                                'by yield within a "with StackContext" block)'))
333                except (StopIteration, Return) as e:
334                    future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
335                except Exception:
336                    future_set_exc_info(future, sys.exc_info())
337                else:
338                    # Provide strong references to Runner objects as long
339                    # as their result future objects also have strong
340                    # references (typically from the parent coroutine's
341                    # Runner). This keeps the coroutine's Runner alive.
342                    # We do this by exploiting the public API
343                    # add_done_callback() instead of putting a private
344                    # attribute on the Future.
345                    # (Github issues #1769, #2229).
346                    runner = Runner(result, future, yielded)
347                    future.add_done_callback(lambda _: runner)
348                yielded = None
349                try:
350                    return future
351                finally:
352                    # Subtle memory optimization: if next() raised an exception,
353                    # the future's exc_info contains a traceback which
354                    # includes this stack frame.  This creates a cycle,
355                    # which will be collected at the next full GC but has
356                    # been shown to greatly increase memory usage of
357                    # benchmarks (relative to the refcount-based scheme
358                    # used in the absence of cycles).  We can avoid the
359                    # cycle by clearing the local variable after we return it.
360                    future = None
361        future_set_result_unless_cancelled(future, result)
362        return future
363
364    wrapper.__wrapped__ = wrapped
365    wrapper.__tornado_coroutine__ = True
366    return wrapper
367
368
369def is_coroutine_function(func):
370    """Return whether *func* is a coroutine function, i.e. a function
371    wrapped with `~.gen.coroutine`.
372
373    .. versionadded:: 4.5
374    """
375    return getattr(func, '__tornado_coroutine__', False)
376
377
378class Return(Exception):
379    """Special exception to return a value from a `coroutine`.
380
381    If this exception is raised, its value argument is used as the
382    result of the coroutine::
383
384        @gen.coroutine
385        def fetch_json(url):
386            response = yield AsyncHTTPClient().fetch(url)
387            raise gen.Return(json_decode(response.body))
388
389    In Python 3.3, this exception is no longer necessary: the ``return``
390    statement can be used directly to return a value (previously
391    ``yield`` and ``return`` with a value could not be combined in the
392    same function).
393
394    By analogy with the return statement, the value argument is optional,
395    but it is never necessary to ``raise gen.Return()``.  The ``return``
396    statement can be used with no arguments instead.
397    """
398    def __init__(self, value=None):
399        super(Return, self).__init__()
400        self.value = value
401        # Cython recognizes subclasses of StopIteration with a .args tuple.
402        self.args = (value,)
403
404
405class WaitIterator(object):
406    """Provides an iterator to yield the results of futures as they finish.
407
408    Yielding a set of futures like this:
409
410    ``results = yield [future1, future2]``
411
412    pauses the coroutine until both ``future1`` and ``future2``
413    return, and then restarts the coroutine with the results of both
414    futures. If either future is an exception, the expression will
415    raise that exception and all the results will be lost.
416
417    If you need to get the result of each future as soon as possible,
418    or if you need the result of some futures even if others produce
419    errors, you can use ``WaitIterator``::
420
421      wait_iterator = gen.WaitIterator(future1, future2)
422      while not wait_iterator.done():
423          try:
424              result = yield wait_iterator.next()
425          except Exception as e:
426              print("Error {} from {}".format(e, wait_iterator.current_future))
427          else:
428              print("Result {} received from {} at {}".format(
429                  result, wait_iterator.current_future,
430                  wait_iterator.current_index))
431
432    Because results are returned as soon as they are available the
433    output from the iterator *will not be in the same order as the
434    input arguments*. If you need to know which future produced the
435    current result, you can use the attributes
436    ``WaitIterator.current_future``, or ``WaitIterator.current_index``
437    to get the index of the future from the input list. (if keyword
438    arguments were used in the construction of the `WaitIterator`,
439    ``current_index`` will use the corresponding keyword).
440
441    On Python 3.5, `WaitIterator` implements the async iterator
442    protocol, so it can be used with the ``async for`` statement (note
443    that in this version the entire iteration is aborted if any value
444    raises an exception, while the previous example can continue past
445    individual errors)::
446
447      async for result in gen.WaitIterator(future1, future2):
448          print("Result {} received from {} at {}".format(
449              result, wait_iterator.current_future,
450              wait_iterator.current_index))
451
452    .. versionadded:: 4.1
453
454    .. versionchanged:: 4.3
455       Added ``async for`` support in Python 3.5.
456
457    """
458    def __init__(self, *args, **kwargs):
459        if args and kwargs:
460            raise ValueError(
461                "You must provide args or kwargs, not both")
462
463        if kwargs:
464            self._unfinished = dict((f, k) for (k, f) in kwargs.items())
465            futures = list(kwargs.values())
466        else:
467            self._unfinished = dict((f, i) for (i, f) in enumerate(args))
468            futures = args
469
470        self._finished = collections.deque()
471        self.current_index = self.current_future = None
472        self._running_future = None
473
474        for future in futures:
475            future_add_done_callback(future, self._done_callback)
476
477    def done(self):
478        """Returns True if this iterator has no more results."""
479        if self._finished or self._unfinished:
480            return False
481        # Clear the 'current' values when iteration is done.
482        self.current_index = self.current_future = None
483        return True
484
485    def next(self):
486        """Returns a `.Future` that will yield the next available result.
487
488        Note that this `.Future` will not be the same object as any of
489        the inputs.
490        """
491        self._running_future = Future()
492
493        if self._finished:
494            self._return_result(self._finished.popleft())
495
496        return self._running_future
497
498    def _done_callback(self, done):
499        if self._running_future and not self._running_future.done():
500            self._return_result(done)
501        else:
502            self._finished.append(done)
503
504    def _return_result(self, done):
505        """Called set the returned future's state that of the future
506        we yielded, and set the current future for the iterator.
507        """
508        chain_future(done, self._running_future)
509
510        self.current_future = done
511        self.current_index = self._unfinished.pop(done)
512
513    def __aiter__(self):
514        return self
515
516    def __anext__(self):
517        if self.done():
518            # Lookup by name to silence pyflakes on older versions.
519            raise getattr(builtins, 'StopAsyncIteration')()
520        return self.next()
521
522
523class YieldPoint(object):
524    """Base class for objects that may be yielded from the generator.
525
526    .. deprecated:: 4.0
527       Use `Futures <.Future>` instead. This class and all its subclasses
528       will be removed in 6.0
529    """
530    def __init__(self):
531        warnings.warn("YieldPoint is deprecated, use Futures instead",
532                      DeprecationWarning)
533
534    def start(self, runner):
535        """Called by the runner after the generator has yielded.
536
537        No other methods will be called on this object before ``start``.
538        """
539        raise NotImplementedError()
540
541    def is_ready(self):
542        """Called by the runner to determine whether to resume the generator.
543
544        Returns a boolean; may be called more than once.
545        """
546        raise NotImplementedError()
547
548    def get_result(self):
549        """Returns the value to use as the result of the yield expression.
550
551        This method will only be called once, and only after `is_ready`
552        has returned true.
553        """
554        raise NotImplementedError()
555
556
557class Callback(YieldPoint):
558    """Returns a callable object that will allow a matching `Wait` to proceed.
559
560    The key may be any value suitable for use as a dictionary key, and is
561    used to match ``Callbacks`` to their corresponding ``Waits``.  The key
562    must be unique among outstanding callbacks within a single run of the
563    generator function, but may be reused across different runs of the same
564    function (so constants generally work fine).
565
566    The callback may be called with zero or one arguments; if an argument
567    is given it will be returned by `Wait`.
568
569    .. deprecated:: 4.0
570       Use `Futures <.Future>` instead. This class will be removed in 6.0.
571    """
572    def __init__(self, key):
573        warnings.warn("gen.Callback is deprecated, use Futures instead",
574                      DeprecationWarning)
575        self.key = key
576
577    def start(self, runner):
578        self.runner = runner
579        runner.register_callback(self.key)
580
581    def is_ready(self):
582        return True
583
584    def get_result(self):
585        return self.runner.result_callback(self.key)
586
587
588class Wait(YieldPoint):
589    """Returns the argument passed to the result of a previous `Callback`.
590
591    .. deprecated:: 4.0
592       Use `Futures <.Future>` instead. This class will be removed in 6.0.
593    """
594    def __init__(self, key):
595        warnings.warn("gen.Wait is deprecated, use Futures instead",
596                      DeprecationWarning)
597        self.key = key
598
599    def start(self, runner):
600        self.runner = runner
601
602    def is_ready(self):
603        return self.runner.is_ready(self.key)
604
605    def get_result(self):
606        return self.runner.pop_result(self.key)
607
608
609class WaitAll(YieldPoint):
610    """Returns the results of multiple previous `Callbacks <Callback>`.
611
612    The argument is a sequence of `Callback` keys, and the result is
613    a list of results in the same order.
614
615    `WaitAll` is equivalent to yielding a list of `Wait` objects.
616
617    .. deprecated:: 4.0
618       Use `Futures <.Future>` instead. This class will be removed in 6.0.
619    """
620    def __init__(self, keys):
621        warnings.warn("gen.WaitAll is deprecated, use gen.multi instead",
622                      DeprecationWarning)
623        self.keys = keys
624
625    def start(self, runner):
626        self.runner = runner
627
628    def is_ready(self):
629        return all(self.runner.is_ready(key) for key in self.keys)
630
631    def get_result(self):
632        return [self.runner.pop_result(key) for key in self.keys]
633
634
635def Task(func, *args, **kwargs):
636    """Adapts a callback-based asynchronous function for use in coroutines.
637
638    Takes a function (and optional additional arguments) and runs it with
639    those arguments plus a ``callback`` keyword argument.  The argument passed
640    to the callback is returned as the result of the yield expression.
641
642    .. versionchanged:: 4.0
643       ``gen.Task`` is now a function that returns a `.Future`, instead of
644       a subclass of `YieldPoint`.  It still behaves the same way when
645       yielded.
646
647    .. deprecated:: 5.1
648       This function is deprecated and will be removed in 6.0.
649    """
650    warnings.warn("gen.Task is deprecated, use Futures instead",
651                  DeprecationWarning)
652    future = _create_future()
653
654    def handle_exception(typ, value, tb):
655        if future.done():
656            return False
657        future_set_exc_info(future, (typ, value, tb))
658        return True
659
660    def set_result(result):
661        if future.done():
662            return
663        future_set_result_unless_cancelled(future, result)
664    with stack_context.ExceptionStackContext(handle_exception):
665        func(*args, callback=_argument_adapter(set_result), **kwargs)
666    return future
667
668
669class YieldFuture(YieldPoint):
670    def __init__(self, future):
671        """Adapts a `.Future` to the `YieldPoint` interface.
672
673        .. versionchanged:: 5.0
674           The ``io_loop`` argument (deprecated since version 4.1) has been removed.
675
676        .. deprecated:: 5.1
677           This class will be removed in 6.0.
678        """
679        warnings.warn("YieldFuture is deprecated, use Futures instead",
680                      DeprecationWarning)
681        self.future = future
682        self.io_loop = IOLoop.current()
683
684    def start(self, runner):
685        if not self.future.done():
686            self.runner = runner
687            self.key = object()
688            runner.register_callback(self.key)
689            self.io_loop.add_future(self.future, runner.result_callback(self.key))
690        else:
691            self.runner = None
692            self.result_fn = self.future.result
693
694    def is_ready(self):
695        if self.runner is not None:
696            return self.runner.is_ready(self.key)
697        else:
698            return True
699
700    def get_result(self):
701        if self.runner is not None:
702            return self.runner.pop_result(self.key).result()
703        else:
704            return self.result_fn()
705
706
707def _contains_yieldpoint(children):
708    """Returns True if ``children`` contains any YieldPoints.
709
710    ``children`` may be a dict or a list, as used by `MultiYieldPoint`
711    and `multi_future`.
712    """
713    if isinstance(children, dict):
714        return any(isinstance(i, YieldPoint) for i in children.values())
715    if isinstance(children, list):
716        return any(isinstance(i, YieldPoint) for i in children)
717    return False
718
719
720def multi(children, quiet_exceptions=()):
721    """Runs multiple asynchronous operations in parallel.
722
723    ``children`` may either be a list or a dict whose values are
724    yieldable objects. ``multi()`` returns a new yieldable
725    object that resolves to a parallel structure containing their
726    results. If ``children`` is a list, the result is a list of
727    results in the same order; if it is a dict, the result is a dict
728    with the same keys.
729
730    That is, ``results = yield multi(list_of_futures)`` is equivalent
731    to::
732
733        results = []
734        for future in list_of_futures:
735            results.append(yield future)
736
737    If any children raise exceptions, ``multi()`` will raise the first
738    one. All others will be logged, unless they are of types
739    contained in the ``quiet_exceptions`` argument.
740
741    If any of the inputs are `YieldPoints <YieldPoint>`, the returned
742    yieldable object is a `YieldPoint`. Otherwise, returns a `.Future`.
743    This means that the result of `multi` can be used in a native
744    coroutine if and only if all of its children can be.
745
746    In a ``yield``-based coroutine, it is not normally necessary to
747    call this function directly, since the coroutine runner will
748    do it automatically when a list or dict is yielded. However,
749    it is necessary in ``await``-based coroutines, or to pass
750    the ``quiet_exceptions`` argument.
751
752    This function is available under the names ``multi()`` and ``Multi()``
753    for historical reasons.
754
755    Cancelling a `.Future` returned by ``multi()`` does not cancel its
756    children. `asyncio.gather` is similar to ``multi()``, but it does
757    cancel its children.
758
759    .. versionchanged:: 4.2
760       If multiple yieldables fail, any exceptions after the first
761       (which is raised) will be logged. Added the ``quiet_exceptions``
762       argument to suppress this logging for selected exception types.
763
764    .. versionchanged:: 4.3
765       Replaced the class ``Multi`` and the function ``multi_future``
766       with a unified function ``multi``. Added support for yieldables
767       other than `YieldPoint` and `.Future`.
768
769    """
770    if _contains_yieldpoint(children):
771        return MultiYieldPoint(children, quiet_exceptions=quiet_exceptions)
772    else:
773        return multi_future(children, quiet_exceptions=quiet_exceptions)
774
775
776Multi = multi
777
778
779class MultiYieldPoint(YieldPoint):
780    """Runs multiple asynchronous operations in parallel.
781
782    This class is similar to `multi`, but it always creates a stack
783    context even when no children require it. It is not compatible with
784    native coroutines.
785
786    .. versionchanged:: 4.2
787       If multiple ``YieldPoints`` fail, any exceptions after the first
788       (which is raised) will be logged. Added the ``quiet_exceptions``
789       argument to suppress this logging for selected exception types.
790
791    .. versionchanged:: 4.3
792       Renamed from ``Multi`` to ``MultiYieldPoint``. The name ``Multi``
793       remains as an alias for the equivalent `multi` function.
794
795    .. deprecated:: 4.3
796       Use `multi` instead. This class will be removed in 6.0.
797    """
798    def __init__(self, children, quiet_exceptions=()):
799        warnings.warn("MultiYieldPoint is deprecated, use Futures instead",
800                      DeprecationWarning)
801        self.keys = None
802        if isinstance(children, dict):
803            self.keys = list(children.keys())
804            children = children.values()
805        self.children = []
806        for i in children:
807            if not isinstance(i, YieldPoint):
808                i = convert_yielded(i)
809            if is_future(i):
810                i = YieldFuture(i)
811            self.children.append(i)
812        assert all(isinstance(i, YieldPoint) for i in self.children)
813        self.unfinished_children = set(self.children)
814        self.quiet_exceptions = quiet_exceptions
815
816    def start(self, runner):
817        for i in self.children:
818            i.start(runner)
819
820    def is_ready(self):
821        finished = list(itertools.takewhile(
822            lambda i: i.is_ready(), self.unfinished_children))
823        self.unfinished_children.difference_update(finished)
824        return not self.unfinished_children
825
826    def get_result(self):
827        result_list = []
828        exc_info = None
829        for f in self.children:
830            try:
831                result_list.append(f.get_result())
832            except Exception as e:
833                if exc_info is None:
834                    exc_info = sys.exc_info()
835                else:
836                    if not isinstance(e, self.quiet_exceptions):
837                        app_log.error("Multiple exceptions in yield list",
838                                      exc_info=True)
839        if exc_info is not None:
840            raise_exc_info(exc_info)
841        if self.keys is not None:
842            return dict(zip(self.keys, result_list))
843        else:
844            return list(result_list)
845
846
847def multi_future(children, quiet_exceptions=()):
848    """Wait for multiple asynchronous futures in parallel.
849
850    This function is similar to `multi`, but does not support
851    `YieldPoints <YieldPoint>`.
852
853    .. versionadded:: 4.0
854
855    .. versionchanged:: 4.2
856       If multiple ``Futures`` fail, any exceptions after the first (which is
857       raised) will be logged. Added the ``quiet_exceptions``
858       argument to suppress this logging for selected exception types.
859
860    .. deprecated:: 4.3
861       Use `multi` instead.
862    """
863    if isinstance(children, dict):
864        keys = list(children.keys())
865        children = children.values()
866    else:
867        keys = None
868    children = list(map(convert_yielded, children))
869    assert all(is_future(i) or isinstance(i, _NullFuture) for i in children)
870    unfinished_children = set(children)
871
872    future = _create_future()
873    if not children:
874        future_set_result_unless_cancelled(future,
875                                           {} if keys is not None else [])
876
877    def callback(f):
878        unfinished_children.remove(f)
879        if not unfinished_children:
880            result_list = []
881            for f in children:
882                try:
883                    result_list.append(f.result())
884                except Exception as e:
885                    if future.done():
886                        if not isinstance(e, quiet_exceptions):
887                            app_log.error("Multiple exceptions in yield list",
888                                          exc_info=True)
889                    else:
890                        future_set_exc_info(future, sys.exc_info())
891            if not future.done():
892                if keys is not None:
893                    future_set_result_unless_cancelled(future,
894                                                       dict(zip(keys, result_list)))
895                else:
896                    future_set_result_unless_cancelled(future, result_list)
897
898    listening = set()
899    for f in children:
900        if f not in listening:
901            listening.add(f)
902            future_add_done_callback(f, callback)
903    return future
904
905
906def maybe_future(x):
907    """Converts ``x`` into a `.Future`.
908
909    If ``x`` is already a `.Future`, it is simply returned; otherwise
910    it is wrapped in a new `.Future`.  This is suitable for use as
911    ``result = yield gen.maybe_future(f())`` when you don't know whether
912    ``f()`` returns a `.Future` or not.
913
914    .. deprecated:: 4.3
915       This function only handles ``Futures``, not other yieldable objects.
916       Instead of `maybe_future`, check for the non-future result types
917       you expect (often just ``None``), and ``yield`` anything unknown.
918    """
919    if is_future(x):
920        return x
921    else:
922        fut = _create_future()
923        fut.set_result(x)
924        return fut
925
926
927def with_timeout(timeout, future, quiet_exceptions=()):
928    """Wraps a `.Future` (or other yieldable object) in a timeout.
929
930    Raises `tornado.util.TimeoutError` if the input future does not
931    complete before ``timeout``, which may be specified in any form
932    allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
933    an absolute time relative to `.IOLoop.time`)
934
935    If the wrapped `.Future` fails after it has timed out, the exception
936    will be logged unless it is of a type contained in ``quiet_exceptions``
937    (which may be an exception type or a sequence of types).
938
939    Does not support `YieldPoint` subclasses.
940
941    The wrapped `.Future` is not canceled when the timeout expires,
942    permitting it to be reused. `asyncio.wait_for` is similar to this
943    function but it does cancel the wrapped `.Future` on timeout.
944
945    .. versionadded:: 4.0
946
947    .. versionchanged:: 4.1
948       Added the ``quiet_exceptions`` argument and the logging of unhandled
949       exceptions.
950
951    .. versionchanged:: 4.4
952       Added support for yieldable objects other than `.Future`.
953
954    """
955    # TODO: allow YieldPoints in addition to other yieldables?
956    # Tricky to do with stack_context semantics.
957    #
958    # It's tempting to optimize this by cancelling the input future on timeout
959    # instead of creating a new one, but A) we can't know if we are the only
960    # one waiting on the input future, so cancelling it might disrupt other
961    # callers and B) concurrent futures can only be cancelled while they are
962    # in the queue, so cancellation cannot reliably bound our waiting time.
963    future = convert_yielded(future)
964    result = _create_future()
965    chain_future(future, result)
966    io_loop = IOLoop.current()
967
968    def error_callback(future):
969        try:
970            future.result()
971        except Exception as e:
972            if not isinstance(e, quiet_exceptions):
973                app_log.error("Exception in Future %r after timeout",
974                              future, exc_info=True)
975
976    def timeout_callback():
977        if not result.done():
978            result.set_exception(TimeoutError("Timeout"))
979        # In case the wrapped future goes on to fail, log it.
980        future_add_done_callback(future, error_callback)
981    timeout_handle = io_loop.add_timeout(
982        timeout, timeout_callback)
983    if isinstance(future, Future):
984        # We know this future will resolve on the IOLoop, so we don't
985        # need the extra thread-safety of IOLoop.add_future (and we also
986        # don't care about StackContext here.
987        future_add_done_callback(
988            future, lambda future: io_loop.remove_timeout(timeout_handle))
989    else:
990        # concurrent.futures.Futures may resolve on any thread, so we
991        # need to route them back to the IOLoop.
992        io_loop.add_future(
993            future, lambda future: io_loop.remove_timeout(timeout_handle))
994    return result
995
996
997def sleep(duration):
998    """Return a `.Future` that resolves after the given number of seconds.
999
1000    When used with ``yield`` in a coroutine, this is a non-blocking
1001    analogue to `time.sleep` (which should not be used in coroutines
1002    because it is blocking)::
1003
1004        yield gen.sleep(0.5)
1005
1006    Note that calling this function on its own does nothing; you must
1007    wait on the `.Future` it returns (usually by yielding it).
1008
1009    .. versionadded:: 4.1
1010    """
1011    f = _create_future()
1012    IOLoop.current().call_later(duration,
1013                                lambda: future_set_result_unless_cancelled(f, None))
1014    return f
1015
1016
1017class _NullFuture(object):
1018    """_NullFuture resembles a Future that finished with a result of None.
1019
1020    It's not actually a `Future` to avoid depending on a particular event loop.
1021    Handled as a special case in the coroutine runner.
1022    """
1023    def result(self):
1024        return None
1025
1026    def done(self):
1027        return True
1028
1029
1030# _null_future is used as a dummy value in the coroutine runner. It differs
1031# from moment in that moment always adds a delay of one IOLoop iteration
1032# while _null_future is processed as soon as possible.
1033_null_future = _NullFuture()
1034
1035moment = _NullFuture()
1036moment.__doc__ = \
1037    """A special object which may be yielded to allow the IOLoop to run for
1038one iteration.
1039
1040This is not needed in normal use but it can be helpful in long-running
1041coroutines that are likely to yield Futures that are ready instantly.
1042
1043Usage: ``yield gen.moment``
1044
1045.. versionadded:: 4.0
1046
1047.. deprecated:: 4.5
1048   ``yield None`` (or ``yield`` with no argument) is now equivalent to
1049    ``yield gen.moment``.
1050"""
1051
1052
1053class Runner(object):
1054    """Internal implementation of `tornado.gen.engine`.
1055
1056    Maintains information about pending callbacks and their results.
1057
1058    The results of the generator are stored in ``result_future`` (a
1059    `.Future`)
1060    """
1061    def __init__(self, gen, result_future, first_yielded):
1062        self.gen = gen
1063        self.result_future = result_future
1064        self.future = _null_future
1065        self.yield_point = None
1066        self.pending_callbacks = None
1067        self.results = None
1068        self.running = False
1069        self.finished = False
1070        self.had_exception = False
1071        self.io_loop = IOLoop.current()
1072        # For efficiency, we do not create a stack context until we
1073        # reach a YieldPoint (stack contexts are required for the historical
1074        # semantics of YieldPoints, but not for Futures).  When we have
1075        # done so, this field will be set and must be called at the end
1076        # of the coroutine.
1077        self.stack_context_deactivate = None
1078        if self.handle_yield(first_yielded):
1079            gen = result_future = first_yielded = None
1080            self.run()
1081
1082    def register_callback(self, key):
1083        """Adds ``key`` to the list of callbacks."""
1084        if self.pending_callbacks is None:
1085            # Lazily initialize the old-style YieldPoint data structures.
1086            self.pending_callbacks = set()
1087            self.results = {}
1088        if key in self.pending_callbacks:
1089            raise KeyReuseError("key %r is already pending" % (key,))
1090        self.pending_callbacks.add(key)
1091
1092    def is_ready(self, key):
1093        """Returns true if a result is available for ``key``."""
1094        if self.pending_callbacks is None or key not in self.pending_callbacks:
1095            raise UnknownKeyError("key %r is not pending" % (key,))
1096        return key in self.results
1097
1098    def set_result(self, key, result):
1099        """Sets the result for ``key`` and attempts to resume the generator."""
1100        self.results[key] = result
1101        if self.yield_point is not None and self.yield_point.is_ready():
1102            try:
1103                future_set_result_unless_cancelled(self.future,
1104                                                   self.yield_point.get_result())
1105            except:
1106                future_set_exc_info(self.future, sys.exc_info())
1107            self.yield_point = None
1108            self.run()
1109
1110    def pop_result(self, key):
1111        """Returns the result for ``key`` and unregisters it."""
1112        self.pending_callbacks.remove(key)
1113        return self.results.pop(key)
1114
1115    def run(self):
1116        """Starts or resumes the generator, running until it reaches a
1117        yield point that is not ready.
1118        """
1119        if self.running or self.finished:
1120            return
1121        try:
1122            self.running = True
1123            while True:
1124                future = self.future
1125                if not future.done():
1126                    return
1127                self.future = None
1128                try:
1129                    orig_stack_contexts = stack_context._state.contexts
1130                    exc_info = None
1131
1132                    try:
1133                        value = future.result()
1134                    except Exception:
1135                        self.had_exception = True
1136                        exc_info = sys.exc_info()
1137                    future = None
1138
1139                    if exc_info is not None:
1140                        try:
1141                            yielded = self.gen.throw(*exc_info)
1142                        finally:
1143                            # Break up a reference to itself
1144                            # for faster GC on CPython.
1145                            exc_info = None
1146                    else:
1147                        yielded = self.gen.send(value)
1148
1149                    if stack_context._state.contexts is not orig_stack_contexts:
1150                        self.gen.throw(
1151                            stack_context.StackContextInconsistentError(
1152                                'stack_context inconsistency (probably caused '
1153                                'by yield within a "with StackContext" block)'))
1154                except (StopIteration, Return) as e:
1155                    self.finished = True
1156                    self.future = _null_future
1157                    if self.pending_callbacks and not self.had_exception:
1158                        # If we ran cleanly without waiting on all callbacks
1159                        # raise an error (really more of a warning).  If we
1160                        # had an exception then some callbacks may have been
1161                        # orphaned, so skip the check in that case.
1162                        raise LeakedCallbackError(
1163                            "finished without waiting for callbacks %r" %
1164                            self.pending_callbacks)
1165                    future_set_result_unless_cancelled(self.result_future,
1166                                                       _value_from_stopiteration(e))
1167                    self.result_future = None
1168                    self._deactivate_stack_context()
1169                    return
1170                except Exception:
1171                    self.finished = True
1172                    self.future = _null_future
1173                    future_set_exc_info(self.result_future, sys.exc_info())
1174                    self.result_future = None
1175                    self._deactivate_stack_context()
1176                    return
1177                if not self.handle_yield(yielded):
1178                    return
1179                yielded = None
1180        finally:
1181            self.running = False
1182
1183    def handle_yield(self, yielded):
1184        # Lists containing YieldPoints require stack contexts;
1185        # other lists are handled in convert_yielded.
1186        if _contains_yieldpoint(yielded):
1187            yielded = multi(yielded)
1188
1189        if isinstance(yielded, YieldPoint):
1190            # YieldPoints are too closely coupled to the Runner to go
1191            # through the generic convert_yielded mechanism.
1192            self.future = Future()
1193
1194            def start_yield_point():
1195                try:
1196                    yielded.start(self)
1197                    if yielded.is_ready():
1198                        future_set_result_unless_cancelled(self.future, yielded.get_result())
1199                    else:
1200                        self.yield_point = yielded
1201                except Exception:
1202                    self.future = Future()
1203                    future_set_exc_info(self.future, sys.exc_info())
1204
1205            if self.stack_context_deactivate is None:
1206                # Start a stack context if this is the first
1207                # YieldPoint we've seen.
1208                with stack_context.ExceptionStackContext(
1209                        self.handle_exception) as deactivate:
1210                    self.stack_context_deactivate = deactivate
1211
1212                    def cb():
1213                        start_yield_point()
1214                        self.run()
1215                    self.io_loop.add_callback(cb)
1216                    return False
1217            else:
1218                start_yield_point()
1219        else:
1220            try:
1221                self.future = convert_yielded(yielded)
1222            except BadYieldError:
1223                self.future = Future()
1224                future_set_exc_info(self.future, sys.exc_info())
1225
1226        if self.future is moment:
1227            self.io_loop.add_callback(self.run)
1228            return False
1229        elif not self.future.done():
1230            def inner(f):
1231                # Break a reference cycle to speed GC.
1232                f = None  # noqa
1233                self.run()
1234            self.io_loop.add_future(
1235                self.future, inner)
1236            return False
1237        return True
1238
1239    def result_callback(self, key):
1240        return stack_context.wrap(_argument_adapter(
1241            functools.partial(self.set_result, key)))
1242
1243    def handle_exception(self, typ, value, tb):
1244        if not self.running and not self.finished:
1245            self.future = Future()
1246            future_set_exc_info(self.future, (typ, value, tb))
1247            self.run()
1248            return True
1249        else:
1250            return False
1251
1252    def _deactivate_stack_context(self):
1253        if self.stack_context_deactivate is not None:
1254            self.stack_context_deactivate()
1255            self.stack_context_deactivate = None
1256
1257
1258Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
1259
1260
1261def _argument_adapter(callback):
1262    """Returns a function that when invoked runs ``callback`` with one arg.
1263
1264    If the function returned by this function is called with exactly
1265    one argument, that argument is passed to ``callback``.  Otherwise
1266    the args tuple and kwargs dict are wrapped in an `Arguments` object.
1267    """
1268    def wrapper(*args, **kwargs):
1269        if kwargs or len(args) > 1:
1270            callback(Arguments(args, kwargs))
1271        elif args:
1272            callback(args[0])
1273        else:
1274            callback(None)
1275    return wrapper
1276
1277
1278# Convert Awaitables into Futures.
1279try:
1280    import asyncio
1281except ImportError:
1282    # Py2-compatible version for use with Cython.
1283    # Copied from PEP 380.
1284    @coroutine
1285    def _wrap_awaitable(x):
1286        if hasattr(x, '__await__'):
1287            _i = x.__await__()
1288        else:
1289            _i = iter(x)
1290        try:
1291            _y = next(_i)
1292        except StopIteration as _e:
1293            _r = _value_from_stopiteration(_e)
1294        else:
1295            while 1:
1296                try:
1297                    _s = yield _y
1298                except GeneratorExit as _e:
1299                    try:
1300                        _m = _i.close
1301                    except AttributeError:
1302                        pass
1303                    else:
1304                        _m()
1305                    raise _e
1306                except BaseException as _e:
1307                    _x = sys.exc_info()
1308                    try:
1309                        _m = _i.throw
1310                    except AttributeError:
1311                        raise _e
1312                    else:
1313                        try:
1314                            _y = _m(*_x)
1315                        except StopIteration as _e:
1316                            _r = _value_from_stopiteration(_e)
1317                            break
1318                else:
1319                    try:
1320                        if _s is None:
1321                            _y = next(_i)
1322                        else:
1323                            _y = _i.send(_s)
1324                    except StopIteration as _e:
1325                        _r = _value_from_stopiteration(_e)
1326                        break
1327        raise Return(_r)
1328else:
1329    try:
1330        _wrap_awaitable = asyncio.ensure_future
1331    except AttributeError:
1332        # asyncio.ensure_future was introduced in Python 3.4.4, but
1333        # Debian jessie still ships with 3.4.2 so try the old name.
1334        _wrap_awaitable = getattr(asyncio, 'async')
1335
1336
1337def convert_yielded(yielded):
1338    """Convert a yielded object into a `.Future`.
1339
1340    The default implementation accepts lists, dictionaries, and Futures.
1341
1342    If the `~functools.singledispatch` library is available, this function
1343    may be extended to support additional types. For example::
1344
1345        @convert_yielded.register(asyncio.Future)
1346        def _(asyncio_future):
1347            return tornado.platform.asyncio.to_tornado_future(asyncio_future)
1348
1349    .. versionadded:: 4.1
1350    """
1351    # Lists and dicts containing YieldPoints were handled earlier.
1352    if yielded is None or yielded is moment:
1353        return moment
1354    elif yielded is _null_future:
1355        return _null_future
1356    elif isinstance(yielded, (list, dict)):
1357        return multi(yielded)
1358    elif is_future(yielded):
1359        return yielded
1360    elif isawaitable(yielded):
1361        return _wrap_awaitable(yielded)
1362    else:
1363        raise BadYieldError("yielded unknown object %r" % (yielded,))
1364
1365
1366if singledispatch is not None:
1367    convert_yielded = singledispatch(convert_yielded)
1368