1"""Support for tasks, coroutines and the scheduler.""" 2 3__all__ = ( 4 'Task', 'create_task', 5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 6 'wait', 'wait_for', 'as_completed', 'sleep', 7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 8 'current_task', 'all_tasks', 9 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 10) 11 12import concurrent.futures 13import contextvars 14import functools 15import inspect 16import itertools 17import types 18import warnings 19import weakref 20 21from . import base_tasks 22from . import coroutines 23from . import events 24from . import exceptions 25from . import futures 26from .coroutines import _is_coroutine 27 28# Helper to generate new task names 29# This uses itertools.count() instead of a "+= 1" operation because the latter 30# is not thread safe. See bpo-11866 for a longer explanation. 31_task_name_counter = itertools.count(1).__next__ 32 33 34def current_task(loop=None): 35 """Return a currently executed task.""" 36 if loop is None: 37 loop = events.get_running_loop() 38 return _current_tasks.get(loop) 39 40 41def all_tasks(loop=None): 42 """Return a set of all tasks for the loop.""" 43 if loop is None: 44 loop = events.get_running_loop() 45 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 46 # thread while we do so. Therefore we cast it to list prior to filtering. The list 47 # cast itself requires iteration, so we repeat it several times ignoring 48 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 49 # details. 50 i = 0 51 while True: 52 try: 53 tasks = list(_all_tasks) 54 except RuntimeError: 55 i += 1 56 if i >= 1000: 57 raise 58 else: 59 break 60 return {t for t in tasks 61 if futures._get_loop(t) is loop and not t.done()} 62 63 64def _all_tasks_compat(loop=None): 65 # Different from "all_task()" by returning *all* Tasks, including 66 # the completed ones. Used to implement deprecated "Tasks.all_task()" 67 # method. 68 if loop is None: 69 loop = events.get_event_loop() 70 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 71 # thread while we do so. Therefore we cast it to list prior to filtering. The list 72 # cast itself requires iteration, so we repeat it several times ignoring 73 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 74 # details. 75 i = 0 76 while True: 77 try: 78 tasks = list(_all_tasks) 79 except RuntimeError: 80 i += 1 81 if i >= 1000: 82 raise 83 else: 84 break 85 return {t for t in tasks if futures._get_loop(t) is loop} 86 87 88def _set_task_name(task, name): 89 if name is not None: 90 try: 91 set_name = task.set_name 92 except AttributeError: 93 pass 94 else: 95 set_name(name) 96 97 98class Task(futures._PyFuture): # Inherit Python Task implementation 99 # from a Python Future implementation. 100 101 """A coroutine wrapped in a Future.""" 102 103 # An important invariant maintained while a Task not done: 104 # 105 # - Either _fut_waiter is None, and _step() is scheduled; 106 # - or _fut_waiter is some Future, and _step() is *not* scheduled. 107 # 108 # The only transition from the latter to the former is through 109 # _wakeup(). When _fut_waiter is not None, one of its callbacks 110 # must be _wakeup(). 111 112 # If False, don't log a message if the task is destroyed whereas its 113 # status is still pending 114 _log_destroy_pending = True 115 116 def __init__(self, coro, *, loop=None, name=None): 117 super().__init__(loop=loop) 118 if self._source_traceback: 119 del self._source_traceback[-1] 120 if not coroutines.iscoroutine(coro): 121 # raise after Future.__init__(), attrs are required for __del__ 122 # prevent logging for pending task in __del__ 123 self._log_destroy_pending = False 124 raise TypeError(f"a coroutine was expected, got {coro!r}") 125 126 if name is None: 127 self._name = f'Task-{_task_name_counter()}' 128 else: 129 self._name = str(name) 130 131 self._must_cancel = False 132 self._fut_waiter = None 133 self._coro = coro 134 self._context = contextvars.copy_context() 135 136 self._loop.call_soon(self.__step, context=self._context) 137 _register_task(self) 138 139 def __del__(self): 140 if self._state == futures._PENDING and self._log_destroy_pending: 141 context = { 142 'task': self, 143 'message': 'Task was destroyed but it is pending!', 144 } 145 if self._source_traceback: 146 context['source_traceback'] = self._source_traceback 147 self._loop.call_exception_handler(context) 148 super().__del__() 149 150 def __class_getitem__(cls, type): 151 return cls 152 153 def _repr_info(self): 154 return base_tasks._task_repr_info(self) 155 156 def get_coro(self): 157 return self._coro 158 159 def get_name(self): 160 return self._name 161 162 def set_name(self, value): 163 self._name = str(value) 164 165 def set_result(self, result): 166 raise RuntimeError('Task does not support set_result operation') 167 168 def set_exception(self, exception): 169 raise RuntimeError('Task does not support set_exception operation') 170 171 def get_stack(self, *, limit=None): 172 """Return the list of stack frames for this task's coroutine. 173 174 If the coroutine is not done, this returns the stack where it is 175 suspended. If the coroutine has completed successfully or was 176 cancelled, this returns an empty list. If the coroutine was 177 terminated by an exception, this returns the list of traceback 178 frames. 179 180 The frames are always ordered from oldest to newest. 181 182 The optional limit gives the maximum number of frames to 183 return; by default all available frames are returned. Its 184 meaning differs depending on whether a stack or a traceback is 185 returned: the newest frames of a stack are returned, but the 186 oldest frames of a traceback are returned. (This matches the 187 behavior of the traceback module.) 188 189 For reasons beyond our control, only one stack frame is 190 returned for a suspended coroutine. 191 """ 192 return base_tasks._task_get_stack(self, limit) 193 194 def print_stack(self, *, limit=None, file=None): 195 """Print the stack or traceback for this task's coroutine. 196 197 This produces output similar to that of the traceback module, 198 for the frames retrieved by get_stack(). The limit argument 199 is passed to get_stack(). The file argument is an I/O stream 200 to which the output is written; by default output is written 201 to sys.stderr. 202 """ 203 return base_tasks._task_print_stack(self, limit, file) 204 205 def cancel(self, msg=None): 206 """Request that this task cancel itself. 207 208 This arranges for a CancelledError to be thrown into the 209 wrapped coroutine on the next cycle through the event loop. 210 The coroutine then has a chance to clean up or even deny 211 the request using try/except/finally. 212 213 Unlike Future.cancel, this does not guarantee that the 214 task will be cancelled: the exception might be caught and 215 acted upon, delaying cancellation of the task or preventing 216 cancellation completely. The task may also return a value or 217 raise a different exception. 218 219 Immediately after this method is called, Task.cancelled() will 220 not return True (unless the task was already cancelled). A 221 task will be marked as cancelled when the wrapped coroutine 222 terminates with a CancelledError exception (even if cancel() 223 was not called). 224 """ 225 self._log_traceback = False 226 if self.done(): 227 return False 228 if self._fut_waiter is not None: 229 if self._fut_waiter.cancel(msg=msg): 230 # Leave self._fut_waiter; it may be a Task that 231 # catches and ignores the cancellation so we may have 232 # to cancel it again later. 233 return True 234 # It must be the case that self.__step is already scheduled. 235 self._must_cancel = True 236 self._cancel_message = msg 237 return True 238 239 def __step(self, exc=None): 240 if self.done(): 241 raise exceptions.InvalidStateError( 242 f'_step(): already done: {self!r}, {exc!r}') 243 if self._must_cancel: 244 if not isinstance(exc, exceptions.CancelledError): 245 exc = self._make_cancelled_error() 246 self._must_cancel = False 247 coro = self._coro 248 self._fut_waiter = None 249 250 _enter_task(self._loop, self) 251 # Call either coro.throw(exc) or coro.send(None). 252 try: 253 if exc is None: 254 # We use the `send` method directly, because coroutines 255 # don't have `__iter__` and `__next__` methods. 256 result = coro.send(None) 257 else: 258 result = coro.throw(exc) 259 except StopIteration as exc: 260 if self._must_cancel: 261 # Task is cancelled right before coro stops. 262 self._must_cancel = False 263 super().cancel(msg=self._cancel_message) 264 else: 265 super().set_result(exc.value) 266 except exceptions.CancelledError as exc: 267 # Save the original exception so we can chain it later. 268 self._cancelled_exc = exc 269 super().cancel() # I.e., Future.cancel(self). 270 except (KeyboardInterrupt, SystemExit) as exc: 271 super().set_exception(exc) 272 raise 273 except BaseException as exc: 274 super().set_exception(exc) 275 else: 276 blocking = getattr(result, '_asyncio_future_blocking', None) 277 if blocking is not None: 278 # Yielded Future must come from Future.__iter__(). 279 if futures._get_loop(result) is not self._loop: 280 new_exc = RuntimeError( 281 f'Task {self!r} got Future ' 282 f'{result!r} attached to a different loop') 283 self._loop.call_soon( 284 self.__step, new_exc, context=self._context) 285 elif blocking: 286 if result is self: 287 new_exc = RuntimeError( 288 f'Task cannot await on itself: {self!r}') 289 self._loop.call_soon( 290 self.__step, new_exc, context=self._context) 291 else: 292 result._asyncio_future_blocking = False 293 result.add_done_callback( 294 self.__wakeup, context=self._context) 295 self._fut_waiter = result 296 if self._must_cancel: 297 if self._fut_waiter.cancel( 298 msg=self._cancel_message): 299 self._must_cancel = False 300 else: 301 new_exc = RuntimeError( 302 f'yield was used instead of yield from ' 303 f'in task {self!r} with {result!r}') 304 self._loop.call_soon( 305 self.__step, new_exc, context=self._context) 306 307 elif result is None: 308 # Bare yield relinquishes control for one event loop iteration. 309 self._loop.call_soon(self.__step, context=self._context) 310 elif inspect.isgenerator(result): 311 # Yielding a generator is just wrong. 312 new_exc = RuntimeError( 313 f'yield was used instead of yield from for ' 314 f'generator in task {self!r} with {result!r}') 315 self._loop.call_soon( 316 self.__step, new_exc, context=self._context) 317 else: 318 # Yielding something else is an error. 319 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 320 self._loop.call_soon( 321 self.__step, new_exc, context=self._context) 322 finally: 323 _leave_task(self._loop, self) 324 self = None # Needed to break cycles when an exception occurs. 325 326 def __wakeup(self, future): 327 try: 328 future.result() 329 except BaseException as exc: 330 # This may also be a cancellation. 331 self.__step(exc) 332 else: 333 # Don't pass the value of `future.result()` explicitly, 334 # as `Future.__iter__` and `Future.__await__` don't need it. 335 # If we call `_step(value, None)` instead of `_step()`, 336 # Python eval loop would use `.send(value)` method call, 337 # instead of `__next__()`, which is slower for futures 338 # that return non-generator iterators from their `__iter__`. 339 self.__step() 340 self = None # Needed to break cycles when an exception occurs. 341 342 343_PyTask = Task 344 345 346try: 347 import _asyncio 348except ImportError: 349 pass 350else: 351 # _CTask is needed for tests. 352 Task = _CTask = _asyncio.Task 353 354 355def create_task(coro, *, name=None): 356 """Schedule the execution of a coroutine object in a spawn task. 357 358 Return a Task object. 359 """ 360 loop = events.get_running_loop() 361 task = loop.create_task(coro) 362 _set_task_name(task, name) 363 return task 364 365 366# wait() and as_completed() similar to those in PEP 3148. 367 368FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 369FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 370ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 371 372 373async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): 374 """Wait for the Futures and coroutines given by fs to complete. 375 376 The fs iterable must not be empty. 377 378 Coroutines will be wrapped in Tasks. 379 380 Returns two sets of Future: (done, pending). 381 382 Usage: 383 384 done, pending = await asyncio.wait(fs) 385 386 Note: This does not raise TimeoutError! Futures that aren't done 387 when the timeout occurs are returned in the second set. 388 """ 389 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 390 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 391 if not fs: 392 raise ValueError('Set of coroutines/Futures is empty.') 393 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 394 raise ValueError(f'Invalid return_when value: {return_when}') 395 396 if loop is None: 397 loop = events.get_running_loop() 398 else: 399 warnings.warn("The loop argument is deprecated since Python 3.8, " 400 "and scheduled for removal in Python 3.10.", 401 DeprecationWarning, stacklevel=2) 402 403 fs = set(fs) 404 405 if any(coroutines.iscoroutine(f) for f in fs): 406 warnings.warn("The explicit passing of coroutine objects to " 407 "asyncio.wait() is deprecated since Python 3.8, and " 408 "scheduled for removal in Python 3.11.", 409 DeprecationWarning, stacklevel=2) 410 411 fs = {ensure_future(f, loop=loop) for f in fs} 412 413 return await _wait(fs, timeout, return_when, loop) 414 415 416def _release_waiter(waiter, *args): 417 if not waiter.done(): 418 waiter.set_result(None) 419 420 421async def wait_for(fut, timeout, *, loop=None): 422 """Wait for the single Future or coroutine to complete, with timeout. 423 424 Coroutine will be wrapped in Task. 425 426 Returns result of the Future or coroutine. When a timeout occurs, 427 it cancels the task and raises TimeoutError. To avoid the task 428 cancellation, wrap it in shield(). 429 430 If the wait is cancelled, the task is also cancelled. 431 432 This function is a coroutine. 433 """ 434 if loop is None: 435 loop = events.get_running_loop() 436 else: 437 warnings.warn("The loop argument is deprecated since Python 3.8, " 438 "and scheduled for removal in Python 3.10.", 439 DeprecationWarning, stacklevel=2) 440 441 if timeout is None: 442 return await fut 443 444 if timeout <= 0: 445 fut = ensure_future(fut, loop=loop) 446 447 if fut.done(): 448 return fut.result() 449 450 await _cancel_and_wait(fut, loop=loop) 451 try: 452 fut.result() 453 except exceptions.CancelledError as exc: 454 raise exceptions.TimeoutError() from exc 455 else: 456 raise exceptions.TimeoutError() 457 458 waiter = loop.create_future() 459 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 460 cb = functools.partial(_release_waiter, waiter) 461 462 fut = ensure_future(fut, loop=loop) 463 fut.add_done_callback(cb) 464 465 try: 466 # wait until the future completes or the timeout 467 try: 468 await waiter 469 except exceptions.CancelledError: 470 if fut.done(): 471 return fut.result() 472 else: 473 fut.remove_done_callback(cb) 474 # We must ensure that the task is not running 475 # after wait_for() returns. 476 # See https://bugs.python.org/issue32751 477 await _cancel_and_wait(fut, loop=loop) 478 raise 479 480 if fut.done(): 481 return fut.result() 482 else: 483 fut.remove_done_callback(cb) 484 # We must ensure that the task is not running 485 # after wait_for() returns. 486 # See https://bugs.python.org/issue32751 487 await _cancel_and_wait(fut, loop=loop) 488 # In case task cancellation failed with some 489 # exception, we should re-raise it 490 # See https://bugs.python.org/issue40607 491 try: 492 fut.result() 493 except exceptions.CancelledError as exc: 494 raise exceptions.TimeoutError() from exc 495 else: 496 raise exceptions.TimeoutError() 497 finally: 498 timeout_handle.cancel() 499 500 501async def _wait(fs, timeout, return_when, loop): 502 """Internal helper for wait(). 503 504 The fs argument must be a collection of Futures. 505 """ 506 assert fs, 'Set of Futures is empty.' 507 waiter = loop.create_future() 508 timeout_handle = None 509 if timeout is not None: 510 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 511 counter = len(fs) 512 513 def _on_completion(f): 514 nonlocal counter 515 counter -= 1 516 if (counter <= 0 or 517 return_when == FIRST_COMPLETED or 518 return_when == FIRST_EXCEPTION and (not f.cancelled() and 519 f.exception() is not None)): 520 if timeout_handle is not None: 521 timeout_handle.cancel() 522 if not waiter.done(): 523 waiter.set_result(None) 524 525 for f in fs: 526 f.add_done_callback(_on_completion) 527 528 try: 529 await waiter 530 finally: 531 if timeout_handle is not None: 532 timeout_handle.cancel() 533 for f in fs: 534 f.remove_done_callback(_on_completion) 535 536 done, pending = set(), set() 537 for f in fs: 538 if f.done(): 539 done.add(f) 540 else: 541 pending.add(f) 542 return done, pending 543 544 545async def _cancel_and_wait(fut, loop): 546 """Cancel the *fut* future or task and wait until it completes.""" 547 548 waiter = loop.create_future() 549 cb = functools.partial(_release_waiter, waiter) 550 fut.add_done_callback(cb) 551 552 try: 553 fut.cancel() 554 # We cannot wait on *fut* directly to make 555 # sure _cancel_and_wait itself is reliably cancellable. 556 await waiter 557 finally: 558 fut.remove_done_callback(cb) 559 560 561# This is *not* a @coroutine! It is just an iterator (yielding Futures). 562def as_completed(fs, *, loop=None, timeout=None): 563 """Return an iterator whose values are coroutines. 564 565 When waiting for the yielded coroutines you'll get the results (or 566 exceptions!) of the original Futures (or coroutines), in the order 567 in which and as soon as they complete. 568 569 This differs from PEP 3148; the proper way to use this is: 570 571 for f in as_completed(fs): 572 result = await f # The 'await' may raise. 573 # Use result. 574 575 If a timeout is specified, the 'await' will raise 576 TimeoutError when the timeout occurs before all Futures are done. 577 578 Note: The futures 'f' are not necessarily members of fs. 579 """ 580 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 581 raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") 582 583 if loop is not None: 584 warnings.warn("The loop argument is deprecated since Python 3.8, " 585 "and scheduled for removal in Python 3.10.", 586 DeprecationWarning, stacklevel=2) 587 588 from .queues import Queue # Import here to avoid circular import problem. 589 done = Queue(loop=loop) 590 591 if loop is None: 592 loop = events.get_event_loop() 593 todo = {ensure_future(f, loop=loop) for f in set(fs)} 594 timeout_handle = None 595 596 def _on_timeout(): 597 for f in todo: 598 f.remove_done_callback(_on_completion) 599 done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 600 todo.clear() # Can't do todo.remove(f) in the loop. 601 602 def _on_completion(f): 603 if not todo: 604 return # _on_timeout() was here first. 605 todo.remove(f) 606 done.put_nowait(f) 607 if not todo and timeout_handle is not None: 608 timeout_handle.cancel() 609 610 async def _wait_for_one(): 611 f = await done.get() 612 if f is None: 613 # Dummy value from _on_timeout(). 614 raise exceptions.TimeoutError 615 return f.result() # May raise f.exception(). 616 617 for f in todo: 618 f.add_done_callback(_on_completion) 619 if todo and timeout is not None: 620 timeout_handle = loop.call_later(timeout, _on_timeout) 621 for _ in range(len(todo)): 622 yield _wait_for_one() 623 624 625@types.coroutine 626def __sleep0(): 627 """Skip one event loop run cycle. 628 629 This is a private helper for 'asyncio.sleep()', used 630 when the 'delay' is set to 0. It uses a bare 'yield' 631 expression (which Task.__step knows how to handle) 632 instead of creating a Future object. 633 """ 634 yield 635 636 637async def sleep(delay, result=None, *, loop=None): 638 """Coroutine that completes after a given time (in seconds).""" 639 if loop is not None: 640 warnings.warn("The loop argument is deprecated since Python 3.8, " 641 "and scheduled for removal in Python 3.10.", 642 DeprecationWarning, stacklevel=2) 643 644 if delay <= 0: 645 await __sleep0() 646 return result 647 648 if loop is None: 649 loop = events.get_running_loop() 650 651 future = loop.create_future() 652 h = loop.call_later(delay, 653 futures._set_result_unless_cancelled, 654 future, result) 655 try: 656 return await future 657 finally: 658 h.cancel() 659 660 661def ensure_future(coro_or_future, *, loop=None): 662 """Wrap a coroutine or an awaitable in a future. 663 664 If the argument is a Future, it is returned directly. 665 """ 666 if coroutines.iscoroutine(coro_or_future): 667 if loop is None: 668 loop = events.get_event_loop() 669 task = loop.create_task(coro_or_future) 670 if task._source_traceback: 671 del task._source_traceback[-1] 672 return task 673 elif futures.isfuture(coro_or_future): 674 if loop is not None and loop is not futures._get_loop(coro_or_future): 675 raise ValueError('The future belongs to a different loop than ' 676 'the one specified as the loop argument') 677 return coro_or_future 678 elif inspect.isawaitable(coro_or_future): 679 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) 680 else: 681 raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' 682 'required') 683 684 685@types.coroutine 686def _wrap_awaitable(awaitable): 687 """Helper for asyncio.ensure_future(). 688 689 Wraps awaitable (an object with __await__) into a coroutine 690 that will later be wrapped in a Task by ensure_future(). 691 """ 692 return (yield from awaitable.__await__()) 693 694_wrap_awaitable._is_coroutine = _is_coroutine 695 696 697class _GatheringFuture(futures.Future): 698 """Helper for gather(). 699 700 This overrides cancel() to cancel all the children and act more 701 like Task.cancel(), which doesn't immediately mark itself as 702 cancelled. 703 """ 704 705 def __init__(self, children, *, loop=None): 706 super().__init__(loop=loop) 707 self._children = children 708 self._cancel_requested = False 709 710 def cancel(self, msg=None): 711 if self.done(): 712 return False 713 ret = False 714 for child in self._children: 715 if child.cancel(msg=msg): 716 ret = True 717 if ret: 718 # If any child tasks were actually cancelled, we should 719 # propagate the cancellation request regardless of 720 # *return_exceptions* argument. See issue 32684. 721 self._cancel_requested = True 722 return ret 723 724 725def gather(*coros_or_futures, loop=None, return_exceptions=False): 726 """Return a future aggregating results from the given coroutines/futures. 727 728 Coroutines will be wrapped in a future and scheduled in the event 729 loop. They will not necessarily be scheduled in the same order as 730 passed in. 731 732 All futures must share the same event loop. If all the tasks are 733 done successfully, the returned future's result is the list of 734 results (in the order of the original sequence, not necessarily 735 the order of results arrival). If *return_exceptions* is True, 736 exceptions in the tasks are treated the same as successful 737 results, and gathered in the result list; otherwise, the first 738 raised exception will be immediately propagated to the returned 739 future. 740 741 Cancellation: if the outer Future is cancelled, all children (that 742 have not completed yet) are also cancelled. If any child is 743 cancelled, this is treated as if it raised CancelledError -- 744 the outer Future is *not* cancelled in this case. (This is to 745 prevent the cancellation of one child to cause other children to 746 be cancelled.) 747 748 If *return_exceptions* is False, cancelling gather() after it 749 has been marked done won't cancel any submitted awaitables. 750 For instance, gather can be marked done after propagating an 751 exception to the caller, therefore, calling ``gather.cancel()`` 752 after catching an exception (raised by one of the awaitables) from 753 gather won't cancel any other awaitables. 754 """ 755 if loop is not None: 756 warnings.warn("The loop argument is deprecated since Python 3.8, " 757 "and scheduled for removal in Python 3.10.", 758 DeprecationWarning, stacklevel=2) 759 760 return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) 761 762 763def _gather(*coros_or_futures, loop=None, return_exceptions=False): 764 if not coros_or_futures: 765 if loop is None: 766 loop = events.get_event_loop() 767 outer = loop.create_future() 768 outer.set_result([]) 769 return outer 770 771 def _done_callback(fut): 772 nonlocal nfinished 773 nfinished += 1 774 775 if outer.done(): 776 if not fut.cancelled(): 777 # Mark exception retrieved. 778 fut.exception() 779 return 780 781 if not return_exceptions: 782 if fut.cancelled(): 783 # Check if 'fut' is cancelled first, as 784 # 'fut.exception()' will *raise* a CancelledError 785 # instead of returning it. 786 exc = fut._make_cancelled_error() 787 outer.set_exception(exc) 788 return 789 else: 790 exc = fut.exception() 791 if exc is not None: 792 outer.set_exception(exc) 793 return 794 795 if nfinished == nfuts: 796 # All futures are done; create a list of results 797 # and set it to the 'outer' future. 798 results = [] 799 800 for fut in children: 801 if fut.cancelled(): 802 # Check if 'fut' is cancelled first, as 'fut.exception()' 803 # will *raise* a CancelledError instead of returning it. 804 # Also, since we're adding the exception return value 805 # to 'results' instead of raising it, don't bother 806 # setting __context__. This also lets us preserve 807 # calling '_make_cancelled_error()' at most once. 808 res = exceptions.CancelledError( 809 '' if fut._cancel_message is None else 810 fut._cancel_message) 811 else: 812 res = fut.exception() 813 if res is None: 814 res = fut.result() 815 results.append(res) 816 817 if outer._cancel_requested: 818 # If gather is being cancelled we must propagate the 819 # cancellation regardless of *return_exceptions* argument. 820 # See issue 32684. 821 exc = fut._make_cancelled_error() 822 outer.set_exception(exc) 823 else: 824 outer.set_result(results) 825 826 arg_to_fut = {} 827 children = [] 828 nfuts = 0 829 nfinished = 0 830 for arg in coros_or_futures: 831 if arg not in arg_to_fut: 832 fut = ensure_future(arg, loop=loop) 833 if loop is None: 834 loop = futures._get_loop(fut) 835 if fut is not arg: 836 # 'arg' was not a Future, therefore, 'fut' is a new 837 # Future created specifically for 'arg'. Since the caller 838 # can't control it, disable the "destroy pending task" 839 # warning. 840 fut._log_destroy_pending = False 841 842 nfuts += 1 843 arg_to_fut[arg] = fut 844 fut.add_done_callback(_done_callback) 845 846 else: 847 # There's a duplicate Future object in coros_or_futures. 848 fut = arg_to_fut[arg] 849 850 children.append(fut) 851 852 outer = _GatheringFuture(children, loop=loop) 853 return outer 854 855 856def shield(arg, *, loop=None): 857 """Wait for a future, shielding it from cancellation. 858 859 The statement 860 861 res = await shield(something()) 862 863 is exactly equivalent to the statement 864 865 res = await something() 866 867 *except* that if the coroutine containing it is cancelled, the 868 task running in something() is not cancelled. From the POV of 869 something(), the cancellation did not happen. But its caller is 870 still cancelled, so the yield-from expression still raises 871 CancelledError. Note: If something() is cancelled by other means 872 this will still cancel shield(). 873 874 If you want to completely ignore cancellation (not recommended) 875 you can combine shield() with a try/except clause, as follows: 876 877 try: 878 res = await shield(something()) 879 except CancelledError: 880 res = None 881 """ 882 if loop is not None: 883 warnings.warn("The loop argument is deprecated since Python 3.8, " 884 "and scheduled for removal in Python 3.10.", 885 DeprecationWarning, stacklevel=2) 886 inner = ensure_future(arg, loop=loop) 887 if inner.done(): 888 # Shortcut. 889 return inner 890 loop = futures._get_loop(inner) 891 outer = loop.create_future() 892 893 def _inner_done_callback(inner): 894 if outer.cancelled(): 895 if not inner.cancelled(): 896 # Mark inner's result as retrieved. 897 inner.exception() 898 return 899 900 if inner.cancelled(): 901 outer.cancel() 902 else: 903 exc = inner.exception() 904 if exc is not None: 905 outer.set_exception(exc) 906 else: 907 outer.set_result(inner.result()) 908 909 910 def _outer_done_callback(outer): 911 if not inner.done(): 912 inner.remove_done_callback(_inner_done_callback) 913 914 inner.add_done_callback(_inner_done_callback) 915 outer.add_done_callback(_outer_done_callback) 916 return outer 917 918 919def run_coroutine_threadsafe(coro, loop): 920 """Submit a coroutine object to a given event loop. 921 922 Return a concurrent.futures.Future to access the result. 923 """ 924 if not coroutines.iscoroutine(coro): 925 raise TypeError('A coroutine object is required') 926 future = concurrent.futures.Future() 927 928 def callback(): 929 try: 930 futures._chain_future(ensure_future(coro, loop=loop), future) 931 except (SystemExit, KeyboardInterrupt): 932 raise 933 except BaseException as exc: 934 if future.set_running_or_notify_cancel(): 935 future.set_exception(exc) 936 raise 937 938 loop.call_soon_threadsafe(callback) 939 return future 940 941 942# WeakSet containing all alive tasks. 943_all_tasks = weakref.WeakSet() 944 945# Dictionary containing tasks that are currently active in 946# all running event loops. {EventLoop: Task} 947_current_tasks = {} 948 949 950def _register_task(task): 951 """Register a new task in asyncio as executed by loop.""" 952 _all_tasks.add(task) 953 954 955def _enter_task(loop, task): 956 current_task = _current_tasks.get(loop) 957 if current_task is not None: 958 raise RuntimeError(f"Cannot enter into task {task!r} while another " 959 f"task {current_task!r} is being executed.") 960 _current_tasks[loop] = task 961 962 963def _leave_task(loop, task): 964 current_task = _current_tasks.get(loop) 965 if current_task is not task: 966 raise RuntimeError(f"Leaving task {task!r} does not match " 967 f"the current task {current_task!r}.") 968 del _current_tasks[loop] 969 970 971def _unregister_task(task): 972 """Unregister a task.""" 973 _all_tasks.discard(task) 974 975 976_py_register_task = _register_task 977_py_unregister_task = _unregister_task 978_py_enter_task = _enter_task 979_py_leave_task = _leave_task 980 981 982try: 983 from _asyncio import (_register_task, _unregister_task, 984 _enter_task, _leave_task, 985 _all_tasks, _current_tasks) 986except ImportError: 987 pass 988else: 989 _c_register_task = _register_task 990 _c_unregister_task = _unregister_task 991 _c_enter_task = _enter_task 992 _c_leave_task = _leave_task 993