1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
28# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
33
34def current_task(loop=None):
35    """Return a currently executed task."""
36    if loop is None:
37        loop = events.get_running_loop()
38    return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42    """Return a set of all tasks for the loop."""
43    if loop is None:
44        loop = events.get_running_loop()
45    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46    # thread while we do so. Therefore we cast it to list prior to filtering. The list
47    # cast itself requires iteration, so we repeat it several times ignoring
48    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49    # details.
50    i = 0
51    while True:
52        try:
53            tasks = list(_all_tasks)
54        except RuntimeError:
55            i += 1
56            if i >= 1000:
57                raise
58        else:
59            break
60    return {t for t in tasks
61            if futures._get_loop(t) is loop and not t.done()}
62
63
64def _set_task_name(task, name):
65    if name is not None:
66        try:
67            set_name = task.set_name
68        except AttributeError:
69            pass
70        else:
71            set_name(name)
72
73
74class Task(futures._PyFuture):  # Inherit Python Task implementation
75                                # from a Python Future implementation.
76
77    """A coroutine wrapped in a Future."""
78
79    # An important invariant maintained while a Task not done:
80    #
81    # - Either _fut_waiter is None, and _step() is scheduled;
82    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
83    #
84    # The only transition from the latter to the former is through
85    # _wakeup().  When _fut_waiter is not None, one of its callbacks
86    # must be _wakeup().
87
88    # If False, don't log a message if the task is destroyed whereas its
89    # status is still pending
90    _log_destroy_pending = True
91
92    def __init__(self, coro, *, loop=None, name=None):
93        super().__init__(loop=loop)
94        if self._source_traceback:
95            del self._source_traceback[-1]
96        if not coroutines.iscoroutine(coro):
97            # raise after Future.__init__(), attrs are required for __del__
98            # prevent logging for pending task in __del__
99            self._log_destroy_pending = False
100            raise TypeError(f"a coroutine was expected, got {coro!r}")
101
102        if name is None:
103            self._name = f'Task-{_task_name_counter()}'
104        else:
105            self._name = str(name)
106
107        self._must_cancel = False
108        self._fut_waiter = None
109        self._coro = coro
110        self._context = contextvars.copy_context()
111
112        self._loop.call_soon(self.__step, context=self._context)
113        _register_task(self)
114
115    def __del__(self):
116        if self._state == futures._PENDING and self._log_destroy_pending:
117            context = {
118                'task': self,
119                'message': 'Task was destroyed but it is pending!',
120            }
121            if self._source_traceback:
122                context['source_traceback'] = self._source_traceback
123            self._loop.call_exception_handler(context)
124        super().__del__()
125
126    def __class_getitem__(cls, type):
127        return cls
128
129    def _repr_info(self):
130        return base_tasks._task_repr_info(self)
131
132    def get_coro(self):
133        return self._coro
134
135    def get_name(self):
136        return self._name
137
138    def set_name(self, value):
139        self._name = str(value)
140
141    def set_result(self, result):
142        raise RuntimeError('Task does not support set_result operation')
143
144    def set_exception(self, exception):
145        raise RuntimeError('Task does not support set_exception operation')
146
147    def get_stack(self, *, limit=None):
148        """Return the list of stack frames for this task's coroutine.
149
150        If the coroutine is not done, this returns the stack where it is
151        suspended.  If the coroutine has completed successfully or was
152        cancelled, this returns an empty list.  If the coroutine was
153        terminated by an exception, this returns the list of traceback
154        frames.
155
156        The frames are always ordered from oldest to newest.
157
158        The optional limit gives the maximum number of frames to
159        return; by default all available frames are returned.  Its
160        meaning differs depending on whether a stack or a traceback is
161        returned: the newest frames of a stack are returned, but the
162        oldest frames of a traceback are returned.  (This matches the
163        behavior of the traceback module.)
164
165        For reasons beyond our control, only one stack frame is
166        returned for a suspended coroutine.
167        """
168        return base_tasks._task_get_stack(self, limit)
169
170    def print_stack(self, *, limit=None, file=None):
171        """Print the stack or traceback for this task's coroutine.
172
173        This produces output similar to that of the traceback module,
174        for the frames retrieved by get_stack().  The limit argument
175        is passed to get_stack().  The file argument is an I/O stream
176        to which the output is written; by default output is written
177        to sys.stderr.
178        """
179        return base_tasks._task_print_stack(self, limit, file)
180
181    def cancel(self, msg=None):
182        """Request that this task cancel itself.
183
184        This arranges for a CancelledError to be thrown into the
185        wrapped coroutine on the next cycle through the event loop.
186        The coroutine then has a chance to clean up or even deny
187        the request using try/except/finally.
188
189        Unlike Future.cancel, this does not guarantee that the
190        task will be cancelled: the exception might be caught and
191        acted upon, delaying cancellation of the task or preventing
192        cancellation completely.  The task may also return a value or
193        raise a different exception.
194
195        Immediately after this method is called, Task.cancelled() will
196        not return True (unless the task was already cancelled).  A
197        task will be marked as cancelled when the wrapped coroutine
198        terminates with a CancelledError exception (even if cancel()
199        was not called).
200        """
201        self._log_traceback = False
202        if self.done():
203            return False
204        if self._fut_waiter is not None:
205            if self._fut_waiter.cancel(msg=msg):
206                # Leave self._fut_waiter; it may be a Task that
207                # catches and ignores the cancellation so we may have
208                # to cancel it again later.
209                return True
210        # It must be the case that self.__step is already scheduled.
211        self._must_cancel = True
212        self._cancel_message = msg
213        return True
214
215    def __step(self, exc=None):
216        if self.done():
217            raise exceptions.InvalidStateError(
218                f'_step(): already done: {self!r}, {exc!r}')
219        if self._must_cancel:
220            if not isinstance(exc, exceptions.CancelledError):
221                exc = self._make_cancelled_error()
222            self._must_cancel = False
223        coro = self._coro
224        self._fut_waiter = None
225
226        _enter_task(self._loop, self)
227        # Call either coro.throw(exc) or coro.send(None).
228        try:
229            if exc is None:
230                # We use the `send` method directly, because coroutines
231                # don't have `__iter__` and `__next__` methods.
232                result = coro.send(None)
233            else:
234                result = coro.throw(exc)
235        except StopIteration as exc:
236            if self._must_cancel:
237                # Task is cancelled right before coro stops.
238                self._must_cancel = False
239                super().cancel(msg=self._cancel_message)
240            else:
241                super().set_result(exc.value)
242        except exceptions.CancelledError as exc:
243            # Save the original exception so we can chain it later.
244            self._cancelled_exc = exc
245            super().cancel()  # I.e., Future.cancel(self).
246        except (KeyboardInterrupt, SystemExit) as exc:
247            super().set_exception(exc)
248            raise
249        except BaseException as exc:
250            super().set_exception(exc)
251        else:
252            blocking = getattr(result, '_asyncio_future_blocking', None)
253            if blocking is not None:
254                # Yielded Future must come from Future.__iter__().
255                if futures._get_loop(result) is not self._loop:
256                    new_exc = RuntimeError(
257                        f'Task {self!r} got Future '
258                        f'{result!r} attached to a different loop')
259                    self._loop.call_soon(
260                        self.__step, new_exc, context=self._context)
261                elif blocking:
262                    if result is self:
263                        new_exc = RuntimeError(
264                            f'Task cannot await on itself: {self!r}')
265                        self._loop.call_soon(
266                            self.__step, new_exc, context=self._context)
267                    else:
268                        result._asyncio_future_blocking = False
269                        result.add_done_callback(
270                            self.__wakeup, context=self._context)
271                        self._fut_waiter = result
272                        if self._must_cancel:
273                            if self._fut_waiter.cancel(
274                                    msg=self._cancel_message):
275                                self._must_cancel = False
276                else:
277                    new_exc = RuntimeError(
278                        f'yield was used instead of yield from '
279                        f'in task {self!r} with {result!r}')
280                    self._loop.call_soon(
281                        self.__step, new_exc, context=self._context)
282
283            elif result is None:
284                # Bare yield relinquishes control for one event loop iteration.
285                self._loop.call_soon(self.__step, context=self._context)
286            elif inspect.isgenerator(result):
287                # Yielding a generator is just wrong.
288                new_exc = RuntimeError(
289                    f'yield was used instead of yield from for '
290                    f'generator in task {self!r} with {result!r}')
291                self._loop.call_soon(
292                    self.__step, new_exc, context=self._context)
293            else:
294                # Yielding something else is an error.
295                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
296                self._loop.call_soon(
297                    self.__step, new_exc, context=self._context)
298        finally:
299            _leave_task(self._loop, self)
300            self = None  # Needed to break cycles when an exception occurs.
301
302    def __wakeup(self, future):
303        try:
304            future.result()
305        except BaseException as exc:
306            # This may also be a cancellation.
307            self.__step(exc)
308        else:
309            # Don't pass the value of `future.result()` explicitly,
310            # as `Future.__iter__` and `Future.__await__` don't need it.
311            # If we call `_step(value, None)` instead of `_step()`,
312            # Python eval loop would use `.send(value)` method call,
313            # instead of `__next__()`, which is slower for futures
314            # that return non-generator iterators from their `__iter__`.
315            self.__step()
316        self = None  # Needed to break cycles when an exception occurs.
317
318
319_PyTask = Task
320
321
322try:
323    import _asyncio
324except ImportError:
325    pass
326else:
327    # _CTask is needed for tests.
328    Task = _CTask = _asyncio.Task
329
330
331def create_task(coro, *, name=None):
332    """Schedule the execution of a coroutine object in a spawn task.
333
334    Return a Task object.
335    """
336    loop = events.get_running_loop()
337    task = loop.create_task(coro)
338    _set_task_name(task, name)
339    return task
340
341
342# wait() and as_completed() similar to those in PEP 3148.
343
344FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
345FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
346ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
347
348
349async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
350    """Wait for the Futures and coroutines given by fs to complete.
351
352    The fs iterable must not be empty.
353
354    Coroutines will be wrapped in Tasks.
355
356    Returns two sets of Future: (done, pending).
357
358    Usage:
359
360        done, pending = await asyncio.wait(fs)
361
362    Note: This does not raise TimeoutError! Futures that aren't done
363    when the timeout occurs are returned in the second set.
364    """
365    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
366        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
367    if not fs:
368        raise ValueError('Set of coroutines/Futures is empty.')
369    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
370        raise ValueError(f'Invalid return_when value: {return_when}')
371
372    loop = events.get_running_loop()
373
374    fs = set(fs)
375
376    if any(coroutines.iscoroutine(f) for f in fs):
377        warnings.warn("The explicit passing of coroutine objects to "
378                      "asyncio.wait() is deprecated since Python 3.8, and "
379                      "scheduled for removal in Python 3.11.",
380                      DeprecationWarning, stacklevel=2)
381
382    fs = {ensure_future(f, loop=loop) for f in fs}
383
384    return await _wait(fs, timeout, return_when, loop)
385
386
387def _release_waiter(waiter, *args):
388    if not waiter.done():
389        waiter.set_result(None)
390
391
392async def wait_for(fut, timeout):
393    """Wait for the single Future or coroutine to complete, with timeout.
394
395    Coroutine will be wrapped in Task.
396
397    Returns result of the Future or coroutine.  When a timeout occurs,
398    it cancels the task and raises TimeoutError.  To avoid the task
399    cancellation, wrap it in shield().
400
401    If the wait is cancelled, the task is also cancelled.
402
403    This function is a coroutine.
404    """
405    loop = events.get_running_loop()
406
407    if timeout is None:
408        return await fut
409
410    if timeout <= 0:
411        fut = ensure_future(fut, loop=loop)
412
413        if fut.done():
414            return fut.result()
415
416        await _cancel_and_wait(fut, loop=loop)
417        try:
418            return fut.result()
419        except exceptions.CancelledError as exc:
420            raise exceptions.TimeoutError() from exc
421
422    waiter = loop.create_future()
423    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
424    cb = functools.partial(_release_waiter, waiter)
425
426    fut = ensure_future(fut, loop=loop)
427    fut.add_done_callback(cb)
428
429    try:
430        # wait until the future completes or the timeout
431        try:
432            await waiter
433        except exceptions.CancelledError:
434            if fut.done():
435                return fut.result()
436            else:
437                fut.remove_done_callback(cb)
438                # We must ensure that the task is not running
439                # after wait_for() returns.
440                # See https://bugs.python.org/issue32751
441                await _cancel_and_wait(fut, loop=loop)
442                raise
443
444        if fut.done():
445            return fut.result()
446        else:
447            fut.remove_done_callback(cb)
448            # We must ensure that the task is not running
449            # after wait_for() returns.
450            # See https://bugs.python.org/issue32751
451            await _cancel_and_wait(fut, loop=loop)
452            # In case task cancellation failed with some
453            # exception, we should re-raise it
454            # See https://bugs.python.org/issue40607
455            try:
456                return fut.result()
457            except exceptions.CancelledError as exc:
458                raise exceptions.TimeoutError() from exc
459    finally:
460        timeout_handle.cancel()
461
462
463async def _wait(fs, timeout, return_when, loop):
464    """Internal helper for wait().
465
466    The fs argument must be a collection of Futures.
467    """
468    assert fs, 'Set of Futures is empty.'
469    waiter = loop.create_future()
470    timeout_handle = None
471    if timeout is not None:
472        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
473    counter = len(fs)
474
475    def _on_completion(f):
476        nonlocal counter
477        counter -= 1
478        if (counter <= 0 or
479            return_when == FIRST_COMPLETED or
480            return_when == FIRST_EXCEPTION and (not f.cancelled() and
481                                                f.exception() is not None)):
482            if timeout_handle is not None:
483                timeout_handle.cancel()
484            if not waiter.done():
485                waiter.set_result(None)
486
487    for f in fs:
488        f.add_done_callback(_on_completion)
489
490    try:
491        await waiter
492    finally:
493        if timeout_handle is not None:
494            timeout_handle.cancel()
495        for f in fs:
496            f.remove_done_callback(_on_completion)
497
498    done, pending = set(), set()
499    for f in fs:
500        if f.done():
501            done.add(f)
502        else:
503            pending.add(f)
504    return done, pending
505
506
507async def _cancel_and_wait(fut, loop):
508    """Cancel the *fut* future or task and wait until it completes."""
509
510    waiter = loop.create_future()
511    cb = functools.partial(_release_waiter, waiter)
512    fut.add_done_callback(cb)
513
514    try:
515        fut.cancel()
516        # We cannot wait on *fut* directly to make
517        # sure _cancel_and_wait itself is reliably cancellable.
518        await waiter
519    finally:
520        fut.remove_done_callback(cb)
521
522
523# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
524def as_completed(fs, *, timeout=None):
525    """Return an iterator whose values are coroutines.
526
527    When waiting for the yielded coroutines you'll get the results (or
528    exceptions!) of the original Futures (or coroutines), in the order
529    in which and as soon as they complete.
530
531    This differs from PEP 3148; the proper way to use this is:
532
533        for f in as_completed(fs):
534            result = await f  # The 'await' may raise.
535            # Use result.
536
537    If a timeout is specified, the 'await' will raise
538    TimeoutError when the timeout occurs before all Futures are done.
539
540    Note: The futures 'f' are not necessarily members of fs.
541    """
542    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
543        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
544
545    from .queues import Queue  # Import here to avoid circular import problem.
546    done = Queue()
547
548    loop = events._get_event_loop()
549    todo = {ensure_future(f, loop=loop) for f in set(fs)}
550    timeout_handle = None
551
552    def _on_timeout():
553        for f in todo:
554            f.remove_done_callback(_on_completion)
555            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
556        todo.clear()  # Can't do todo.remove(f) in the loop.
557
558    def _on_completion(f):
559        if not todo:
560            return  # _on_timeout() was here first.
561        todo.remove(f)
562        done.put_nowait(f)
563        if not todo and timeout_handle is not None:
564            timeout_handle.cancel()
565
566    async def _wait_for_one():
567        f = await done.get()
568        if f is None:
569            # Dummy value from _on_timeout().
570            raise exceptions.TimeoutError
571        return f.result()  # May raise f.exception().
572
573    for f in todo:
574        f.add_done_callback(_on_completion)
575    if todo and timeout is not None:
576        timeout_handle = loop.call_later(timeout, _on_timeout)
577    for _ in range(len(todo)):
578        yield _wait_for_one()
579
580
581@types.coroutine
582def __sleep0():
583    """Skip one event loop run cycle.
584
585    This is a private helper for 'asyncio.sleep()', used
586    when the 'delay' is set to 0.  It uses a bare 'yield'
587    expression (which Task.__step knows how to handle)
588    instead of creating a Future object.
589    """
590    yield
591
592
593async def sleep(delay, result=None):
594    """Coroutine that completes after a given time (in seconds)."""
595    if delay <= 0:
596        await __sleep0()
597        return result
598
599    loop = events.get_running_loop()
600    future = loop.create_future()
601    h = loop.call_later(delay,
602                        futures._set_result_unless_cancelled,
603                        future, result)
604    try:
605        return await future
606    finally:
607        h.cancel()
608
609
610def ensure_future(coro_or_future, *, loop=None):
611    """Wrap a coroutine or an awaitable in a future.
612
613    If the argument is a Future, it is returned directly.
614    """
615    return _ensure_future(coro_or_future, loop=loop)
616
617
618def _ensure_future(coro_or_future, *, loop=None):
619    if futures.isfuture(coro_or_future):
620        if loop is not None and loop is not futures._get_loop(coro_or_future):
621            raise ValueError('The future belongs to a different loop than '
622                            'the one specified as the loop argument')
623        return coro_or_future
624
625    if not coroutines.iscoroutine(coro_or_future):
626        if inspect.isawaitable(coro_or_future):
627            coro_or_future = _wrap_awaitable(coro_or_future)
628        else:
629            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
630                            'is required')
631
632    if loop is None:
633        loop = events._get_event_loop(stacklevel=4)
634    return loop.create_task(coro_or_future)
635
636
637@types.coroutine
638def _wrap_awaitable(awaitable):
639    """Helper for asyncio.ensure_future().
640
641    Wraps awaitable (an object with __await__) into a coroutine
642    that will later be wrapped in a Task by ensure_future().
643    """
644    return (yield from awaitable.__await__())
645
646_wrap_awaitable._is_coroutine = _is_coroutine
647
648
649class _GatheringFuture(futures.Future):
650    """Helper for gather().
651
652    This overrides cancel() to cancel all the children and act more
653    like Task.cancel(), which doesn't immediately mark itself as
654    cancelled.
655    """
656
657    def __init__(self, children, *, loop):
658        assert loop is not None
659        super().__init__(loop=loop)
660        self._children = children
661        self._cancel_requested = False
662
663    def cancel(self, msg=None):
664        if self.done():
665            return False
666        ret = False
667        for child in self._children:
668            if child.cancel(msg=msg):
669                ret = True
670        if ret:
671            # If any child tasks were actually cancelled, we should
672            # propagate the cancellation request regardless of
673            # *return_exceptions* argument.  See issue 32684.
674            self._cancel_requested = True
675        return ret
676
677
678def gather(*coros_or_futures, return_exceptions=False):
679    """Return a future aggregating results from the given coroutines/futures.
680
681    Coroutines will be wrapped in a future and scheduled in the event
682    loop. They will not necessarily be scheduled in the same order as
683    passed in.
684
685    All futures must share the same event loop.  If all the tasks are
686    done successfully, the returned future's result is the list of
687    results (in the order of the original sequence, not necessarily
688    the order of results arrival).  If *return_exceptions* is True,
689    exceptions in the tasks are treated the same as successful
690    results, and gathered in the result list; otherwise, the first
691    raised exception will be immediately propagated to the returned
692    future.
693
694    Cancellation: if the outer Future is cancelled, all children (that
695    have not completed yet) are also cancelled.  If any child is
696    cancelled, this is treated as if it raised CancelledError --
697    the outer Future is *not* cancelled in this case.  (This is to
698    prevent the cancellation of one child to cause other children to
699    be cancelled.)
700
701    If *return_exceptions* is False, cancelling gather() after it
702    has been marked done won't cancel any submitted awaitables.
703    For instance, gather can be marked done after propagating an
704    exception to the caller, therefore, calling ``gather.cancel()``
705    after catching an exception (raised by one of the awaitables) from
706    gather won't cancel any other awaitables.
707    """
708    if not coros_or_futures:
709        loop = events._get_event_loop()
710        outer = loop.create_future()
711        outer.set_result([])
712        return outer
713
714    def _done_callback(fut):
715        nonlocal nfinished
716        nfinished += 1
717
718        if outer.done():
719            if not fut.cancelled():
720                # Mark exception retrieved.
721                fut.exception()
722            return
723
724        if not return_exceptions:
725            if fut.cancelled():
726                # Check if 'fut' is cancelled first, as
727                # 'fut.exception()' will *raise* a CancelledError
728                # instead of returning it.
729                exc = fut._make_cancelled_error()
730                outer.set_exception(exc)
731                return
732            else:
733                exc = fut.exception()
734                if exc is not None:
735                    outer.set_exception(exc)
736                    return
737
738        if nfinished == nfuts:
739            # All futures are done; create a list of results
740            # and set it to the 'outer' future.
741            results = []
742
743            for fut in children:
744                if fut.cancelled():
745                    # Check if 'fut' is cancelled first, as 'fut.exception()'
746                    # will *raise* a CancelledError instead of returning it.
747                    # Also, since we're adding the exception return value
748                    # to 'results' instead of raising it, don't bother
749                    # setting __context__.  This also lets us preserve
750                    # calling '_make_cancelled_error()' at most once.
751                    res = exceptions.CancelledError(
752                        '' if fut._cancel_message is None else
753                        fut._cancel_message)
754                else:
755                    res = fut.exception()
756                    if res is None:
757                        res = fut.result()
758                results.append(res)
759
760            if outer._cancel_requested:
761                # If gather is being cancelled we must propagate the
762                # cancellation regardless of *return_exceptions* argument.
763                # See issue 32684.
764                exc = fut._make_cancelled_error()
765                outer.set_exception(exc)
766            else:
767                outer.set_result(results)
768
769    arg_to_fut = {}
770    children = []
771    nfuts = 0
772    nfinished = 0
773    loop = None
774    for arg in coros_or_futures:
775        if arg not in arg_to_fut:
776            fut = _ensure_future(arg, loop=loop)
777            if loop is None:
778                loop = futures._get_loop(fut)
779            if fut is not arg:
780                # 'arg' was not a Future, therefore, 'fut' is a new
781                # Future created specifically for 'arg'.  Since the caller
782                # can't control it, disable the "destroy pending task"
783                # warning.
784                fut._log_destroy_pending = False
785
786            nfuts += 1
787            arg_to_fut[arg] = fut
788            fut.add_done_callback(_done_callback)
789
790        else:
791            # There's a duplicate Future object in coros_or_futures.
792            fut = arg_to_fut[arg]
793
794        children.append(fut)
795
796    outer = _GatheringFuture(children, loop=loop)
797    return outer
798
799
800def shield(arg):
801    """Wait for a future, shielding it from cancellation.
802
803    The statement
804
805        res = await shield(something())
806
807    is exactly equivalent to the statement
808
809        res = await something()
810
811    *except* that if the coroutine containing it is cancelled, the
812    task running in something() is not cancelled.  From the POV of
813    something(), the cancellation did not happen.  But its caller is
814    still cancelled, so the yield-from expression still raises
815    CancelledError.  Note: If something() is cancelled by other means
816    this will still cancel shield().
817
818    If you want to completely ignore cancellation (not recommended)
819    you can combine shield() with a try/except clause, as follows:
820
821        try:
822            res = await shield(something())
823        except CancelledError:
824            res = None
825    """
826    inner = _ensure_future(arg)
827    if inner.done():
828        # Shortcut.
829        return inner
830    loop = futures._get_loop(inner)
831    outer = loop.create_future()
832
833    def _inner_done_callback(inner):
834        if outer.cancelled():
835            if not inner.cancelled():
836                # Mark inner's result as retrieved.
837                inner.exception()
838            return
839
840        if inner.cancelled():
841            outer.cancel()
842        else:
843            exc = inner.exception()
844            if exc is not None:
845                outer.set_exception(exc)
846            else:
847                outer.set_result(inner.result())
848
849
850    def _outer_done_callback(outer):
851        if not inner.done():
852            inner.remove_done_callback(_inner_done_callback)
853
854    inner.add_done_callback(_inner_done_callback)
855    outer.add_done_callback(_outer_done_callback)
856    return outer
857
858
859def run_coroutine_threadsafe(coro, loop):
860    """Submit a coroutine object to a given event loop.
861
862    Return a concurrent.futures.Future to access the result.
863    """
864    if not coroutines.iscoroutine(coro):
865        raise TypeError('A coroutine object is required')
866    future = concurrent.futures.Future()
867
868    def callback():
869        try:
870            futures._chain_future(ensure_future(coro, loop=loop), future)
871        except (SystemExit, KeyboardInterrupt):
872            raise
873        except BaseException as exc:
874            if future.set_running_or_notify_cancel():
875                future.set_exception(exc)
876            raise
877
878    loop.call_soon_threadsafe(callback)
879    return future
880
881
882# WeakSet containing all alive tasks.
883_all_tasks = weakref.WeakSet()
884
885# Dictionary containing tasks that are currently active in
886# all running event loops.  {EventLoop: Task}
887_current_tasks = {}
888
889
890def _register_task(task):
891    """Register a new task in asyncio as executed by loop."""
892    _all_tasks.add(task)
893
894
895def _enter_task(loop, task):
896    current_task = _current_tasks.get(loop)
897    if current_task is not None:
898        raise RuntimeError(f"Cannot enter into task {task!r} while another "
899                           f"task {current_task!r} is being executed.")
900    _current_tasks[loop] = task
901
902
903def _leave_task(loop, task):
904    current_task = _current_tasks.get(loop)
905    if current_task is not task:
906        raise RuntimeError(f"Leaving task {task!r} does not match "
907                           f"the current task {current_task!r}.")
908    del _current_tasks[loop]
909
910
911def _unregister_task(task):
912    """Unregister a task."""
913    _all_tasks.discard(task)
914
915
916_py_register_task = _register_task
917_py_unregister_task = _unregister_task
918_py_enter_task = _enter_task
919_py_leave_task = _leave_task
920
921
922try:
923    from _asyncio import (_register_task, _unregister_task,
924                          _enter_task, _leave_task,
925                          _all_tasks, _current_tasks)
926except ImportError:
927    pass
928else:
929    _c_register_task = _register_task
930    _c_unregister_task = _unregister_task
931    _c_enter_task = _enter_task
932    _c_leave_task = _leave_task
933