1"""Support for tasks, coroutines and the scheduler.""" 2 3__all__ = ['Task', 4 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 5 'wait', 'wait_for', 'as_completed', 'sleep', 'async', 6 'gather', 'shield', 7 ] 8 9import concurrent.futures 10import functools 11import inspect 12import linecache 13import sys 14import traceback 15import weakref 16 17from . import coroutines 18from . import events 19from . import futures 20from .coroutines import coroutine 21 22_PY34 = (sys.version_info >= (3, 4)) 23 24 25class Task(futures.Future): 26 """A coroutine wrapped in a Future.""" 27 28 # An important invariant maintained while a Task not done: 29 # 30 # - Either _fut_waiter is None, and _step() is scheduled; 31 # - or _fut_waiter is some Future, and _step() is *not* scheduled. 32 # 33 # The only transition from the latter to the former is through 34 # _wakeup(). When _fut_waiter is not None, one of its callbacks 35 # must be _wakeup(). 36 37 # Weak set containing all tasks alive. 38 _all_tasks = weakref.WeakSet() 39 40 # Dictionary containing tasks that are currently active in 41 # all running event loops. {EventLoop: Task} 42 _current_tasks = {} 43 44 # If False, don't log a message if the task is destroyed whereas its 45 # status is still pending 46 _log_destroy_pending = True 47 48 @classmethod 49 def current_task(cls, loop=None): 50 """Return the currently running task in an event loop or None. 51 52 By default the current task for the current event loop is returned. 53 54 None is returned when called not in the context of a Task. 55 """ 56 if loop is None: 57 loop = events.get_event_loop() 58 return cls._current_tasks.get(loop) 59 60 @classmethod 61 def all_tasks(cls, loop=None): 62 """Return a set of all tasks for an event loop. 63 64 By default all tasks for the current event loop are returned. 65 """ 66 if loop is None: 67 loop = events.get_event_loop() 68 return {t for t in cls._all_tasks if t._loop is loop} 69 70 def __init__(self, coro, *, loop=None): 71 assert coroutines.iscoroutine(coro), repr(coro) 72 super().__init__(loop=loop) 73 if self._source_traceback: 74 del self._source_traceback[-1] 75 self._coro = iter(coro) # Use the iterator just in case. 76 self._fut_waiter = None 77 self._must_cancel = False 78 self._loop.call_soon(self._step) 79 self.__class__._all_tasks.add(self) 80 81 # On Python 3.3 or older, objects with a destructor that are part of a 82 # reference cycle are never destroyed. That's not the case any more on 83 # Python 3.4 thanks to the PEP 442. 84 if _PY34: 85 def __del__(self): 86 if self._state == futures._PENDING and self._log_destroy_pending: 87 context = { 88 'task': self, 89 'message': 'Task was destroyed but it is pending!', 90 } 91 if self._source_traceback: 92 context['source_traceback'] = self._source_traceback 93 self._loop.call_exception_handler(context) 94 futures.Future.__del__(self) 95 96 def _repr_info(self): 97 info = super()._repr_info() 98 99 if self._must_cancel: 100 # replace status 101 info[0] = 'cancelling' 102 103 coro = coroutines._format_coroutine(self._coro) 104 info.insert(1, 'coro=<%s>' % coro) 105 106 if self._fut_waiter is not None: 107 info.insert(2, 'wait_for=%r' % self._fut_waiter) 108 return info 109 110 def get_stack(self, *, limit=None): 111 """Return the list of stack frames for this task's coroutine. 112 113 If the coroutine is not done, this returns the stack where it is 114 suspended. If the coroutine has completed successfully or was 115 cancelled, this returns an empty list. If the coroutine was 116 terminated by an exception, this returns the list of traceback 117 frames. 118 119 The frames are always ordered from oldest to newest. 120 121 The optional limit gives the maximum number of frames to 122 return; by default all available frames are returned. Its 123 meaning differs depending on whether a stack or a traceback is 124 returned: the newest frames of a stack are returned, but the 125 oldest frames of a traceback are returned. (This matches the 126 behavior of the traceback module.) 127 128 For reasons beyond our control, only one stack frame is 129 returned for a suspended coroutine. 130 """ 131 frames = [] 132 f = self._coro.gi_frame 133 if f is not None: 134 while f is not None: 135 if limit is not None: 136 if limit <= 0: 137 break 138 limit -= 1 139 frames.append(f) 140 f = f.f_back 141 frames.reverse() 142 elif self._exception is not None: 143 tb = self._exception.__traceback__ 144 while tb is not None: 145 if limit is not None: 146 if limit <= 0: 147 break 148 limit -= 1 149 frames.append(tb.tb_frame) 150 tb = tb.tb_next 151 return frames 152 153 def print_stack(self, *, limit=None, file=None): 154 """Print the stack or traceback for this task's coroutine. 155 156 This produces output similar to that of the traceback module, 157 for the frames retrieved by get_stack(). The limit argument 158 is passed to get_stack(). The file argument is an I/O stream 159 to which the output is written; by default output is written 160 to sys.stderr. 161 """ 162 extracted_list = [] 163 checked = set() 164 for f in self.get_stack(limit=limit): 165 lineno = f.f_lineno 166 co = f.f_code 167 filename = co.co_filename 168 name = co.co_name 169 if filename not in checked: 170 checked.add(filename) 171 linecache.checkcache(filename) 172 line = linecache.getline(filename, lineno, f.f_globals) 173 extracted_list.append((filename, lineno, name, line)) 174 exc = self._exception 175 if not extracted_list: 176 print('No stack for %r' % self, file=file) 177 elif exc is not None: 178 print('Traceback for %r (most recent call last):' % self, 179 file=file) 180 else: 181 print('Stack for %r (most recent call last):' % self, 182 file=file) 183 traceback.print_list(extracted_list, file=file) 184 if exc is not None: 185 for line in traceback.format_exception_only(exc.__class__, exc): 186 print(line, file=file, end='') 187 188 def cancel(self): 189 """Request that this task cancel itself. 190 191 This arranges for a CancelledError to be thrown into the 192 wrapped coroutine on the next cycle through the event loop. 193 The coroutine then has a chance to clean up or even deny 194 the request using try/except/finally. 195 196 Unlike Future.cancel, this does not guarantee that the 197 task will be cancelled: the exception might be caught and 198 acted upon, delaying cancellation of the task or preventing 199 cancellation completely. The task may also return a value or 200 raise a different exception. 201 202 Immediately after this method is called, Task.cancelled() will 203 not return True (unless the task was already cancelled). A 204 task will be marked as cancelled when the wrapped coroutine 205 terminates with a CancelledError exception (even if cancel() 206 was not called). 207 """ 208 if self.done(): 209 return False 210 if self._fut_waiter is not None: 211 if self._fut_waiter.cancel(): 212 # Leave self._fut_waiter; it may be a Task that 213 # catches and ignores the cancellation so we may have 214 # to cancel it again later. 215 return True 216 # It must be the case that self._step is already scheduled. 217 self._must_cancel = True 218 return True 219 220 def _step(self, value=None, exc=None): 221 assert not self.done(), \ 222 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc) 223 if self._must_cancel: 224 if not isinstance(exc, futures.CancelledError): 225 exc = futures.CancelledError() 226 self._must_cancel = False 227 coro = self._coro 228 self._fut_waiter = None 229 230 self.__class__._current_tasks[self._loop] = self 231 # Call either coro.throw(exc) or coro.send(value). 232 try: 233 if exc is not None: 234 result = coro.throw(exc) 235 elif value is not None: 236 result = coro.send(value) 237 else: 238 result = next(coro) 239 except StopIteration as exc: 240 self.set_result(exc.value) 241 except futures.CancelledError as exc: 242 super().cancel() # I.e., Future.cancel(self). 243 except Exception as exc: 244 self.set_exception(exc) 245 except BaseException as exc: 246 self.set_exception(exc) 247 raise 248 else: 249 if isinstance(result, futures.Future): 250 # Yielded Future must come from Future.__iter__(). 251 if result._blocking: 252 result._blocking = False 253 result.add_done_callback(self._wakeup) 254 self._fut_waiter = result 255 if self._must_cancel: 256 if self._fut_waiter.cancel(): 257 self._must_cancel = False 258 else: 259 self._loop.call_soon( 260 self._step, None, 261 RuntimeError( 262 'yield was used instead of yield from ' 263 'in task {!r} with {!r}'.format(self, result))) 264 elif result is None: 265 # Bare yield relinquishes control for one event loop iteration. 266 self._loop.call_soon(self._step) 267 elif inspect.isgenerator(result): 268 # Yielding a generator is just wrong. 269 self._loop.call_soon( 270 self._step, None, 271 RuntimeError( 272 'yield was used instead of yield from for ' 273 'generator in task {!r} with {}'.format( 274 self, result))) 275 else: 276 # Yielding something else is an error. 277 self._loop.call_soon( 278 self._step, None, 279 RuntimeError( 280 'Task got bad yield: {!r}'.format(result))) 281 finally: 282 self.__class__._current_tasks.pop(self._loop) 283 self = None # Needed to break cycles when an exception occurs. 284 285 def _wakeup(self, future): 286 try: 287 value = future.result() 288 except Exception as exc: 289 # This may also be a cancellation. 290 self._step(None, exc) 291 else: 292 self._step(value, None) 293 self = None # Needed to break cycles when an exception occurs. 294 295 296# wait() and as_completed() similar to those in PEP 3148. 297 298FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 299FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 300ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 301 302 303@coroutine 304def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): 305 """Wait for the Futures and coroutines given by fs to complete. 306 307 The sequence futures must not be empty. 308 309 Coroutines will be wrapped in Tasks. 310 311 Returns two sets of Future: (done, pending). 312 313 Usage: 314 315 done, pending = yield from asyncio.wait(fs) 316 317 Note: This does not raise TimeoutError! Futures that aren't done 318 when the timeout occurs are returned in the second set. 319 """ 320 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): 321 raise TypeError("expect a list of futures, not %s" % type(fs).__name__) 322 if not fs: 323 raise ValueError('Set of coroutines/Futures is empty.') 324 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 325 raise ValueError('Invalid return_when value: {}'.format(return_when)) 326 327 if loop is None: 328 loop = events.get_event_loop() 329 330 fs = {async(f, loop=loop) for f in set(fs)} 331 332 return (yield from _wait(fs, timeout, return_when, loop)) 333 334 335def _release_waiter(waiter, *args): 336 if not waiter.done(): 337 waiter.set_result(None) 338 339 340@coroutine 341def wait_for(fut, timeout, *, loop=None): 342 """Wait for the single Future or coroutine to complete, with timeout. 343 344 Coroutine will be wrapped in Task. 345 346 Returns result of the Future or coroutine. When a timeout occurs, 347 it cancels the task and raises TimeoutError. To avoid the task 348 cancellation, wrap it in shield(). 349 350 If the wait is cancelled, the task is also cancelled. 351 352 This function is a coroutine. 353 """ 354 if loop is None: 355 loop = events.get_event_loop() 356 357 if timeout is None: 358 return (yield from fut) 359 360 waiter = futures.Future(loop=loop) 361 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 362 cb = functools.partial(_release_waiter, waiter) 363 364 fut = async(fut, loop=loop) 365 fut.add_done_callback(cb) 366 367 try: 368 # wait until the future completes or the timeout 369 try: 370 yield from waiter 371 except futures.CancelledError: 372 fut.remove_done_callback(cb) 373 fut.cancel() 374 raise 375 376 if fut.done(): 377 return fut.result() 378 else: 379 fut.remove_done_callback(cb) 380 fut.cancel() 381 raise futures.TimeoutError() 382 finally: 383 timeout_handle.cancel() 384 385 386@coroutine 387def _wait(fs, timeout, return_when, loop): 388 """Internal helper for wait() and _wait_for(). 389 390 The fs argument must be a collection of Futures. 391 """ 392 assert fs, 'Set of Futures is empty.' 393 waiter = futures.Future(loop=loop) 394 timeout_handle = None 395 if timeout is not None: 396 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 397 counter = len(fs) 398 399 def _on_completion(f): 400 nonlocal counter 401 counter -= 1 402 if (counter <= 0 or 403 return_when == FIRST_COMPLETED or 404 return_when == FIRST_EXCEPTION and (not f.cancelled() and 405 f.exception() is not None)): 406 if timeout_handle is not None: 407 timeout_handle.cancel() 408 if not waiter.done(): 409 waiter.set_result(None) 410 411 for f in fs: 412 f.add_done_callback(_on_completion) 413 414 try: 415 yield from waiter 416 finally: 417 if timeout_handle is not None: 418 timeout_handle.cancel() 419 420 done, pending = set(), set() 421 for f in fs: 422 f.remove_done_callback(_on_completion) 423 if f.done(): 424 done.add(f) 425 else: 426 pending.add(f) 427 return done, pending 428 429 430# This is *not* a @coroutine! It is just an iterator (yielding Futures). 431def as_completed(fs, *, loop=None, timeout=None): 432 """Return an iterator whose values are coroutines. 433 434 When waiting for the yielded coroutines you'll get the results (or 435 exceptions!) of the original Futures (or coroutines), in the order 436 in which and as soon as they complete. 437 438 This differs from PEP 3148; the proper way to use this is: 439 440 for f in as_completed(fs): 441 result = yield from f # The 'yield from' may raise. 442 # Use result. 443 444 If a timeout is specified, the 'yield from' will raise 445 TimeoutError when the timeout occurs before all Futures are done. 446 447 Note: The futures 'f' are not necessarily members of fs. 448 """ 449 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): 450 raise TypeError("expect a list of futures, not %s" % type(fs).__name__) 451 loop = loop if loop is not None else events.get_event_loop() 452 todo = {async(f, loop=loop) for f in set(fs)} 453 from .queues import Queue # Import here to avoid circular import problem. 454 done = Queue(loop=loop) 455 timeout_handle = None 456 457 def _on_timeout(): 458 for f in todo: 459 f.remove_done_callback(_on_completion) 460 done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 461 todo.clear() # Can't do todo.remove(f) in the loop. 462 463 def _on_completion(f): 464 if not todo: 465 return # _on_timeout() was here first. 466 todo.remove(f) 467 done.put_nowait(f) 468 if not todo and timeout_handle is not None: 469 timeout_handle.cancel() 470 471 @coroutine 472 def _wait_for_one(): 473 f = yield from done.get() 474 if f is None: 475 # Dummy value from _on_timeout(). 476 raise futures.TimeoutError 477 return f.result() # May raise f.exception(). 478 479 for f in todo: 480 f.add_done_callback(_on_completion) 481 if todo and timeout is not None: 482 timeout_handle = loop.call_later(timeout, _on_timeout) 483 for _ in range(len(todo)): 484 yield _wait_for_one() 485 486 487@coroutine 488def sleep(delay, result=None, *, loop=None): 489 """Coroutine that completes after a given time (in seconds).""" 490 future = futures.Future(loop=loop) 491 h = future._loop.call_later(delay, 492 future._set_result_unless_cancelled, result) 493 try: 494 return (yield from future) 495 finally: 496 h.cancel() 497 498 499def async(coro_or_future, *, loop=None): 500 """Wrap a coroutine in a future. 501 502 If the argument is a Future, it is returned directly. 503 """ 504 if isinstance(coro_or_future, futures.Future): 505 if loop is not None and loop is not coro_or_future._loop: 506 raise ValueError('loop argument must agree with Future') 507 return coro_or_future 508 elif coroutines.iscoroutine(coro_or_future): 509 if loop is None: 510 loop = events.get_event_loop() 511 task = loop.create_task(coro_or_future) 512 if task._source_traceback: 513 del task._source_traceback[-1] 514 return task 515 else: 516 raise TypeError('A Future or coroutine is required') 517 518 519class _GatheringFuture(futures.Future): 520 """Helper for gather(). 521 522 This overrides cancel() to cancel all the children and act more 523 like Task.cancel(), which doesn't immediately mark itself as 524 cancelled. 525 """ 526 527 def __init__(self, children, *, loop=None): 528 super().__init__(loop=loop) 529 self._children = children 530 531 def cancel(self): 532 if self.done(): 533 return False 534 for child in self._children: 535 child.cancel() 536 return True 537 538 539def gather(*coros_or_futures, loop=None, return_exceptions=False): 540 """Return a future aggregating results from the given coroutines 541 or futures. 542 543 All futures must share the same event loop. If all the tasks are 544 done successfully, the returned future's result is the list of 545 results (in the order of the original sequence, not necessarily 546 the order of results arrival). If *return_exceptions* is True, 547 exceptions in the tasks are treated the same as successful 548 results, and gathered in the result list; otherwise, the first 549 raised exception will be immediately propagated to the returned 550 future. 551 552 Cancellation: if the outer Future is cancelled, all children (that 553 have not completed yet) are also cancelled. If any child is 554 cancelled, this is treated as if it raised CancelledError -- 555 the outer Future is *not* cancelled in this case. (This is to 556 prevent the cancellation of one child to cause other children to 557 be cancelled.) 558 """ 559 if not coros_or_futures: 560 outer = futures.Future(loop=loop) 561 outer.set_result([]) 562 return outer 563 564 arg_to_fut = {} 565 for arg in set(coros_or_futures): 566 if not isinstance(arg, futures.Future): 567 fut = async(arg, loop=loop) 568 if loop is None: 569 loop = fut._loop 570 # The caller cannot control this future, the "destroy pending task" 571 # warning should not be emitted. 572 fut._log_destroy_pending = False 573 else: 574 fut = arg 575 if loop is None: 576 loop = fut._loop 577 elif fut._loop is not loop: 578 raise ValueError("futures are tied to different event loops") 579 arg_to_fut[arg] = fut 580 581 children = [arg_to_fut[arg] for arg in coros_or_futures] 582 nchildren = len(children) 583 outer = _GatheringFuture(children, loop=loop) 584 nfinished = 0 585 results = [None] * nchildren 586 587 def _done_callback(i, fut): 588 nonlocal nfinished 589 if outer.done(): 590 if not fut.cancelled(): 591 # Mark exception retrieved. 592 fut.exception() 593 return 594 595 if fut.cancelled(): 596 res = futures.CancelledError() 597 if not return_exceptions: 598 outer.set_exception(res) 599 return 600 elif fut._exception is not None: 601 res = fut.exception() # Mark exception retrieved. 602 if not return_exceptions: 603 outer.set_exception(res) 604 return 605 else: 606 res = fut._result 607 results[i] = res 608 nfinished += 1 609 if nfinished == nchildren: 610 outer.set_result(results) 611 612 for i, fut in enumerate(children): 613 fut.add_done_callback(functools.partial(_done_callback, i)) 614 return outer 615 616 617def shield(arg, *, loop=None): 618 """Wait for a future, shielding it from cancellation. 619 620 The statement 621 622 res = yield from shield(something()) 623 624 is exactly equivalent to the statement 625 626 res = yield from something() 627 628 *except* that if the coroutine containing it is cancelled, the 629 task running in something() is not cancelled. From the POV of 630 something(), the cancellation did not happen. But its caller is 631 still cancelled, so the yield-from expression still raises 632 CancelledError. Note: If something() is cancelled by other means 633 this will still cancel shield(). 634 635 If you want to completely ignore cancellation (not recommended) 636 you can combine shield() with a try/except clause, as follows: 637 638 try: 639 res = yield from shield(something()) 640 except CancelledError: 641 res = None 642 """ 643 inner = async(arg, loop=loop) 644 if inner.done(): 645 # Shortcut. 646 return inner 647 loop = inner._loop 648 outer = futures.Future(loop=loop) 649 650 def _done_callback(inner): 651 if outer.cancelled(): 652 if not inner.cancelled(): 653 # Mark inner's result as retrieved. 654 inner.exception() 655 return 656 657 if inner.cancelled(): 658 outer.cancel() 659 else: 660 exc = inner.exception() 661 if exc is not None: 662 outer.set_exception(exc) 663 else: 664 outer.set_result(inner.result()) 665 666 inner.add_done_callback(_done_callback) 667 return outer 668