1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
28# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
33
34def current_task(loop=None):
35    """Return a currently executed task."""
36    if loop is None:
37        loop = events.get_running_loop()
38    return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42    """Return a set of all tasks for the loop."""
43    if loop is None:
44        loop = events.get_running_loop()
45    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46    # thread while we do so. Therefore we cast it to list prior to filtering. The list
47    # cast itself requires iteration, so we repeat it several times ignoring
48    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49    # details.
50    i = 0
51    while True:
52        try:
53            tasks = list(_all_tasks)
54        except RuntimeError:
55            i += 1
56            if i >= 1000:
57                raise
58        else:
59            break
60    return {t for t in tasks
61            if futures._get_loop(t) is loop and not t.done()}
62
63
64def _all_tasks_compat(loop=None):
65    # Different from "all_task()" by returning *all* Tasks, including
66    # the completed ones.  Used to implement deprecated "Tasks.all_task()"
67    # method.
68    if loop is None:
69        loop = events.get_event_loop()
70    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
71    # thread while we do so. Therefore we cast it to list prior to filtering. The list
72    # cast itself requires iteration, so we repeat it several times ignoring
73    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
74    # details.
75    i = 0
76    while True:
77        try:
78            tasks = list(_all_tasks)
79        except RuntimeError:
80            i += 1
81            if i >= 1000:
82                raise
83        else:
84            break
85    return {t for t in tasks if futures._get_loop(t) is loop}
86
87
88def _set_task_name(task, name):
89    if name is not None:
90        try:
91            set_name = task.set_name
92        except AttributeError:
93            pass
94        else:
95            set_name(name)
96
97
98class Task(futures._PyFuture):  # Inherit Python Task implementation
99                                # from a Python Future implementation.
100
101    """A coroutine wrapped in a Future."""
102
103    # An important invariant maintained while a Task not done:
104    #
105    # - Either _fut_waiter is None, and _step() is scheduled;
106    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
107    #
108    # The only transition from the latter to the former is through
109    # _wakeup().  When _fut_waiter is not None, one of its callbacks
110    # must be _wakeup().
111
112    # If False, don't log a message if the task is destroyed whereas its
113    # status is still pending
114    _log_destroy_pending = True
115
116    def __init__(self, coro, *, loop=None, name=None):
117        super().__init__(loop=loop)
118        if self._source_traceback:
119            del self._source_traceback[-1]
120        if not coroutines.iscoroutine(coro):
121            # raise after Future.__init__(), attrs are required for __del__
122            # prevent logging for pending task in __del__
123            self._log_destroy_pending = False
124            raise TypeError(f"a coroutine was expected, got {coro!r}")
125
126        if name is None:
127            self._name = f'Task-{_task_name_counter()}'
128        else:
129            self._name = str(name)
130
131        self._must_cancel = False
132        self._fut_waiter = None
133        self._coro = coro
134        self._context = contextvars.copy_context()
135
136        self._loop.call_soon(self.__step, context=self._context)
137        _register_task(self)
138
139    def __del__(self):
140        if self._state == futures._PENDING and self._log_destroy_pending:
141            context = {
142                'task': self,
143                'message': 'Task was destroyed but it is pending!',
144            }
145            if self._source_traceback:
146                context['source_traceback'] = self._source_traceback
147            self._loop.call_exception_handler(context)
148        super().__del__()
149
150    def __class_getitem__(cls, type):
151        return cls
152
153    def _repr_info(self):
154        return base_tasks._task_repr_info(self)
155
156    def get_coro(self):
157        return self._coro
158
159    def get_name(self):
160        return self._name
161
162    def set_name(self, value):
163        self._name = str(value)
164
165    def set_result(self, result):
166        raise RuntimeError('Task does not support set_result operation')
167
168    def set_exception(self, exception):
169        raise RuntimeError('Task does not support set_exception operation')
170
171    def get_stack(self, *, limit=None):
172        """Return the list of stack frames for this task's coroutine.
173
174        If the coroutine is not done, this returns the stack where it is
175        suspended.  If the coroutine has completed successfully or was
176        cancelled, this returns an empty list.  If the coroutine was
177        terminated by an exception, this returns the list of traceback
178        frames.
179
180        The frames are always ordered from oldest to newest.
181
182        The optional limit gives the maximum number of frames to
183        return; by default all available frames are returned.  Its
184        meaning differs depending on whether a stack or a traceback is
185        returned: the newest frames of a stack are returned, but the
186        oldest frames of a traceback are returned.  (This matches the
187        behavior of the traceback module.)
188
189        For reasons beyond our control, only one stack frame is
190        returned for a suspended coroutine.
191        """
192        return base_tasks._task_get_stack(self, limit)
193
194    def print_stack(self, *, limit=None, file=None):
195        """Print the stack or traceback for this task's coroutine.
196
197        This produces output similar to that of the traceback module,
198        for the frames retrieved by get_stack().  The limit argument
199        is passed to get_stack().  The file argument is an I/O stream
200        to which the output is written; by default output is written
201        to sys.stderr.
202        """
203        return base_tasks._task_print_stack(self, limit, file)
204
205    def cancel(self, msg=None):
206        """Request that this task cancel itself.
207
208        This arranges for a CancelledError to be thrown into the
209        wrapped coroutine on the next cycle through the event loop.
210        The coroutine then has a chance to clean up or even deny
211        the request using try/except/finally.
212
213        Unlike Future.cancel, this does not guarantee that the
214        task will be cancelled: the exception might be caught and
215        acted upon, delaying cancellation of the task or preventing
216        cancellation completely.  The task may also return a value or
217        raise a different exception.
218
219        Immediately after this method is called, Task.cancelled() will
220        not return True (unless the task was already cancelled).  A
221        task will be marked as cancelled when the wrapped coroutine
222        terminates with a CancelledError exception (even if cancel()
223        was not called).
224        """
225        self._log_traceback = False
226        if self.done():
227            return False
228        if self._fut_waiter is not None:
229            if self._fut_waiter.cancel(msg=msg):
230                # Leave self._fut_waiter; it may be a Task that
231                # catches and ignores the cancellation so we may have
232                # to cancel it again later.
233                return True
234        # It must be the case that self.__step is already scheduled.
235        self._must_cancel = True
236        self._cancel_message = msg
237        return True
238
239    def __step(self, exc=None):
240        if self.done():
241            raise exceptions.InvalidStateError(
242                f'_step(): already done: {self!r}, {exc!r}')
243        if self._must_cancel:
244            if not isinstance(exc, exceptions.CancelledError):
245                exc = self._make_cancelled_error()
246            self._must_cancel = False
247        coro = self._coro
248        self._fut_waiter = None
249
250        _enter_task(self._loop, self)
251        # Call either coro.throw(exc) or coro.send(None).
252        try:
253            if exc is None:
254                # We use the `send` method directly, because coroutines
255                # don't have `__iter__` and `__next__` methods.
256                result = coro.send(None)
257            else:
258                result = coro.throw(exc)
259        except StopIteration as exc:
260            if self._must_cancel:
261                # Task is cancelled right before coro stops.
262                self._must_cancel = False
263                super().cancel(msg=self._cancel_message)
264            else:
265                super().set_result(exc.value)
266        except exceptions.CancelledError as exc:
267            # Save the original exception so we can chain it later.
268            self._cancelled_exc = exc
269            super().cancel()  # I.e., Future.cancel(self).
270        except (KeyboardInterrupt, SystemExit) as exc:
271            super().set_exception(exc)
272            raise
273        except BaseException as exc:
274            super().set_exception(exc)
275        else:
276            blocking = getattr(result, '_asyncio_future_blocking', None)
277            if blocking is not None:
278                # Yielded Future must come from Future.__iter__().
279                if futures._get_loop(result) is not self._loop:
280                    new_exc = RuntimeError(
281                        f'Task {self!r} got Future '
282                        f'{result!r} attached to a different loop')
283                    self._loop.call_soon(
284                        self.__step, new_exc, context=self._context)
285                elif blocking:
286                    if result is self:
287                        new_exc = RuntimeError(
288                            f'Task cannot await on itself: {self!r}')
289                        self._loop.call_soon(
290                            self.__step, new_exc, context=self._context)
291                    else:
292                        result._asyncio_future_blocking = False
293                        result.add_done_callback(
294                            self.__wakeup, context=self._context)
295                        self._fut_waiter = result
296                        if self._must_cancel:
297                            if self._fut_waiter.cancel(
298                                    msg=self._cancel_message):
299                                self._must_cancel = False
300                else:
301                    new_exc = RuntimeError(
302                        f'yield was used instead of yield from '
303                        f'in task {self!r} with {result!r}')
304                    self._loop.call_soon(
305                        self.__step, new_exc, context=self._context)
306
307            elif result is None:
308                # Bare yield relinquishes control for one event loop iteration.
309                self._loop.call_soon(self.__step, context=self._context)
310            elif inspect.isgenerator(result):
311                # Yielding a generator is just wrong.
312                new_exc = RuntimeError(
313                    f'yield was used instead of yield from for '
314                    f'generator in task {self!r} with {result!r}')
315                self._loop.call_soon(
316                    self.__step, new_exc, context=self._context)
317            else:
318                # Yielding something else is an error.
319                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
320                self._loop.call_soon(
321                    self.__step, new_exc, context=self._context)
322        finally:
323            _leave_task(self._loop, self)
324            self = None  # Needed to break cycles when an exception occurs.
325
326    def __wakeup(self, future):
327        try:
328            future.result()
329        except BaseException as exc:
330            # This may also be a cancellation.
331            self.__step(exc)
332        else:
333            # Don't pass the value of `future.result()` explicitly,
334            # as `Future.__iter__` and `Future.__await__` don't need it.
335            # If we call `_step(value, None)` instead of `_step()`,
336            # Python eval loop would use `.send(value)` method call,
337            # instead of `__next__()`, which is slower for futures
338            # that return non-generator iterators from their `__iter__`.
339            self.__step()
340        self = None  # Needed to break cycles when an exception occurs.
341
342
343_PyTask = Task
344
345
346try:
347    import _asyncio
348except ImportError:
349    pass
350else:
351    # _CTask is needed for tests.
352    Task = _CTask = _asyncio.Task
353
354
355def create_task(coro, *, name=None):
356    """Schedule the execution of a coroutine object in a spawn task.
357
358    Return a Task object.
359    """
360    loop = events.get_running_loop()
361    task = loop.create_task(coro)
362    _set_task_name(task, name)
363    return task
364
365
366# wait() and as_completed() similar to those in PEP 3148.
367
368FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
369FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
370ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
371
372
373async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
374    """Wait for the Futures and coroutines given by fs to complete.
375
376    The fs iterable must not be empty.
377
378    Coroutines will be wrapped in Tasks.
379
380    Returns two sets of Future: (done, pending).
381
382    Usage:
383
384        done, pending = await asyncio.wait(fs)
385
386    Note: This does not raise TimeoutError! Futures that aren't done
387    when the timeout occurs are returned in the second set.
388    """
389    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
390        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
391    if not fs:
392        raise ValueError('Set of coroutines/Futures is empty.')
393    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
394        raise ValueError(f'Invalid return_when value: {return_when}')
395
396    if loop is None:
397        loop = events.get_running_loop()
398    else:
399        warnings.warn("The loop argument is deprecated since Python 3.8, "
400                      "and scheduled for removal in Python 3.10.",
401                      DeprecationWarning, stacklevel=2)
402
403    fs = set(fs)
404
405    if any(coroutines.iscoroutine(f) for f in fs):
406        warnings.warn("The explicit passing of coroutine objects to "
407                      "asyncio.wait() is deprecated since Python 3.8, and "
408                      "scheduled for removal in Python 3.11.",
409                      DeprecationWarning, stacklevel=2)
410
411    fs = {ensure_future(f, loop=loop) for f in fs}
412
413    return await _wait(fs, timeout, return_when, loop)
414
415
416def _release_waiter(waiter, *args):
417    if not waiter.done():
418        waiter.set_result(None)
419
420
421async def wait_for(fut, timeout, *, loop=None):
422    """Wait for the single Future or coroutine to complete, with timeout.
423
424    Coroutine will be wrapped in Task.
425
426    Returns result of the Future or coroutine.  When a timeout occurs,
427    it cancels the task and raises TimeoutError.  To avoid the task
428    cancellation, wrap it in shield().
429
430    If the wait is cancelled, the task is also cancelled.
431
432    This function is a coroutine.
433    """
434    if loop is None:
435        loop = events.get_running_loop()
436    else:
437        warnings.warn("The loop argument is deprecated since Python 3.8, "
438                      "and scheduled for removal in Python 3.10.",
439                      DeprecationWarning, stacklevel=2)
440
441    if timeout is None:
442        return await fut
443
444    if timeout <= 0:
445        fut = ensure_future(fut, loop=loop)
446
447        if fut.done():
448            return fut.result()
449
450        await _cancel_and_wait(fut, loop=loop)
451        try:
452            fut.result()
453        except exceptions.CancelledError as exc:
454            raise exceptions.TimeoutError() from exc
455        else:
456            raise exceptions.TimeoutError()
457
458    waiter = loop.create_future()
459    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
460    cb = functools.partial(_release_waiter, waiter)
461
462    fut = ensure_future(fut, loop=loop)
463    fut.add_done_callback(cb)
464
465    try:
466        # wait until the future completes or the timeout
467        try:
468            await waiter
469        except exceptions.CancelledError:
470            if fut.done():
471                return fut.result()
472            else:
473                fut.remove_done_callback(cb)
474                # We must ensure that the task is not running
475                # after wait_for() returns.
476                # See https://bugs.python.org/issue32751
477                await _cancel_and_wait(fut, loop=loop)
478                raise
479
480        if fut.done():
481            return fut.result()
482        else:
483            fut.remove_done_callback(cb)
484            # We must ensure that the task is not running
485            # after wait_for() returns.
486            # See https://bugs.python.org/issue32751
487            await _cancel_and_wait(fut, loop=loop)
488            # In case task cancellation failed with some
489            # exception, we should re-raise it
490            # See https://bugs.python.org/issue40607
491            try:
492                fut.result()
493            except exceptions.CancelledError as exc:
494                raise exceptions.TimeoutError() from exc
495            else:
496                raise exceptions.TimeoutError()
497    finally:
498        timeout_handle.cancel()
499
500
501async def _wait(fs, timeout, return_when, loop):
502    """Internal helper for wait().
503
504    The fs argument must be a collection of Futures.
505    """
506    assert fs, 'Set of Futures is empty.'
507    waiter = loop.create_future()
508    timeout_handle = None
509    if timeout is not None:
510        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
511    counter = len(fs)
512
513    def _on_completion(f):
514        nonlocal counter
515        counter -= 1
516        if (counter <= 0 or
517            return_when == FIRST_COMPLETED or
518            return_when == FIRST_EXCEPTION and (not f.cancelled() and
519                                                f.exception() is not None)):
520            if timeout_handle is not None:
521                timeout_handle.cancel()
522            if not waiter.done():
523                waiter.set_result(None)
524
525    for f in fs:
526        f.add_done_callback(_on_completion)
527
528    try:
529        await waiter
530    finally:
531        if timeout_handle is not None:
532            timeout_handle.cancel()
533        for f in fs:
534            f.remove_done_callback(_on_completion)
535
536    done, pending = set(), set()
537    for f in fs:
538        if f.done():
539            done.add(f)
540        else:
541            pending.add(f)
542    return done, pending
543
544
545async def _cancel_and_wait(fut, loop):
546    """Cancel the *fut* future or task and wait until it completes."""
547
548    waiter = loop.create_future()
549    cb = functools.partial(_release_waiter, waiter)
550    fut.add_done_callback(cb)
551
552    try:
553        fut.cancel()
554        # We cannot wait on *fut* directly to make
555        # sure _cancel_and_wait itself is reliably cancellable.
556        await waiter
557    finally:
558        fut.remove_done_callback(cb)
559
560
561# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
562def as_completed(fs, *, loop=None, timeout=None):
563    """Return an iterator whose values are coroutines.
564
565    When waiting for the yielded coroutines you'll get the results (or
566    exceptions!) of the original Futures (or coroutines), in the order
567    in which and as soon as they complete.
568
569    This differs from PEP 3148; the proper way to use this is:
570
571        for f in as_completed(fs):
572            result = await f  # The 'await' may raise.
573            # Use result.
574
575    If a timeout is specified, the 'await' will raise
576    TimeoutError when the timeout occurs before all Futures are done.
577
578    Note: The futures 'f' are not necessarily members of fs.
579    """
580    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
581        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
582
583    if loop is not None:
584        warnings.warn("The loop argument is deprecated since Python 3.8, "
585                      "and scheduled for removal in Python 3.10.",
586                      DeprecationWarning, stacklevel=2)
587
588    from .queues import Queue  # Import here to avoid circular import problem.
589    done = Queue(loop=loop)
590
591    if loop is None:
592        loop = events.get_event_loop()
593    todo = {ensure_future(f, loop=loop) for f in set(fs)}
594    timeout_handle = None
595
596    def _on_timeout():
597        for f in todo:
598            f.remove_done_callback(_on_completion)
599            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
600        todo.clear()  # Can't do todo.remove(f) in the loop.
601
602    def _on_completion(f):
603        if not todo:
604            return  # _on_timeout() was here first.
605        todo.remove(f)
606        done.put_nowait(f)
607        if not todo and timeout_handle is not None:
608            timeout_handle.cancel()
609
610    async def _wait_for_one():
611        f = await done.get()
612        if f is None:
613            # Dummy value from _on_timeout().
614            raise exceptions.TimeoutError
615        return f.result()  # May raise f.exception().
616
617    for f in todo:
618        f.add_done_callback(_on_completion)
619    if todo and timeout is not None:
620        timeout_handle = loop.call_later(timeout, _on_timeout)
621    for _ in range(len(todo)):
622        yield _wait_for_one()
623
624
625@types.coroutine
626def __sleep0():
627    """Skip one event loop run cycle.
628
629    This is a private helper for 'asyncio.sleep()', used
630    when the 'delay' is set to 0.  It uses a bare 'yield'
631    expression (which Task.__step knows how to handle)
632    instead of creating a Future object.
633    """
634    yield
635
636
637async def sleep(delay, result=None, *, loop=None):
638    """Coroutine that completes after a given time (in seconds)."""
639    if loop is not None:
640        warnings.warn("The loop argument is deprecated since Python 3.8, "
641                      "and scheduled for removal in Python 3.10.",
642                      DeprecationWarning, stacklevel=2)
643
644    if delay <= 0:
645        await __sleep0()
646        return result
647
648    if loop is None:
649        loop = events.get_running_loop()
650
651    future = loop.create_future()
652    h = loop.call_later(delay,
653                        futures._set_result_unless_cancelled,
654                        future, result)
655    try:
656        return await future
657    finally:
658        h.cancel()
659
660
661def ensure_future(coro_or_future, *, loop=None):
662    """Wrap a coroutine or an awaitable in a future.
663
664    If the argument is a Future, it is returned directly.
665    """
666    if coroutines.iscoroutine(coro_or_future):
667        if loop is None:
668            loop = events.get_event_loop()
669        task = loop.create_task(coro_or_future)
670        if task._source_traceback:
671            del task._source_traceback[-1]
672        return task
673    elif futures.isfuture(coro_or_future):
674        if loop is not None and loop is not futures._get_loop(coro_or_future):
675            raise ValueError('The future belongs to a different loop than '
676                             'the one specified as the loop argument')
677        return coro_or_future
678    elif inspect.isawaitable(coro_or_future):
679        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
680    else:
681        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
682                        'required')
683
684
685@types.coroutine
686def _wrap_awaitable(awaitable):
687    """Helper for asyncio.ensure_future().
688
689    Wraps awaitable (an object with __await__) into a coroutine
690    that will later be wrapped in a Task by ensure_future().
691    """
692    return (yield from awaitable.__await__())
693
694_wrap_awaitable._is_coroutine = _is_coroutine
695
696
697class _GatheringFuture(futures.Future):
698    """Helper for gather().
699
700    This overrides cancel() to cancel all the children and act more
701    like Task.cancel(), which doesn't immediately mark itself as
702    cancelled.
703    """
704
705    def __init__(self, children, *, loop=None):
706        super().__init__(loop=loop)
707        self._children = children
708        self._cancel_requested = False
709
710    def cancel(self, msg=None):
711        if self.done():
712            return False
713        ret = False
714        for child in self._children:
715            if child.cancel(msg=msg):
716                ret = True
717        if ret:
718            # If any child tasks were actually cancelled, we should
719            # propagate the cancellation request regardless of
720            # *return_exceptions* argument.  See issue 32684.
721            self._cancel_requested = True
722        return ret
723
724
725def gather(*coros_or_futures, loop=None, return_exceptions=False):
726    """Return a future aggregating results from the given coroutines/futures.
727
728    Coroutines will be wrapped in a future and scheduled in the event
729    loop. They will not necessarily be scheduled in the same order as
730    passed in.
731
732    All futures must share the same event loop.  If all the tasks are
733    done successfully, the returned future's result is the list of
734    results (in the order of the original sequence, not necessarily
735    the order of results arrival).  If *return_exceptions* is True,
736    exceptions in the tasks are treated the same as successful
737    results, and gathered in the result list; otherwise, the first
738    raised exception will be immediately propagated to the returned
739    future.
740
741    Cancellation: if the outer Future is cancelled, all children (that
742    have not completed yet) are also cancelled.  If any child is
743    cancelled, this is treated as if it raised CancelledError --
744    the outer Future is *not* cancelled in this case.  (This is to
745    prevent the cancellation of one child to cause other children to
746    be cancelled.)
747
748    If *return_exceptions* is False, cancelling gather() after it
749    has been marked done won't cancel any submitted awaitables.
750    For instance, gather can be marked done after propagating an
751    exception to the caller, therefore, calling ``gather.cancel()``
752    after catching an exception (raised by one of the awaitables) from
753    gather won't cancel any other awaitables.
754    """
755    if loop is not None:
756        warnings.warn("The loop argument is deprecated since Python 3.8, "
757                      "and scheduled for removal in Python 3.10.",
758                      DeprecationWarning, stacklevel=2)
759
760    return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
761
762
763def _gather(*coros_or_futures, loop=None, return_exceptions=False):
764    if not coros_or_futures:
765        if loop is None:
766            loop = events.get_event_loop()
767        outer = loop.create_future()
768        outer.set_result([])
769        return outer
770
771    def _done_callback(fut):
772        nonlocal nfinished
773        nfinished += 1
774
775        if outer.done():
776            if not fut.cancelled():
777                # Mark exception retrieved.
778                fut.exception()
779            return
780
781        if not return_exceptions:
782            if fut.cancelled():
783                # Check if 'fut' is cancelled first, as
784                # 'fut.exception()' will *raise* a CancelledError
785                # instead of returning it.
786                exc = fut._make_cancelled_error()
787                outer.set_exception(exc)
788                return
789            else:
790                exc = fut.exception()
791                if exc is not None:
792                    outer.set_exception(exc)
793                    return
794
795        if nfinished == nfuts:
796            # All futures are done; create a list of results
797            # and set it to the 'outer' future.
798            results = []
799
800            for fut in children:
801                if fut.cancelled():
802                    # Check if 'fut' is cancelled first, as 'fut.exception()'
803                    # will *raise* a CancelledError instead of returning it.
804                    # Also, since we're adding the exception return value
805                    # to 'results' instead of raising it, don't bother
806                    # setting __context__.  This also lets us preserve
807                    # calling '_make_cancelled_error()' at most once.
808                    res = exceptions.CancelledError(
809                        '' if fut._cancel_message is None else
810                        fut._cancel_message)
811                else:
812                    res = fut.exception()
813                    if res is None:
814                        res = fut.result()
815                results.append(res)
816
817            if outer._cancel_requested:
818                # If gather is being cancelled we must propagate the
819                # cancellation regardless of *return_exceptions* argument.
820                # See issue 32684.
821                exc = fut._make_cancelled_error()
822                outer.set_exception(exc)
823            else:
824                outer.set_result(results)
825
826    arg_to_fut = {}
827    children = []
828    nfuts = 0
829    nfinished = 0
830    for arg in coros_or_futures:
831        if arg not in arg_to_fut:
832            fut = ensure_future(arg, loop=loop)
833            if loop is None:
834                loop = futures._get_loop(fut)
835            if fut is not arg:
836                # 'arg' was not a Future, therefore, 'fut' is a new
837                # Future created specifically for 'arg'.  Since the caller
838                # can't control it, disable the "destroy pending task"
839                # warning.
840                fut._log_destroy_pending = False
841
842            nfuts += 1
843            arg_to_fut[arg] = fut
844            fut.add_done_callback(_done_callback)
845
846        else:
847            # There's a duplicate Future object in coros_or_futures.
848            fut = arg_to_fut[arg]
849
850        children.append(fut)
851
852    outer = _GatheringFuture(children, loop=loop)
853    return outer
854
855
856def shield(arg, *, loop=None):
857    """Wait for a future, shielding it from cancellation.
858
859    The statement
860
861        res = await shield(something())
862
863    is exactly equivalent to the statement
864
865        res = await something()
866
867    *except* that if the coroutine containing it is cancelled, the
868    task running in something() is not cancelled.  From the POV of
869    something(), the cancellation did not happen.  But its caller is
870    still cancelled, so the yield-from expression still raises
871    CancelledError.  Note: If something() is cancelled by other means
872    this will still cancel shield().
873
874    If you want to completely ignore cancellation (not recommended)
875    you can combine shield() with a try/except clause, as follows:
876
877        try:
878            res = await shield(something())
879        except CancelledError:
880            res = None
881    """
882    if loop is not None:
883        warnings.warn("The loop argument is deprecated since Python 3.8, "
884                      "and scheduled for removal in Python 3.10.",
885                      DeprecationWarning, stacklevel=2)
886    inner = ensure_future(arg, loop=loop)
887    if inner.done():
888        # Shortcut.
889        return inner
890    loop = futures._get_loop(inner)
891    outer = loop.create_future()
892
893    def _inner_done_callback(inner):
894        if outer.cancelled():
895            if not inner.cancelled():
896                # Mark inner's result as retrieved.
897                inner.exception()
898            return
899
900        if inner.cancelled():
901            outer.cancel()
902        else:
903            exc = inner.exception()
904            if exc is not None:
905                outer.set_exception(exc)
906            else:
907                outer.set_result(inner.result())
908
909
910    def _outer_done_callback(outer):
911        if not inner.done():
912            inner.remove_done_callback(_inner_done_callback)
913
914    inner.add_done_callback(_inner_done_callback)
915    outer.add_done_callback(_outer_done_callback)
916    return outer
917
918
919def run_coroutine_threadsafe(coro, loop):
920    """Submit a coroutine object to a given event loop.
921
922    Return a concurrent.futures.Future to access the result.
923    """
924    if not coroutines.iscoroutine(coro):
925        raise TypeError('A coroutine object is required')
926    future = concurrent.futures.Future()
927
928    def callback():
929        try:
930            futures._chain_future(ensure_future(coro, loop=loop), future)
931        except (SystemExit, KeyboardInterrupt):
932            raise
933        except BaseException as exc:
934            if future.set_running_or_notify_cancel():
935                future.set_exception(exc)
936            raise
937
938    loop.call_soon_threadsafe(callback)
939    return future
940
941
942# WeakSet containing all alive tasks.
943_all_tasks = weakref.WeakSet()
944
945# Dictionary containing tasks that are currently active in
946# all running event loops.  {EventLoop: Task}
947_current_tasks = {}
948
949
950def _register_task(task):
951    """Register a new task in asyncio as executed by loop."""
952    _all_tasks.add(task)
953
954
955def _enter_task(loop, task):
956    current_task = _current_tasks.get(loop)
957    if current_task is not None:
958        raise RuntimeError(f"Cannot enter into task {task!r} while another "
959                           f"task {current_task!r} is being executed.")
960    _current_tasks[loop] = task
961
962
963def _leave_task(loop, task):
964    current_task = _current_tasks.get(loop)
965    if current_task is not task:
966        raise RuntimeError(f"Leaving task {task!r} does not match "
967                           f"the current task {current_task!r}.")
968    del _current_tasks[loop]
969
970
971def _unregister_task(task):
972    """Unregister a task."""
973    _all_tasks.discard(task)
974
975
976_py_register_task = _register_task
977_py_unregister_task = _unregister_task
978_py_enter_task = _enter_task
979_py_leave_task = _leave_task
980
981
982try:
983    from _asyncio import (_register_task, _unregister_task,
984                          _enter_task, _leave_task,
985                          _all_tasks, _current_tasks)
986except ImportError:
987    pass
988else:
989    _c_register_task = _register_task
990    _c_unregister_task = _unregister_task
991    _c_enter_task = _enter_task
992    _c_leave_task = _leave_task
993