1"""Support for tasks, coroutines and the scheduler."""
2
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
9import concurrent.futures
10import functools
11import inspect
12import linecache
13import sys
14import traceback
15import weakref
16
17from . import coroutines
18from . import events
19from . import futures
20from .coroutines import coroutine
21
# True when running on Python 3.4 or newer; Task only defines __del__ then.
_PY34 = (sys.version_info >= (3, 4))
23
24
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message when the task is destroyed while its
    # status is still pending.
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine *coro* and schedule its first step.

        *coro* must satisfy coroutines.iscoroutine() (checked with an
        assert).  *loop* is passed on to the Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        # Drop the newest entry of the recorded source traceback, if any.
        # NOTE(review): presumably this hides the frame of this constructor
        # itself -- confirm against Future.__init__.
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently blocked on (see the invariant above).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by the
        # next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending.

            The report goes to the loop's exception handler and is skipped
            when _log_destroy_pending is false.  Future.__del__ is always
            called afterwards.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Extend the Future repr info with task specific details.

        The status entry is replaced by 'cancelling' while a cancellation
        is pending, and the coroutine plus the awaited future (if any) are
        inserted right after it.
        """
        info = super()._repr_info()

        if self._must_cancel:
            # replace status
            info[0] = 'cancelling'

        coro = coroutines._format_coroutine(self._coro)
        info.insert(1, 'coro=<%s>' % coro)

        if self._fut_waiter is not None:
            info.insert(2, 'wait_for=%r' % self._fut_waiter)
        return info

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Coroutine still has a frame: walk outwards via f_back
            # (newest first), then flip to oldest-first order.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # No frame left but an exception is stored: follow its
            # traceback chain, which already runs oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        extracted_list = []
        # File names whose linecache entry was already revalidated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        # Header line: depends on whether there are frames at all and on
        # whether they stem from a stored exception.
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        *exc*, if given, is thrown into the coroutine; otherwise *value*
        is sent (or next() is used when *value* is None).  A pending
        cancellation request replaces *exc* with a CancelledError.

        Depending on the outcome the task is completed (result,
        exception or cancellation), suspended on the yielded Future, or
        rescheduled through the loop's call_soon().
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # Deliver the cancellation requested through cancel(), unless
            # a CancelledError is already on its way in.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Publish this task as the one currently running on its loop; the
        # entry is removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the task result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let non-Exception
            # errors keep propagating to the caller.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been requested while the coroutine
                    # was running; forward it to the new waiter.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine.

        The outcome of *future* is forwarded to _step(), either as the
        value to send or as the exception to throw.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
294
295
# wait() and as_completed() below are modelled on their PEP 3148
# counterparts; the return_when constants are taken straight from
# concurrent.futures.

(FIRST_COMPLETED,
 FIRST_EXCEPTION,
 ALL_COMPLETED) = (concurrent.futures.FIRST_COMPLETED,
                   concurrent.futures.FIRST_EXCEPTION,
                   concurrent.futures.ALL_COMPLETED)
301
302
303@coroutine
304def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
305    """Wait for the Futures and coroutines given by fs to complete.
306
307    The sequence futures must not be empty.
308
309    Coroutines will be wrapped in Tasks.
310
311    Returns two sets of Future: (done, pending).
312
313    Usage:
314
315        done, pending = yield from asyncio.wait(fs)
316
317    Note: This does not raise TimeoutError! Futures that aren't done
318    when the timeout occurs are returned in the second set.
319    """
320    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
321        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
322    if not fs:
323        raise ValueError('Set of coroutines/Futures is empty.')
324    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
325        raise ValueError('Invalid return_when value: {}'.format(return_when))
326
327    if loop is None:
328        loop = events.get_event_loop()
329
330    fs = {async(f, loop=loop) for f in set(fs)}
331
332    return (yield from _wait(fs, timeout, return_when, loop))
333
334
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Any extra positional arguments are accepted and ignored, which lets
    this function be registered as a callback that receives arguments.
    """
    if waiter.done():
        return
    waiter.set_result(None)
338
339
340@coroutine
341def wait_for(fut, timeout, *, loop=None):
342    """Wait for the single Future or coroutine to complete, with timeout.
343
344    Coroutine will be wrapped in Task.
345
346    Returns result of the Future or coroutine.  When a timeout occurs,
347    it cancels the task and raises TimeoutError.  To avoid the task
348    cancellation, wrap it in shield().
349
350    If the wait is cancelled, the task is also cancelled.
351
352    This function is a coroutine.
353    """
354    if loop is None:
355        loop = events.get_event_loop()
356
357    if timeout is None:
358        return (yield from fut)
359
360    waiter = futures.Future(loop=loop)
361    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
362    cb = functools.partial(_release_waiter, waiter)
363
364    fut = async(fut, loop=loop)
365    fut.add_done_callback(cb)
366
367    try:
368        # wait until the future completes or the timeout
369        try:
370            yield from waiter
371        except futures.CancelledError:
372            fut.remove_done_callback(cb)
373            fut.cancel()
374            raise
375
376        if fut.done():
377            return fut.result()
378        else:
379            fut.remove_done_callback(cb)
380            fut.cancel()
381            raise futures.TimeoutError()
382    finally:
383        timeout_handle.cancel()
384
385
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Waits
    until the condition selected by return_when holds or the timeout
    (if not None) expires, then returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if counter > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding and "first completed" was not
            # requested: only a real exception under FIRST_EXCEPTION may
            # wake the waiter early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
428
429
430# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
431def as_completed(fs, *, loop=None, timeout=None):
432    """Return an iterator whose values are coroutines.
433
434    When waiting for the yielded coroutines you'll get the results (or
435    exceptions!) of the original Futures (or coroutines), in the order
436    in which and as soon as they complete.
437
438    This differs from PEP 3148; the proper way to use this is:
439
440        for f in as_completed(fs):
441            result = yield from f  # The 'yield from' may raise.
442            # Use result.
443
444    If a timeout is specified, the 'yield from' will raise
445    TimeoutError when the timeout occurs before all Futures are done.
446
447    Note: The futures 'f' are not necessarily members of fs.
448    """
449    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
450        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
451    loop = loop if loop is not None else events.get_event_loop()
452    todo = {async(f, loop=loop) for f in set(fs)}
453    from .queues import Queue  # Import here to avoid circular import problem.
454    done = Queue(loop=loop)
455    timeout_handle = None
456
457    def _on_timeout():
458        for f in todo:
459            f.remove_done_callback(_on_completion)
460            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
461        todo.clear()  # Can't do todo.remove(f) in the loop.
462
463    def _on_completion(f):
464        if not todo:
465            return  # _on_timeout() was here first.
466        todo.remove(f)
467        done.put_nowait(f)
468        if not todo and timeout_handle is not None:
469            timeout_handle.cancel()
470
471    @coroutine
472    def _wait_for_one():
473        f = yield from done.get()
474        if f is None:
475            # Dummy value from _on_timeout().
476            raise futures.TimeoutError
477        return f.result()  # May raise f.exception().
478
479    for f in todo:
480        f.add_done_callback(_on_completion)
481    if todo and timeout is not None:
482        timeout_handle = loop.call_later(timeout, _on_timeout)
483    for _ in range(len(todo)):
484        yield _wait_for_one()
485
486
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is what the coroutine returns once the delay has elapsed.
    """
    future = futures.Future(loop=loop)
    timer = future._loop.call_later(
        delay, future._set_result_unless_cancelled, result)
    try:
        value = yield from future
        return value
    finally:
        # Also reached on cancellation: make sure the timer goes away.
        timer.cancel()
497
498
499def async(coro_or_future, *, loop=None):
500    """Wrap a coroutine in a future.
501
502    If the argument is a Future, it is returned directly.
503    """
504    if isinstance(coro_or_future, futures.Future):
505        if loop is not None and loop is not coro_or_future._loop:
506            raise ValueError('loop argument must agree with Future')
507        return coro_or_future
508    elif coroutines.iscoroutine(coro_or_future):
509        if loop is None:
510            loop = events.get_event_loop()
511        task = loop.create_task(coro_or_future)
512        if task._source_traceback:
513            del task._source_traceback[-1]
514        return task
515    else:
516        raise TypeError('A Future or coroutine is required')
517
518
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Remember *children*, the futures that cancel() forwards to."""
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Ask every child to cancel.

        Returns False if this future is already done or if no child
        accepted the request, True otherwise.  Claiming success when
        nothing was cancelled would make a Task waiting on this future
        believe its own cancellation had been delivered and drop it.
        """
        if self.done():
            return False
        cancelled_any = False
        for child in self._children:
            # Call cancel() on every child; no short-circuiting.
            if child.cancel():
                cancelled_any = True
        return cancelled_any
537
538
539def gather(*coros_or_futures, loop=None, return_exceptions=False):
540    """Return a future aggregating results from the given coroutines
541    or futures.
542
543    All futures must share the same event loop.  If all the tasks are
544    done successfully, the returned future's result is the list of
545    results (in the order of the original sequence, not necessarily
546    the order of results arrival).  If *return_exceptions* is True,
547    exceptions in the tasks are treated the same as successful
548    results, and gathered in the result list; otherwise, the first
549    raised exception will be immediately propagated to the returned
550    future.
551
552    Cancellation: if the outer Future is cancelled, all children (that
553    have not completed yet) are also cancelled.  If any child is
554    cancelled, this is treated as if it raised CancelledError --
555    the outer Future is *not* cancelled in this case.  (This is to
556    prevent the cancellation of one child to cause other children to
557    be cancelled.)
558    """
559    if not coros_or_futures:
560        outer = futures.Future(loop=loop)
561        outer.set_result([])
562        return outer
563
564    arg_to_fut = {}
565    for arg in set(coros_or_futures):
566        if not isinstance(arg, futures.Future):
567            fut = async(arg, loop=loop)
568            if loop is None:
569                loop = fut._loop
570            # The caller cannot control this future, the "destroy pending task"
571            # warning should not be emitted.
572            fut._log_destroy_pending = False
573        else:
574            fut = arg
575            if loop is None:
576                loop = fut._loop
577            elif fut._loop is not loop:
578                raise ValueError("futures are tied to different event loops")
579        arg_to_fut[arg] = fut
580
581    children = [arg_to_fut[arg] for arg in coros_or_futures]
582    nchildren = len(children)
583    outer = _GatheringFuture(children, loop=loop)
584    nfinished = 0
585    results = [None] * nchildren
586
587    def _done_callback(i, fut):
588        nonlocal nfinished
589        if outer.done():
590            if not fut.cancelled():
591                # Mark exception retrieved.
592                fut.exception()
593            return
594
595        if fut.cancelled():
596            res = futures.CancelledError()
597            if not return_exceptions:
598                outer.set_exception(res)
599                return
600        elif fut._exception is not None:
601            res = fut.exception()  # Mark exception retrieved.
602            if not return_exceptions:
603                outer.set_exception(res)
604                return
605        else:
606            res = fut._result
607        results[i] = res
608        nfinished += 1
609        if nfinished == nchildren:
610            outer.set_result(results)
611
612    for i, fut in enumerate(children):
613        fut.add_done_callback(functools.partial(_done_callback, i))
614    return outer
615
616
617def shield(arg, *, loop=None):
618    """Wait for a future, shielding it from cancellation.
619
620    The statement
621
622        res = yield from shield(something())
623
624    is exactly equivalent to the statement
625
626        res = yield from something()
627
628    *except* that if the coroutine containing it is cancelled, the
629    task running in something() is not cancelled.  From the POV of
630    something(), the cancellation did not happen.  But its caller is
631    still cancelled, so the yield-from expression still raises
632    CancelledError.  Note: If something() is cancelled by other means
633    this will still cancel shield().
634
635    If you want to completely ignore cancellation (not recommended)
636    you can combine shield() with a try/except clause, as follows:
637
638        try:
639            res = yield from shield(something())
640        except CancelledError:
641            res = None
642    """
643    inner = async(arg, loop=loop)
644    if inner.done():
645        # Shortcut.
646        return inner
647    loop = inner._loop
648    outer = futures.Future(loop=loop)
649
650    def _done_callback(inner):
651        if outer.cancelled():
652            if not inner.cancelled():
653                # Mark inner's result as retrieved.
654                inner.exception()
655            return
656
657        if inner.cancelled():
658            outer.cancel()
659        else:
660            exc = inner.exception()
661            if exc is not None:
662                outer.set_exception(exc)
663            else:
664                outer.set_result(inner.result())
665
666    inner.add_done_callback(_done_callback)
667    return outer
668