1# coding: utf-8 2 3import functools 4import itertools 5import logging 6import os 7import random 8import select 9import sys 10import threading 11from collections import deque 12import collections.abc 13from contextlib import contextmanager 14import warnings 15import weakref 16import enum 17 18from contextvars import copy_context 19from math import inf 20from time import perf_counter 21from typing import Callable, TYPE_CHECKING 22 23from sniffio import current_async_library_cvar 24 25import attr 26from heapq import heapify, heappop, heappush 27from sortedcontainers import SortedDict 28from outcome import Error, Outcome, Value, capture 29 30from ._entry_queue import EntryQueue, TrioToken 31from ._exceptions import TrioInternalError, RunFinishedError, Cancelled 32from ._ki import ( 33 LOCALS_KEY_KI_PROTECTION_ENABLED, 34 KIManager, 35 enable_ki_protection, 36) 37from ._multierror import MultiError 38from ._traps import ( 39 Abort, 40 wait_task_rescheduled, 41 cancel_shielded_checkpoint, 42 CancelShieldedCheckpoint, 43 PermanentlyDetachCoroutineObject, 44 WaitTaskRescheduled, 45) 46from ._asyncgens import AsyncGenerators 47from ._thread_cache import start_thread_soon 48from ._instrumentation import Instruments 49from .. import _core 50from .._deprecate import warn_deprecated 51from .._util import Final, NoPublicConstructor, coroutine_or_error 52 53DEADLINE_HEAP_MIN_PRUNE_THRESHOLD = 1000 54 55_NO_SEND = object() 56 57 58# Decorator to mark methods public. This does nothing by itself, but 59# trio/_tools/gen_exports.py looks for it. 60def _public(fn): 61 return fn 62 63 64# When running under Hypothesis, we want examples to be reproducible and 65# shrinkable. pytest-trio's Hypothesis integration monkeypatches this 66# variable to True, and registers the Random instance _r for Hypothesis 67# to manage for each test case, which together should make Trio's task 68# scheduling loop deterministic. We have a test for that, of course. 69_ALLOW_DETERMINISTIC_SCHEDULING = False 70_r = random.Random() 71 72 73# On 3.7+, Context.run() is implemented in C and doesn't show up in 74# tracebacks. On 3.6, we use the contextvars backport, which is 75# currently implemented in Python and adds 1 frame to tracebacks. So this 76# function is a super-overkill version of "0 if sys.version_info >= (3, 7) 77# else 1". But if Context.run ever changes, we'll be ready! 78# 79# This can all be removed once we drop support for 3.6. 80def _count_context_run_tb_frames(): 81 def function_with_unique_name_xyzzy(): 82 1 / 0 83 84 ctx = copy_context() 85 try: 86 ctx.run(function_with_unique_name_xyzzy) 87 except ZeroDivisionError as exc: 88 tb = exc.__traceback__ 89 # Skip the frame where we caught it 90 tb = tb.tb_next 91 count = 0 92 while tb.tb_frame.f_code.co_name != "function_with_unique_name_xyzzy": 93 tb = tb.tb_next 94 count += 1 95 return count 96 97 98CONTEXT_RUN_TB_FRAMES = _count_context_run_tb_frames() 99 100 101@attr.s(frozen=True, slots=True) 102class SystemClock: 103 # Add a large random offset to our clock to ensure that if people 104 # accidentally call time.perf_counter() directly or start comparing clocks 105 # between different runs, then they'll notice the bug quickly: 106 offset = attr.ib(factory=lambda: _r.uniform(10000, 200000)) 107 108 def start_clock(self): 109 pass 110 111 # In cPython 3, on every platform except Windows, perf_counter is 112 # exactly the same as time.monotonic; and on Windows, it uses 113 # QueryPerformanceCounter instead of GetTickCount64. 114 def current_time(self): 115 return self.offset + perf_counter() 116 117 def deadline_to_sleep_time(self, deadline): 118 return deadline - self.current_time() 119 120 121class IdlePrimedTypes(enum.Enum): 122 WAITING_FOR_IDLE = 1 123 AUTOJUMP_CLOCK = 2 124 125 126################################################################ 127# CancelScope and friends 128################################################################ 129 130 131@attr.s(eq=False, slots=True) 132class Deadlines: 133 """A container of deadlined cancel scopes. 134 135 Only contains scopes with non-infinite deadlines that are currently 136 attached to at least one task. 137 138 """ 139 140 # Heap of (deadline, id(CancelScope), CancelScope) 141 _heap = attr.ib(factory=list) 142 # Count of active deadlines (those that haven't been changed) 143 _active = attr.ib(default=0) 144 145 def add(self, deadline, cancel_scope): 146 heappush(self._heap, (deadline, id(cancel_scope), cancel_scope)) 147 self._active += 1 148 149 def remove(self, deadline, cancel_scope): 150 self._active -= 1 151 152 def next_deadline(self): 153 while self._heap: 154 deadline, _, cancel_scope = self._heap[0] 155 if deadline == cancel_scope._registered_deadline: 156 return deadline 157 else: 158 # This entry is stale; discard it and try again 159 heappop(self._heap) 160 return inf 161 162 def _prune(self): 163 # In principle, it's possible for a cancel scope to toggle back and 164 # forth repeatedly between the same two deadlines, and end up with 165 # lots of stale entries that *look* like they're still active, because 166 # their deadline is correct, but in fact are redundant. So when 167 # pruning we have to eliminate entries with the wrong deadline, *and* 168 # eliminate duplicates. 169 seen = set() 170 pruned_heap = [] 171 for deadline, tiebreaker, cancel_scope in self._heap: 172 if deadline == cancel_scope._registered_deadline: 173 if cancel_scope in seen: 174 continue 175 seen.add(cancel_scope) 176 pruned_heap.append((deadline, tiebreaker, cancel_scope)) 177 # See test_cancel_scope_deadline_duplicates for a test that exercises 178 # this assert: 179 assert len(pruned_heap) == self._active 180 heapify(pruned_heap) 181 self._heap = pruned_heap 182 183 def expire(self, now): 184 did_something = False 185 while self._heap and self._heap[0][0] <= now: 186 deadline, _, cancel_scope = heappop(self._heap) 187 if deadline == cancel_scope._registered_deadline: 188 did_something = True 189 # This implicitly calls self.remove(), so we don't need to 190 # decrement _active here 191 cancel_scope.cancel() 192 # If we've accumulated too many stale entries, then prune the heap to 193 # keep it under control. (We only do this occasionally in a batch, to 194 # keep the amortized cost down) 195 if len(self._heap) > self._active * 2 + DEADLINE_HEAP_MIN_PRUNE_THRESHOLD: 196 self._prune() 197 return did_something 198 199 200@attr.s(eq=False, slots=True) 201class CancelStatus: 202 """Tracks the cancellation status for a contiguous extent 203 of code that will become cancelled, or not, as a unit. 204 205 Each task has at all times a single "active" CancelStatus whose 206 cancellation state determines whether checkpoints executed in that 207 task raise Cancelled. Each 'with CancelScope(...)' context is 208 associated with a particular CancelStatus. When a task enters 209 such a context, a CancelStatus is created which becomes the active 210 CancelStatus for that task; when the 'with' block is exited, the 211 active CancelStatus for that task goes back to whatever it was 212 before. 213 214 CancelStatus objects are arranged in a tree whose structure 215 mirrors the lexical nesting of the cancel scope contexts. When a 216 CancelStatus becomes cancelled, it notifies all of its direct 217 children, who become cancelled in turn (and continue propagating 218 the cancellation down the tree) unless they are shielded. (There 219 will be at most one such child except in the case of a 220 CancelStatus that immediately encloses a nursery.) At the leaves 221 of this tree are the tasks themselves, which get woken up to deliver 222 an abort when their direct parent CancelStatus becomes cancelled. 223 224 You can think of CancelStatus as being responsible for the 225 "plumbing" of cancellations as oppposed to CancelScope which is 226 responsible for the origination of them. 227 228 """ 229 230 # Our associated cancel scope. Can be any object with attributes 231 # `deadline`, `shield`, and `cancel_called`, but in current usage 232 # is always a CancelScope object. Must not be None. 233 _scope = attr.ib() 234 235 # True iff the tasks in self._tasks should receive cancellations 236 # when they checkpoint. Always True when scope.cancel_called is True; 237 # may also be True due to a cancellation propagated from our 238 # parent. Unlike scope.cancel_called, this does not necessarily stay 239 # true once it becomes true. For example, we might become 240 # effectively cancelled due to the cancel scope two levels out 241 # becoming cancelled, but then the cancel scope one level out 242 # becomes shielded so we're not effectively cancelled anymore. 243 effectively_cancelled = attr.ib(default=False) 244 245 # The CancelStatus whose cancellations can propagate to us; we 246 # become effectively cancelled when they do, unless scope.shield 247 # is True. May be None (for the outermost CancelStatus in a call 248 # to trio.run(), briefly during TaskStatus.started(), or during 249 # recovery from mis-nesting of cancel scopes). 250 _parent = attr.ib(default=None, repr=False) 251 252 # All of the CancelStatuses that have this CancelStatus as their parent. 253 _children = attr.ib(factory=set, init=False, repr=False) 254 255 # Tasks whose cancellation state is currently tied directly to 256 # the cancellation state of this CancelStatus object. Don't modify 257 # this directly; instead, use Task._activate_cancel_status(). 258 # Invariant: all(task._cancel_status is self for task in self._tasks) 259 _tasks = attr.ib(factory=set, init=False, repr=False) 260 261 # Set to True on still-active cancel statuses that are children 262 # of a cancel status that's been closed. This is used to permit 263 # recovery from mis-nested cancel scopes (well, at least enough 264 # recovery to show a useful traceback). 265 abandoned_by_misnesting = attr.ib(default=False, init=False, repr=False) 266 267 def __attrs_post_init__(self): 268 if self._parent is not None: 269 self._parent._children.add(self) 270 self.recalculate() 271 272 # parent/children/tasks accessors are used by TaskStatus.started() 273 274 @property 275 def parent(self): 276 return self._parent 277 278 @parent.setter 279 def parent(self, parent): 280 if self._parent is not None: 281 self._parent._children.remove(self) 282 self._parent = parent 283 if self._parent is not None: 284 self._parent._children.add(self) 285 self.recalculate() 286 287 @property 288 def children(self): 289 return frozenset(self._children) 290 291 @property 292 def tasks(self): 293 return frozenset(self._tasks) 294 295 def encloses(self, other): 296 """Returns true if this cancel status is a direct or indirect 297 parent of cancel status *other*, or if *other* is *self*. 298 """ 299 while other is not None: 300 if other is self: 301 return True 302 other = other.parent 303 return False 304 305 def close(self): 306 self.parent = None # now we're not a child of self.parent anymore 307 if self._tasks or self._children: 308 # Cancel scopes weren't exited in opposite order of being 309 # entered. CancelScope._close() deals with raising an error 310 # if appropriate; our job is to leave things in a reasonable 311 # state for unwinding our dangling children. We choose to leave 312 # this part of the CancelStatus tree unlinked from everyone 313 # else, cancelled, and marked so that exiting a CancelScope 314 # within the abandoned subtree doesn't affect the active 315 # CancelStatus. Note that it's possible for us to get here 316 # without CancelScope._close() raising an error, if a 317 # nursery's cancel scope is closed within the nursery's 318 # nested child and no other cancel scopes are involved, 319 # but in that case task_exited() will deal with raising 320 # the error. 321 self._mark_abandoned() 322 323 # Since our CancelScope is about to forget about us, and we 324 # have no parent anymore, there's nothing left to call 325 # recalculate(). So, we can stay cancelled by setting 326 # effectively_cancelled and updating our children. 327 self.effectively_cancelled = True 328 for task in self._tasks: 329 task._attempt_delivery_of_any_pending_cancel() 330 for child in self._children: 331 child.recalculate() 332 333 @property 334 def parent_cancellation_is_visible_to_us(self): 335 return ( 336 self._parent is not None 337 and not self._scope.shield 338 and self._parent.effectively_cancelled 339 ) 340 341 def recalculate(self): 342 # This does a depth-first traversal over this and descendent cancel 343 # statuses, to ensure their state is up-to-date. It's basically a 344 # recursive algorithm, but we use an explicit stack to avoid any 345 # issues with stack overflow. 346 todo = [self] 347 while todo: 348 current = todo.pop() 349 new_state = ( 350 current._scope.cancel_called 351 or current.parent_cancellation_is_visible_to_us 352 ) 353 if new_state != current.effectively_cancelled: 354 current.effectively_cancelled = new_state 355 if new_state: 356 for task in current._tasks: 357 task._attempt_delivery_of_any_pending_cancel() 358 todo.extend(current._children) 359 360 def _mark_abandoned(self): 361 self.abandoned_by_misnesting = True 362 for child in self._children: 363 child._mark_abandoned() 364 365 def effective_deadline(self): 366 if self.effectively_cancelled: 367 return -inf 368 if self._parent is None or self._scope.shield: 369 return self._scope.deadline 370 return min(self._scope.deadline, self._parent.effective_deadline()) 371 372 373MISNESTING_ADVICE = """ 374This is probably a bug in your code, that has caused Trio's internal state to 375become corrupted. We'll do our best to recover, but from now on there are 376no guarantees. 377 378Typically this is caused by one of the following: 379 - yielding within a generator or async generator that's opened a cancel 380 scope or nursery (unless the generator is a @contextmanager or 381 @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 382 - manually calling __enter__ or __exit__ on a trio.CancelScope, or 383 __aenter__ or __aexit__ on the object returned by trio.open_nursery(); 384 doing so correctly is difficult and you should use @[async]contextmanager 385 instead, or maybe [Async]ExitStack 386 - using [Async]ExitStack to interleave the entries/exits of cancel scopes 387 and/or nurseries in a way that couldn't be achieved by some nesting of 388 'with' and 'async with' blocks 389 - using the low-level coroutine object protocol to execute some parts of 390 an async function in a different cancel scope/nursery context than 391 other parts 392If you don't believe you're doing any of these things, please file a bug: 393https://github.com/python-trio/trio/issues/new 394""" 395 396 397@attr.s(eq=False, repr=False, slots=True) 398class CancelScope(metaclass=Final): 399 """A *cancellation scope*: the link between a unit of cancellable 400 work and Trio's cancellation system. 401 402 A :class:`CancelScope` becomes associated with some cancellable work 403 when it is used as a context manager surrounding that work:: 404 405 cancel_scope = trio.CancelScope() 406 ... 407 with cancel_scope: 408 await long_running_operation() 409 410 Inside the ``with`` block, a cancellation of ``cancel_scope`` (via 411 a call to its :meth:`cancel` method or via the expiry of its 412 :attr:`deadline`) will immediately interrupt the 413 ``long_running_operation()`` by raising :exc:`Cancelled` at its 414 next :ref:`checkpoint <checkpoints>`. 415 416 The context manager ``__enter__`` returns the :class:`CancelScope` 417 object itself, so you can also write ``with trio.CancelScope() as 418 cancel_scope:``. 419 420 If a cancel scope becomes cancelled before entering its ``with`` block, 421 the :exc:`Cancelled` exception will be raised at the first 422 checkpoint inside the ``with`` block. This allows a 423 :class:`CancelScope` to be created in one :ref:`task <tasks>` and 424 passed to another, so that the first task can later cancel some work 425 inside the second. 426 427 Cancel scopes are not reusable or reentrant; that is, each cancel 428 scope can be used for at most one ``with`` block. (You'll get a 429 :exc:`RuntimeError` if you violate this rule.) 430 431 The :class:`CancelScope` constructor takes initial values for the 432 cancel scope's :attr:`deadline` and :attr:`shield` attributes; these 433 may be freely modified after construction, whether or not the scope 434 has been entered yet, and changes take immediate effect. 435 """ 436 437 _cancel_status = attr.ib(default=None, init=False) 438 _has_been_entered = attr.ib(default=False, init=False) 439 _registered_deadline = attr.ib(default=inf, init=False) 440 _cancel_called = attr.ib(default=False, init=False) 441 cancelled_caught = attr.ib(default=False, init=False) 442 443 # Constructor arguments: 444 _deadline = attr.ib(default=inf, kw_only=True) 445 _shield = attr.ib(default=False, kw_only=True) 446 447 @enable_ki_protection 448 def __enter__(self): 449 task = _core.current_task() 450 if self._has_been_entered: 451 raise RuntimeError( 452 "Each CancelScope may only be used for a single 'with' block" 453 ) 454 self._has_been_entered = True 455 if current_time() >= self._deadline: 456 self.cancel() 457 with self._might_change_registered_deadline(): 458 self._cancel_status = CancelStatus(scope=self, parent=task._cancel_status) 459 task._activate_cancel_status(self._cancel_status) 460 return self 461 462 def _exc_filter(self, exc): 463 if isinstance(exc, Cancelled): 464 self.cancelled_caught = True 465 return None 466 return exc 467 468 def _close(self, exc): 469 if self._cancel_status is None: 470 new_exc = RuntimeError( 471 "Cancel scope stack corrupted: attempted to exit {!r} " 472 "which had already been exited".format(self) 473 ) 474 new_exc.__context__ = exc 475 return new_exc 476 scope_task = current_task() 477 if scope_task._cancel_status is not self._cancel_status: 478 # Cancel scope mis-nesting: this cancel scope isn't the most 479 # recently opened by this task (that's still open). That is, 480 # our assumptions about context managers forming a stack 481 # have been violated. Try and make the best of it. 482 if self._cancel_status.abandoned_by_misnesting: 483 # We are an inner cancel scope that was still active when 484 # some outer scope was closed. The closure of that outer 485 # scope threw an error, so we don't need to throw another 486 # one; it would just confuse the traceback. 487 pass 488 elif not self._cancel_status.encloses(scope_task._cancel_status): 489 # This task isn't even indirectly contained within the 490 # cancel scope it's trying to close. Raise an error 491 # without changing any state. 492 new_exc = RuntimeError( 493 "Cancel scope stack corrupted: attempted to exit {!r} " 494 "from unrelated {!r}\n{}".format( 495 self, scope_task, MISNESTING_ADVICE 496 ) 497 ) 498 new_exc.__context__ = exc 499 return new_exc 500 else: 501 # Otherwise, there's some inner cancel scope(s) that 502 # we're abandoning by closing this outer one. 503 # CancelStatus.close() will take care of the plumbing; 504 # we just need to make sure we don't let the error 505 # pass silently. 506 new_exc = RuntimeError( 507 "Cancel scope stack corrupted: attempted to exit {!r} " 508 "in {!r} that's still within its child {!r}\n{}".format( 509 self, 510 scope_task, 511 scope_task._cancel_status._scope, 512 MISNESTING_ADVICE, 513 ) 514 ) 515 new_exc.__context__ = exc 516 exc = new_exc 517 scope_task._activate_cancel_status(self._cancel_status.parent) 518 else: 519 scope_task._activate_cancel_status(self._cancel_status.parent) 520 if ( 521 exc is not None 522 and self._cancel_status.effectively_cancelled 523 and not self._cancel_status.parent_cancellation_is_visible_to_us 524 ): 525 exc = MultiError.filter(self._exc_filter, exc) 526 self._cancel_status.close() 527 with self._might_change_registered_deadline(): 528 self._cancel_status = None 529 return exc 530 531 @enable_ki_protection 532 def __exit__(self, etype, exc, tb): 533 # NB: NurseryManager calls _close() directly rather than __exit__(), 534 # so __exit__() must be just _close() plus this logic for adapting 535 # the exception-filtering result to the context manager API. 536 537 # Tracebacks show the 'raise' line below out of context, so let's give 538 # this variable a name that makes sense out of context. 539 remaining_error_after_cancel_scope = self._close(exc) 540 if remaining_error_after_cancel_scope is None: 541 return True 542 elif remaining_error_after_cancel_scope is exc: 543 return False 544 else: 545 # Copied verbatim from MultiErrorCatcher. Python doesn't 546 # allow us to encapsulate this __context__ fixup. 547 old_context = remaining_error_after_cancel_scope.__context__ 548 try: 549 raise remaining_error_after_cancel_scope 550 finally: 551 _, value, _ = sys.exc_info() 552 assert value is remaining_error_after_cancel_scope 553 value.__context__ = old_context 554 555 def __repr__(self): 556 if self._cancel_status is not None: 557 binding = "active" 558 elif self._has_been_entered: 559 binding = "exited" 560 else: 561 binding = "unbound" 562 563 if self._cancel_called: 564 state = ", cancelled" 565 elif self._deadline == inf: 566 state = "" 567 else: 568 try: 569 now = current_time() 570 except RuntimeError: # must be called from async context 571 state = "" 572 else: 573 state = ", deadline is {:.2f} seconds {}".format( 574 abs(self._deadline - now), 575 "from now" if self._deadline >= now else "ago", 576 ) 577 578 return "<trio.CancelScope at {:#x}, {}{}>".format(id(self), binding, state) 579 580 @contextmanager 581 @enable_ki_protection 582 def _might_change_registered_deadline(self): 583 try: 584 yield 585 finally: 586 old = self._registered_deadline 587 if self._cancel_status is None or self._cancel_called: 588 new = inf 589 else: 590 new = self._deadline 591 if old != new: 592 self._registered_deadline = new 593 runner = GLOBAL_RUN_CONTEXT.runner 594 if runner.is_guest: 595 old_next_deadline = runner.deadlines.next_deadline() 596 if old != inf: 597 runner.deadlines.remove(old, self) 598 if new != inf: 599 runner.deadlines.add(new, self) 600 if runner.is_guest: 601 new_next_deadline = runner.deadlines.next_deadline() 602 if old_next_deadline != new_next_deadline: 603 runner.force_guest_tick_asap() 604 605 @property 606 def deadline(self): 607 """Read-write, :class:`float`. An absolute time on the current 608 run's clock at which this scope will automatically become 609 cancelled. You can adjust the deadline by modifying this 610 attribute, e.g.:: 611 612 # I need a little more time! 613 cancel_scope.deadline += 30 614 615 Note that for efficiency, the core run loop only checks for 616 expired deadlines every once in a while. This means that in 617 certain cases there may be a short delay between when the clock 618 says the deadline should have expired, and when checkpoints 619 start raising :exc:`~trio.Cancelled`. This is a very obscure 620 corner case that you're unlikely to notice, but we document it 621 for completeness. (If this *does* cause problems for you, of 622 course, then `we want to know! 623 <https://github.com/python-trio/trio/issues>`__) 624 625 Defaults to :data:`math.inf`, which means "no deadline", though 626 this can be overridden by the ``deadline=`` argument to 627 the :class:`~trio.CancelScope` constructor. 628 """ 629 return self._deadline 630 631 @deadline.setter 632 def deadline(self, new_deadline): 633 with self._might_change_registered_deadline(): 634 self._deadline = float(new_deadline) 635 636 @property 637 def shield(self): 638 """Read-write, :class:`bool`, default :data:`False`. So long as 639 this is set to :data:`True`, then the code inside this scope 640 will not receive :exc:`~trio.Cancelled` exceptions from scopes 641 that are outside this scope. They can still receive 642 :exc:`~trio.Cancelled` exceptions from (1) this scope, or (2) 643 scopes inside this scope. You can modify this attribute:: 644 645 with trio.CancelScope() as cancel_scope: 646 cancel_scope.shield = True 647 # This cannot be interrupted by any means short of 648 # killing the process: 649 await sleep(10) 650 651 cancel_scope.shield = False 652 # Now this can be cancelled normally: 653 await sleep(10) 654 655 Defaults to :data:`False`, though this can be overridden by the 656 ``shield=`` argument to the :class:`~trio.CancelScope` constructor. 657 """ 658 return self._shield 659 660 @shield.setter # type: ignore # "decorated property not supported" 661 @enable_ki_protection 662 def shield(self, new_value): 663 if not isinstance(new_value, bool): 664 raise TypeError("shield must be a bool") 665 self._shield = new_value 666 if self._cancel_status is not None: 667 self._cancel_status.recalculate() 668 669 @enable_ki_protection 670 def cancel(self): 671 """Cancels this scope immediately. 672 673 This method is idempotent, i.e., if the scope was already 674 cancelled then this method silently does nothing. 675 """ 676 if self._cancel_called: 677 return 678 with self._might_change_registered_deadline(): 679 self._cancel_called = True 680 if self._cancel_status is not None: 681 self._cancel_status.recalculate() 682 683 @property 684 def cancel_called(self): 685 """Readonly :class:`bool`. Records whether cancellation has been 686 requested for this scope, either by an explicit call to 687 :meth:`cancel` or by the deadline expiring. 688 689 This attribute being True does *not* necessarily mean that the 690 code within the scope has been, or will be, affected by the 691 cancellation. For example, if :meth:`cancel` was called after 692 the last checkpoint in the ``with`` block, when it's too late to 693 deliver a :exc:`~trio.Cancelled` exception, then this attribute 694 will still be True. 695 696 This attribute is mostly useful for debugging and introspection. 697 If you want to know whether or not a chunk of code was actually 698 cancelled, then :attr:`cancelled_caught` is usually more 699 appropriate. 700 """ 701 if self._cancel_status is not None or not self._has_been_entered: 702 # Scope is active or not yet entered: make sure cancel_called 703 # is true if the deadline has passed. This shouldn't 704 # be able to actually change behavior, since we check for 705 # deadline expiry on scope entry and at every checkpoint, 706 # but it makes the value returned by cancel_called more 707 # closely match expectations. 708 if not self._cancel_called and current_time() >= self._deadline: 709 self.cancel() 710 return self._cancel_called 711 712 713################################################################ 714# Nursery and friends 715################################################################ 716 717 718# This code needs to be read alongside the code from Nursery.start to make 719# sense. 720@attr.s(eq=False, hash=False, repr=False) 721class _TaskStatus: 722 _old_nursery = attr.ib() 723 _new_nursery = attr.ib() 724 _called_started = attr.ib(default=False) 725 _value = attr.ib(default=None) 726 727 def __repr__(self): 728 return "<Task status object at {:#x}>".format(id(self)) 729 730 def started(self, value=None): 731 if self._called_started: 732 raise RuntimeError("called 'started' twice on the same task status") 733 self._called_started = True 734 self._value = value 735 736 # If the old nursery is cancelled, then quietly quit now; the child 737 # will eventually exit on its own, and we don't want to risk moving 738 # children that might have propagating Cancelled exceptions into 739 # a place with no cancelled cancel scopes to catch them. 740 if self._old_nursery._cancel_status.effectively_cancelled: 741 return 742 743 # Can't be closed, b/c we checked in start() and then _pending_starts 744 # should keep it open. 745 assert not self._new_nursery._closed 746 747 # Move tasks from the old nursery to the new 748 tasks = self._old_nursery._children 749 self._old_nursery._children = set() 750 for task in tasks: 751 task._parent_nursery = self._new_nursery 752 task._eventual_parent_nursery = None 753 self._new_nursery._children.add(task) 754 755 # Move all children of the old nursery's cancel status object 756 # to be underneath the new nursery instead. This includes both 757 # tasks and child cancel status objects. 758 # NB: If the new nursery is cancelled, reparenting a cancel 759 # status to be underneath it can invoke an abort_fn, which might 760 # do something evil like cancel the old nursery. We thus break 761 # everything off from the old nursery before we start attaching 762 # anything to the new. 763 cancel_status_children = self._old_nursery._cancel_status.children 764 cancel_status_tasks = set(self._old_nursery._cancel_status.tasks) 765 cancel_status_tasks.discard(self._old_nursery._parent_task) 766 for cancel_status in cancel_status_children: 767 cancel_status.parent = None 768 for task in cancel_status_tasks: 769 task._activate_cancel_status(None) 770 for cancel_status in cancel_status_children: 771 cancel_status.parent = self._new_nursery._cancel_status 772 for task in cancel_status_tasks: 773 task._activate_cancel_status(self._new_nursery._cancel_status) 774 775 # That should have removed all the children from the old nursery 776 assert not self._old_nursery._children 777 778 # And finally, poke the old nursery so it notices that all its 779 # children have disappeared and can exit. 780 self._old_nursery._check_nursery_closed() 781 782 783class NurseryManager: 784 """Nursery context manager. 785 786 Note we explicitly avoid @asynccontextmanager and @async_generator 787 since they add a lot of extraneous stack frames to exceptions, as 788 well as cause problematic behavior with handling of StopIteration 789 and StopAsyncIteration. 790 791 """ 792 793 @enable_ki_protection 794 async def __aenter__(self): 795 self._scope = CancelScope() 796 self._scope.__enter__() 797 self._nursery = Nursery._create(current_task(), self._scope) 798 return self._nursery 799 800 @enable_ki_protection 801 async def __aexit__(self, etype, exc, tb): 802 new_exc = await self._nursery._nested_child_finished(exc) 803 # Tracebacks show the 'raise' line below out of context, so let's give 804 # this variable a name that makes sense out of context. 805 combined_error_from_nursery = self._scope._close(new_exc) 806 if combined_error_from_nursery is None: 807 return True 808 elif combined_error_from_nursery is exc: 809 return False 810 else: 811 # Copied verbatim from MultiErrorCatcher. Python doesn't 812 # allow us to encapsulate this __context__ fixup. 813 old_context = combined_error_from_nursery.__context__ 814 try: 815 raise combined_error_from_nursery 816 finally: 817 _, value, _ = sys.exc_info() 818 assert value is combined_error_from_nursery 819 value.__context__ = old_context 820 821 def __enter__(self): 822 raise RuntimeError( 823 "use 'async with open_nursery(...)', not 'with open_nursery(...)'" 824 ) 825 826 def __exit__(self): # pragma: no cover 827 assert False, """Never called, but should be defined""" 828 829 830def open_nursery(): 831 """Returns an async context manager which must be used to create a 832 new `Nursery`. 833 834 It does not block on entry; on exit it blocks until all child tasks 835 have exited. 836 837 """ 838 return NurseryManager() 839 840 841class Nursery(metaclass=NoPublicConstructor): 842 """A context which may be used to spawn (or cancel) child tasks. 843 844 Not constructed directly, use `open_nursery` instead. 845 846 The nursery will remain open until all child tasks have completed, 847 or until it is cancelled, at which point it will cancel all its 848 remaining child tasks and close. 849 850 Nurseries ensure the absence of orphaned Tasks, since all running 851 tasks will belong to an open Nursery. 852 853 Attributes: 854 cancel_scope: 855 Creating a nursery also implicitly creates a cancellation scope, 856 which is exposed as the :attr:`cancel_scope` attribute. This is 857 used internally to implement the logic where if an error occurs 858 then ``__aexit__`` cancels all children, but you can use it for 859 other things, e.g. if you want to explicitly cancel all children 860 in response to some external event. 861 """ 862 863 def __init__(self, parent_task, cancel_scope): 864 self._parent_task = parent_task 865 parent_task._child_nurseries.append(self) 866 # the cancel status that children inherit - we take a snapshot, so it 867 # won't be affected by any changes in the parent. 868 self._cancel_status = parent_task._cancel_status 869 # the cancel scope that directly surrounds us; used for cancelling all 870 # children. 871 self.cancel_scope = cancel_scope 872 assert self.cancel_scope._cancel_status is self._cancel_status 873 self._children = set() 874 self._pending_excs = [] 875 # The "nested child" is how this code refers to the contents of the 876 # nursery's 'async with' block, which acts like a child Task in all 877 # the ways we can make it. 878 self._nested_child_running = True 879 self._parent_waiting_in_aexit = False 880 self._pending_starts = 0 881 self._closed = False 882 883 @property 884 def child_tasks(self): 885 """(`frozenset`): Contains all the child :class:`~trio.lowlevel.Task` 886 objects which are still running.""" 887 return frozenset(self._children) 888 889 @property 890 def parent_task(self): 891 "(`~trio.lowlevel.Task`): The Task that opened this nursery." 892 return self._parent_task 893 894 def _add_exc(self, exc): 895 self._pending_excs.append(exc) 896 self.cancel_scope.cancel() 897 898 def _check_nursery_closed(self): 899 if not any([self._nested_child_running, self._children, self._pending_starts]): 900 self._closed = True 901 if self._parent_waiting_in_aexit: 902 self._parent_waiting_in_aexit = False 903 GLOBAL_RUN_CONTEXT.runner.reschedule(self._parent_task) 904 905 def _child_finished(self, task, outcome): 906 self._children.remove(task) 907 if isinstance(outcome, Error): 908 self._add_exc(outcome.error) 909 self._check_nursery_closed() 910 911 async def _nested_child_finished(self, nested_child_exc): 912 """Returns MultiError instance if there are pending exceptions.""" 913 if nested_child_exc is not None: 914 self._add_exc(nested_child_exc) 915 self._nested_child_running = False 916 self._check_nursery_closed() 917 918 if not self._closed: 919 # If we get cancelled (or have an exception injected, like 920 # KeyboardInterrupt), then save that, but still wait until our 921 # children finish. 922 def aborted(raise_cancel): 923 self._add_exc(capture(raise_cancel).error) 924 return Abort.FAILED 925 926 self._parent_waiting_in_aexit = True 927 await wait_task_rescheduled(aborted) 928 else: 929 # Nothing to wait for, so just execute a checkpoint -- but we 930 # still need to mix any exception (e.g. from an external 931 # cancellation) in with the rest of our exceptions. 932 try: 933 await checkpoint() 934 except BaseException as exc: 935 self._add_exc(exc) 936 937 popped = self._parent_task._child_nurseries.pop() 938 assert popped is self 939 if self._pending_excs: 940 try: 941 return MultiError(self._pending_excs) 942 finally: 943 # avoid a garbage cycle 944 # (see test_nursery_cancel_doesnt_create_cyclic_garbage) 945 del self._pending_excs 946 947 def start_soon(self, async_fn, *args, name=None): 948 """Creates a child task, scheduling ``await async_fn(*args)``. 949 950 This and :meth:`start` are the two fundamental methods for 951 creating concurrent tasks in Trio. 952 953 Note that this is *not* an async function and you don't use await 954 when calling it. It sets up the new task, but then returns 955 immediately, *before* it has a chance to run. The new task won’t 956 actually get a chance to do anything until some later point when 957 you execute a checkpoint and the scheduler decides to run it. 958 If you want to run a function and immediately wait for its result, 959 then you don't need a nursery; just use ``await async_fn(*args)``. 960 If you want to wait for the task to initialize itself before 961 continuing, see :meth:`start`. 962 963 It's possible to pass a nursery object into another task, which 964 allows that task to start new child tasks in the first task's 965 nursery. 966 967 The child task inherits its parent nursery's cancel scopes. 968 969 Args: 970 async_fn: An async callable. 971 args: Positional arguments for ``async_fn``. If you want 972 to pass keyword arguments, use 973 :func:`functools.partial`. 974 name: The name for this task. Only used for 975 debugging/introspection 976 (e.g. ``repr(task_obj)``). If this isn't a string, 977 :meth:`start_soon` will try to make it one. A 978 common use case is if you're wrapping a function 979 before spawning a new task, you might pass the 980 original function as the ``name=`` to make 981 debugging easier. 982 983 Raises: 984 RuntimeError: If this nursery is no longer open 985 (i.e. its ``async with`` block has 986 exited). 987 """ 988 GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) 989 990 async def start(self, async_fn, *args, name=None): 991 r"""Creates and initializes a child task. 992 993 Like :meth:`start_soon`, but blocks until the new task has 994 finished initializing itself, and optionally returns some 995 information from it. 996 997 The ``async_fn`` must accept a ``task_status`` keyword argument, 998 and it must make sure that it (or someone) eventually calls 999 ``task_status.started()``. 1000 1001 The conventional way to define ``async_fn`` is like:: 1002 1003 async def async_fn(arg1, arg2, *, task_status=trio.TASK_STATUS_IGNORED): 1004 ... 1005 task_status.started() 1006 ... 1007 1008 :attr:`trio.TASK_STATUS_IGNORED` is a special global object with 1009 a do-nothing ``started`` method. This way your function supports 1010 being called either like ``await nursery.start(async_fn, arg1, 1011 arg2)`` or directly like ``await async_fn(arg1, arg2)``, and 1012 either way it can call ``task_status.started()`` without 1013 worrying about which mode it's in. Defining your function like 1014 this will make it obvious to readers that it supports being used 1015 in both modes. 1016 1017 Before the child calls ``task_status.started()``, it's 1018 effectively run underneath the call to :meth:`start`: if it 1019 raises an exception then that exception is reported by 1020 :meth:`start`, and does *not* propagate out of the nursery. If 1021 :meth:`start` is cancelled, then the child task is also 1022 cancelled. 1023 1024 When the child calls ``task_status.started()``, it's moved out 1025 from underneath :meth:`start` and into the given nursery. 1026 1027 If the child task passes a value to 1028 ``task_status.started(value)``, then :meth:`start` returns this 1029 value. Otherwise it returns ``None``. 1030 """ 1031 if self._closed: 1032 raise RuntimeError("Nursery is closed to new arrivals") 1033 try: 1034 self._pending_starts += 1 1035 async with open_nursery() as old_nursery: 1036 task_status = _TaskStatus(old_nursery, self) 1037 thunk = functools.partial(async_fn, task_status=task_status) 1038 task = GLOBAL_RUN_CONTEXT.runner.spawn_impl( 1039 thunk, args, old_nursery, name 1040 ) 1041 task._eventual_parent_nursery = self 1042 # Wait for either _TaskStatus.started or an exception to 1043 # cancel this nursery: 1044 # If we get here, then the child either got reparented or exited 1045 # normally. The complicated logic is all in _TaskStatus.started(). 1046 # (Any exceptions propagate directly out of the above.) 1047 if not task_status._called_started: 1048 raise RuntimeError("child exited without calling task_status.started()") 1049 return task_status._value 1050 finally: 1051 self._pending_starts -= 1 1052 self._check_nursery_closed() 1053 1054 def __del__(self): 1055 assert not self._children 1056 1057 1058################################################################ 1059# Task and friends 1060################################################################ 1061 1062 1063@attr.s(eq=False, hash=False, repr=False, slots=True) 1064class Task(metaclass=NoPublicConstructor): 1065 _parent_nursery = attr.ib() 1066 coro = attr.ib() 1067 _runner = attr.ib() 1068 name = attr.ib() 1069 # PEP 567 contextvars context 1070 context = attr.ib() 1071 _counter = attr.ib(init=False, factory=itertools.count().__next__) 1072 1073 # Invariant: 1074 # - for unscheduled tasks, _next_send_fn and _next_send are both None 1075 # - for scheduled tasks, _next_send_fn(_next_send) resumes the task; 1076 # usually _next_send_fn is self.coro.send and _next_send is an 1077 # Outcome. When recovering from a foreign await, _next_send_fn is 1078 # self.coro.throw and _next_send is an exception. _next_send_fn 1079 # will effectively be at the top of every task's call stack, so 1080 # it should be written in C if you don't want to pollute Trio 1081 # tracebacks with extraneous frames. 1082 # - for scheduled tasks, custom_sleep_data is None 1083 # Tasks start out unscheduled. 1084 _next_send_fn = attr.ib(default=None) 1085 _next_send = attr.ib(default=None) 1086 _abort_func = attr.ib(default=None) 1087 custom_sleep_data = attr.ib(default=None) 1088 1089 # For introspection and nursery.start() 1090 _child_nurseries = attr.ib(factory=list) 1091 _eventual_parent_nursery = attr.ib(default=None) 1092 1093 # these are counts of how many cancel/schedule points this task has 1094 # executed, for assert{_no,}_checkpoints 1095 # XX maybe these should be exposed as part of a statistics() method? 1096 _cancel_points = attr.ib(default=0) 1097 _schedule_points = attr.ib(default=0) 1098 1099 def __repr__(self): 1100 return "<Task {!r} at {:#x}>".format(self.name, id(self)) 1101 1102 @property 1103 def parent_nursery(self): 1104 """The nursery this task is inside (or None if this is the "init" 1105 task). 1106 1107 Example use case: drawing a visualization of the task tree in a 1108 debugger. 1109 1110 """ 1111 return self._parent_nursery 1112 1113 @property 1114 def eventual_parent_nursery(self): 1115 """The nursery this task will be inside after it calls 1116 ``task_status.started()``. 1117 1118 If this task has already called ``started()``, or if it was not 1119 spawned using `nursery.start() <trio.Nursery.start>`, then 1120 its `eventual_parent_nursery` is ``None``. 1121 1122 """ 1123 return self._eventual_parent_nursery 1124 1125 @property 1126 def child_nurseries(self): 1127 """The nurseries this task contains. 1128 1129 This is a list, with outer nurseries before inner nurseries. 1130 1131 """ 1132 return list(self._child_nurseries) 1133 1134 ################ 1135 # Cancellation 1136 ################ 1137 1138 # The CancelStatus object that is currently active for this task. 1139 # Don't change this directly; instead, use _activate_cancel_status(). 1140 _cancel_status = attr.ib(default=None, repr=False) 1141 1142 def _activate_cancel_status(self, cancel_status): 1143 if self._cancel_status is not None: 1144 self._cancel_status._tasks.remove(self) 1145 self._cancel_status = cancel_status 1146 if self._cancel_status is not None: 1147 self._cancel_status._tasks.add(self) 1148 if self._cancel_status.effectively_cancelled: 1149 self._attempt_delivery_of_any_pending_cancel() 1150 1151 def _attempt_abort(self, raise_cancel): 1152 # Either the abort succeeds, in which case we will reschedule the 1153 # task, or else it fails, in which case it will worry about 1154 # rescheduling itself (hopefully eventually calling reraise to raise 1155 # the given exception, but not necessarily). 1156 success = self._abort_func(raise_cancel) 1157 if type(success) is not Abort: 1158 raise TrioInternalError("abort function must return Abort enum") 1159 # We only attempt to abort once per blocking call, regardless of 1160 # whether we succeeded or failed. 1161 self._abort_func = None 1162 if success is Abort.SUCCEEDED: 1163 self._runner.reschedule(self, capture(raise_cancel)) 1164 1165 def _attempt_delivery_of_any_pending_cancel(self): 1166 if self._abort_func is None: 1167 return 1168 if not self._cancel_status.effectively_cancelled: 1169 return 1170 1171 def raise_cancel(): 1172 raise Cancelled._create() 1173 1174 self._attempt_abort(raise_cancel) 1175 1176 def _attempt_delivery_of_pending_ki(self): 1177 assert self._runner.ki_pending 1178 if self._abort_func is None: 1179 return 1180 1181 def raise_cancel(): 1182 self._runner.ki_pending = False 1183 raise KeyboardInterrupt 1184 1185 self._attempt_abort(raise_cancel) 1186 1187 1188################################################################ 1189# The central Runner object 1190################################################################ 1191 1192 1193class RunContext(threading.local): 1194 runner: "Runner" 1195 task: Task 1196 1197 1198GLOBAL_RUN_CONTEXT = RunContext() 1199 1200 1201@attr.s(frozen=True) 1202class _RunStatistics: 1203 tasks_living = attr.ib() 1204 tasks_runnable = attr.ib() 1205 seconds_to_next_deadline = attr.ib() 1206 io_statistics = attr.ib() 1207 run_sync_soon_queue_size = attr.ib() 1208 1209 1210# This holds all the state that gets trampolined back and forth between 1211# callbacks when we're running in guest mode. 1212# 1213# It has to be a separate object from Runner, and Runner *cannot* hold 1214# references to it (directly or indirectly)! 1215# 1216# The idea is that we want a chance to detect if our host loop quits and stops 1217# driving us forward. We detect that by unrolled_run_gen being garbage 1218# collected, and hitting its 'except GeneratorExit:' block. So this only 1219# happens if unrolled_run_gen is GCed. 1220# 1221# The Runner state is referenced from the global GLOBAL_RUN_CONTEXT. The only 1222# way it gets *un*referenced is by unrolled_run_gen completing, e.g. by being 1223# GCed. But if Runner has a direct or indirect reference to it, and the host 1224# loop has abandoned it, then this will never happen! 1225# 1226# So this object can reference Runner, but Runner can't reference it. The only 1227# references to it are the "in flight" callback chain on the host loop / 1228# worker thread. 1229@attr.s(eq=False, hash=False, slots=True) 1230class GuestState: 1231 runner = attr.ib() 1232 run_sync_soon_threadsafe = attr.ib() 1233 run_sync_soon_not_threadsafe = attr.ib() 1234 done_callback = attr.ib() 1235 unrolled_run_gen = attr.ib() 1236 _value_factory: Callable[[], Value] = lambda: Value(None) 1237 unrolled_run_next_send = attr.ib(factory=_value_factory, type=Outcome) 1238 1239 def guest_tick(self): 1240 try: 1241 timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen) 1242 except StopIteration: 1243 self.done_callback(self.runner.main_task_outcome) 1244 return 1245 except TrioInternalError as exc: 1246 self.done_callback(Error(exc)) 1247 return 1248 1249 # Optimization: try to skip going into the thread if we can avoid it 1250 events_outcome = capture(self.runner.io_manager.get_events, 0) 1251 if timeout <= 0 or isinstance(events_outcome, Error) or events_outcome.value: 1252 # No need to go into the thread 1253 self.unrolled_run_next_send = events_outcome 1254 self.runner.guest_tick_scheduled = True 1255 self.run_sync_soon_not_threadsafe(self.guest_tick) 1256 else: 1257 # Need to go into the thread and call get_events() there 1258 self.runner.guest_tick_scheduled = False 1259 1260 def get_events(): 1261 return self.runner.io_manager.get_events(timeout) 1262 1263 def deliver(events_outcome): 1264 def in_main_thread(): 1265 self.unrolled_run_next_send = events_outcome 1266 self.runner.guest_tick_scheduled = True 1267 self.guest_tick() 1268 1269 self.run_sync_soon_threadsafe(in_main_thread) 1270 1271 start_thread_soon(get_events, deliver) 1272 1273 1274@attr.s(eq=False, hash=False, slots=True) 1275class Runner: 1276 clock = attr.ib() 1277 instruments: Instruments = attr.ib() 1278 io_manager = attr.ib() 1279 ki_manager = attr.ib() 1280 1281 # Run-local values, see _local.py 1282 _locals = attr.ib(factory=dict) 1283 1284 runq = attr.ib(factory=deque) 1285 tasks = attr.ib(factory=set) 1286 1287 deadlines = attr.ib(factory=Deadlines) 1288 1289 init_task = attr.ib(default=None) 1290 system_nursery = attr.ib(default=None) 1291 system_context = attr.ib(default=None) 1292 main_task = attr.ib(default=None) 1293 main_task_outcome = attr.ib(default=None) 1294 1295 entry_queue = attr.ib(factory=EntryQueue) 1296 trio_token = attr.ib(default=None) 1297 asyncgens = attr.ib(factory=AsyncGenerators) 1298 1299 # If everything goes idle for this long, we call clock._autojump() 1300 clock_autojump_threshold = attr.ib(default=inf) 1301 1302 # Guest mode stuff 1303 is_guest = attr.ib(default=False) 1304 guest_tick_scheduled = attr.ib(default=False) 1305 1306 def force_guest_tick_asap(self): 1307 if self.guest_tick_scheduled: 1308 return 1309 self.guest_tick_scheduled = True 1310 self.io_manager.force_wakeup() 1311 1312 def close(self): 1313 self.io_manager.close() 1314 self.entry_queue.close() 1315 self.asyncgens.close() 1316 if "after_run" in self.instruments: 1317 self.instruments.call("after_run") 1318 # This is where KI protection gets disabled, so we do it last 1319 self.ki_manager.close() 1320 1321 @_public 1322 def current_statistics(self): 1323 """Returns an object containing run-loop-level debugging information. 1324 1325 Currently the following fields are defined: 1326 1327 * ``tasks_living`` (int): The number of tasks that have been spawned 1328 and not yet exited. 1329 * ``tasks_runnable`` (int): The number of tasks that are currently 1330 queued on the run queue (as opposed to blocked waiting for something 1331 to happen). 1332 * ``seconds_to_next_deadline`` (float): The time until the next 1333 pending cancel scope deadline. May be negative if the deadline has 1334 expired but we haven't yet processed cancellations. May be 1335 :data:`~math.inf` if there are no pending deadlines. 1336 * ``run_sync_soon_queue_size`` (int): The number of 1337 unprocessed callbacks queued via 1338 :meth:`trio.lowlevel.TrioToken.run_sync_soon`. 1339 * ``io_statistics`` (object): Some statistics from Trio's I/O 1340 backend. This always has an attribute ``backend`` which is a string 1341 naming which operating-system-specific I/O backend is in use; the 1342 other attributes vary between backends. 1343 1344 """ 1345 seconds_to_next_deadline = self.deadlines.next_deadline() - self.current_time() 1346 return _RunStatistics( 1347 tasks_living=len(self.tasks), 1348 tasks_runnable=len(self.runq), 1349 seconds_to_next_deadline=seconds_to_next_deadline, 1350 io_statistics=self.io_manager.statistics(), 1351 run_sync_soon_queue_size=self.entry_queue.size(), 1352 ) 1353 1354 @_public 1355 def current_time(self): 1356 """Returns the current time according to Trio's internal clock. 1357 1358 Returns: 1359 float: The current time. 1360 1361 Raises: 1362 RuntimeError: if not inside a call to :func:`trio.run`. 1363 1364 """ 1365 return self.clock.current_time() 1366 1367 @_public 1368 def current_clock(self): 1369 """Returns the current :class:`~trio.abc.Clock`.""" 1370 return self.clock 1371 1372 @_public 1373 def current_root_task(self): 1374 """Returns the current root :class:`Task`. 1375 1376 This is the task that is the ultimate parent of all other tasks. 1377 1378 """ 1379 return self.init_task 1380 1381 ################ 1382 # Core task handling primitives 1383 ################ 1384 1385 @_public 1386 def reschedule(self, task, next_send=_NO_SEND): 1387 """Reschedule the given task with the given 1388 :class:`outcome.Outcome`. 1389 1390 See :func:`wait_task_rescheduled` for the gory details. 1391 1392 There must be exactly one call to :func:`reschedule` for every call to 1393 :func:`wait_task_rescheduled`. (And when counting, keep in mind that 1394 returning :data:`Abort.SUCCEEDED` from an abort callback is equivalent 1395 to calling :func:`reschedule` once.) 1396 1397 Args: 1398 task (trio.lowlevel.Task): the task to be rescheduled. Must be blocked 1399 in a call to :func:`wait_task_rescheduled`. 1400 next_send (outcome.Outcome): the value (or error) to return (or 1401 raise) from :func:`wait_task_rescheduled`. 1402 1403 """ 1404 if next_send is _NO_SEND: 1405 next_send = Value(None) 1406 1407 assert task._runner is self 1408 assert task._next_send_fn is None 1409 task._next_send_fn = task.coro.send 1410 task._next_send = next_send 1411 task._abort_func = None 1412 task.custom_sleep_data = None 1413 if not self.runq and self.is_guest: 1414 self.force_guest_tick_asap() 1415 self.runq.append(task) 1416 if "task_scheduled" in self.instruments: 1417 self.instruments.call("task_scheduled", task) 1418 1419 def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False): 1420 1421 ###### 1422 # Make sure the nursery is in working order 1423 ###### 1424 1425 # This sorta feels like it should be a method on nursery, except it 1426 # has to handle nursery=None for init. And it touches the internals of 1427 # all kinds of objects. 1428 if nursery is not None and nursery._closed: 1429 raise RuntimeError("Nursery is closed to new arrivals") 1430 if nursery is None: 1431 assert self.init_task is None 1432 1433 ###### 1434 # Call the function and get the coroutine object, while giving helpful 1435 # errors for common mistakes. 1436 ###### 1437 coro = coroutine_or_error(async_fn, *args) 1438 1439 if name is None: 1440 name = async_fn 1441 if isinstance(name, functools.partial): 1442 name = name.func 1443 if not isinstance(name, str): 1444 try: 1445 name = "{}.{}".format(name.__module__, name.__qualname__) 1446 except AttributeError: 1447 name = repr(name) 1448 1449 if system_task: 1450 context = self.system_context.copy() 1451 else: 1452 context = copy_context() 1453 1454 if not hasattr(coro, "cr_frame"): 1455 # This async function is implemented in C or Cython 1456 async def python_wrapper(orig_coro): 1457 return await orig_coro 1458 1459 coro = python_wrapper(coro) 1460 coro.cr_frame.f_locals.setdefault(LOCALS_KEY_KI_PROTECTION_ENABLED, system_task) 1461 1462 ###### 1463 # Set up the Task object 1464 ###### 1465 task = Task._create( 1466 coro=coro, parent_nursery=nursery, runner=self, name=name, context=context 1467 ) 1468 1469 self.tasks.add(task) 1470 if nursery is not None: 1471 nursery._children.add(task) 1472 task._activate_cancel_status(nursery._cancel_status) 1473 1474 if "task_spawned" in self.instruments: 1475 self.instruments.call("task_spawned", task) 1476 # Special case: normally next_send should be an Outcome, but for the 1477 # very first send we have to send a literal unboxed None. 1478 self.reschedule(task, None) 1479 return task 1480 1481 def task_exited(self, task, outcome): 1482 if ( 1483 task._cancel_status is not None 1484 and task._cancel_status.abandoned_by_misnesting 1485 and task._cancel_status.parent is None 1486 ): 1487 # The cancel scope surrounding this task's nursery was closed 1488 # before the task exited. Force the task to exit with an error, 1489 # since the error might not have been caught elsewhere. See the 1490 # comments in CancelStatus.close(). 1491 try: 1492 # Raise this, rather than just constructing it, to get a 1493 # traceback frame included 1494 raise RuntimeError( 1495 "Cancel scope stack corrupted: cancel scope surrounding " 1496 "{!r} was closed before the task exited\n{}".format( 1497 task, MISNESTING_ADVICE 1498 ) 1499 ) 1500 except RuntimeError as new_exc: 1501 if isinstance(outcome, Error): 1502 new_exc.__context__ = outcome.error 1503 outcome = Error(new_exc) 1504 1505 task._activate_cancel_status(None) 1506 self.tasks.remove(task) 1507 if task is self.init_task: 1508 # If the init task crashed, then something is very wrong and we 1509 # let the error propagate. (It'll eventually be wrapped in a 1510 # TrioInternalError.) 1511 outcome.unwrap() 1512 # the init task should be the last task to exit. If not, then 1513 # something is very wrong. 1514 if self.tasks: # pragma: no cover 1515 raise TrioInternalError 1516 else: 1517 if task is self.main_task: 1518 self.main_task_outcome = outcome 1519 outcome = Value(None) 1520 task._parent_nursery._child_finished(task, outcome) 1521 1522 if "task_exited" in self.instruments: 1523 self.instruments.call("task_exited", task) 1524 1525 ################ 1526 # System tasks and init 1527 ################ 1528 1529 @_public 1530 def spawn_system_task(self, async_fn, *args, name=None): 1531 """Spawn a "system" task. 1532 1533 System tasks have a few differences from regular tasks: 1534 1535 * They don't need an explicit nursery; instead they go into the 1536 internal "system nursery". 1537 1538 * If a system task raises an exception, then it's converted into a 1539 :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. If you 1540 write a system task, you should be careful to make sure it doesn't 1541 crash. 1542 1543 * System tasks are automatically cancelled when the main task exits. 1544 1545 * By default, system tasks have :exc:`KeyboardInterrupt` protection 1546 *enabled*. If you want your task to be interruptible by control-C, 1547 then you need to use :func:`disable_ki_protection` explicitly (and 1548 come up with some plan for what to do with a 1549 :exc:`KeyboardInterrupt`, given that system tasks aren't allowed to 1550 raise exceptions). 1551 1552 * System tasks do not inherit context variables from their creator. 1553 1554 Towards the end of a call to :meth:`trio.run`, after the main 1555 task and all system tasks have exited, the system nursery 1556 becomes closed. At this point, new calls to 1557 :func:`spawn_system_task` will raise ``RuntimeError("Nursery 1558 is closed to new arrivals")`` instead of creating a system 1559 task. It's possible to encounter this state either in 1560 a ``finally`` block in an async generator, or in a callback 1561 passed to :meth:`TrioToken.run_sync_soon` at the right moment. 1562 1563 Args: 1564 async_fn: An async callable. 1565 args: Positional arguments for ``async_fn``. If you want to pass 1566 keyword arguments, use :func:`functools.partial`. 1567 name: The name for this task. Only used for debugging/introspection 1568 (e.g. ``repr(task_obj)``). If this isn't a string, 1569 :func:`spawn_system_task` will try to make it one. A common use 1570 case is if you're wrapping a function before spawning a new 1571 task, you might pass the original function as the ``name=`` to 1572 make debugging easier. 1573 1574 Returns: 1575 Task: the newly spawned task 1576 1577 """ 1578 return self.spawn_impl( 1579 async_fn, args, self.system_nursery, name, system_task=True 1580 ) 1581 1582 async def init(self, async_fn, args): 1583 # run_sync_soon task runs here: 1584 async with open_nursery() as run_sync_soon_nursery: 1585 # All other system tasks run here: 1586 async with open_nursery() as self.system_nursery: 1587 # Only the main task runs here: 1588 async with open_nursery() as main_task_nursery: 1589 try: 1590 self.main_task = self.spawn_impl( 1591 async_fn, args, main_task_nursery, None 1592 ) 1593 except BaseException as exc: 1594 self.main_task_outcome = Error(exc) 1595 return 1596 self.spawn_impl( 1597 self.entry_queue.task, 1598 (), 1599 run_sync_soon_nursery, 1600 "<TrioToken.run_sync_soon task>", 1601 system_task=True, 1602 ) 1603 1604 # Main task is done; start shutting down system tasks 1605 self.system_nursery.cancel_scope.cancel() 1606 1607 # System nursery is closed; finalize remaining async generators 1608 await self.asyncgens.finalize_remaining(self) 1609 1610 # There are no more asyncgens, which means no more user-provided 1611 # code except possibly run_sync_soon callbacks. It's finally safe 1612 # to stop the run_sync_soon task and exit run(). 1613 run_sync_soon_nursery.cancel_scope.cancel() 1614 1615 ################ 1616 # Outside context problems 1617 ################ 1618 1619 @_public 1620 def current_trio_token(self): 1621 """Retrieve the :class:`TrioToken` for the current call to 1622 :func:`trio.run`. 1623 1624 """ 1625 if self.trio_token is None: 1626 self.trio_token = TrioToken._create(self.entry_queue) 1627 return self.trio_token 1628 1629 ################ 1630 # KI handling 1631 ################ 1632 1633 ki_pending = attr.ib(default=False) 1634 1635 # deliver_ki is broke. Maybe move all the actual logic and state into 1636 # RunToken, and we'll only have one instance per runner? But then we can't 1637 # have a public constructor. Eh, but current_run_token() returning a 1638 # unique object per run feels pretty nice. Maybe let's just go for it. And 1639 # keep the class public so people can isinstance() it if they want. 1640 1641 # This gets called from signal context 1642 def deliver_ki(self): 1643 self.ki_pending = True 1644 try: 1645 self.entry_queue.run_sync_soon(self._deliver_ki_cb) 1646 except RunFinishedError: 1647 pass 1648 1649 def _deliver_ki_cb(self): 1650 if not self.ki_pending: 1651 return 1652 # Can't happen because main_task and run_sync_soon_task are created at 1653 # the same time -- so even if KI arrives before main_task is created, 1654 # we won't get here until afterwards. 1655 assert self.main_task is not None 1656 if self.main_task_outcome is not None: 1657 # We're already in the process of exiting -- leave ki_pending set 1658 # and we'll check it again on our way out of run(). 1659 return 1660 self.main_task._attempt_delivery_of_pending_ki() 1661 1662 ################ 1663 # Quiescing 1664 ################ 1665 1666 waiting_for_idle = attr.ib(factory=SortedDict) 1667 1668 @_public 1669 async def wait_all_tasks_blocked(self, cushion=0.0): 1670 """Block until there are no runnable tasks. 1671 1672 This is useful in testing code when you want to give other tasks a 1673 chance to "settle down". The calling task is blocked, and doesn't wake 1674 up until all other tasks are also blocked for at least ``cushion`` 1675 seconds. (Setting a non-zero ``cushion`` is intended to handle cases 1676 like two tasks talking to each other over a local socket, where we 1677 want to ignore the potential brief moment between a send and receive 1678 when all tasks are blocked.) 1679 1680 Note that ``cushion`` is measured in *real* time, not the Trio clock 1681 time. 1682 1683 If there are multiple tasks blocked in :func:`wait_all_tasks_blocked`, 1684 then the one with the shortest ``cushion`` is the one woken (and 1685 this task becoming unblocked resets the timers for the remaining 1686 tasks). If there are multiple tasks that have exactly the same 1687 ``cushion``, then all are woken. 1688 1689 You should also consider :class:`trio.testing.Sequencer`, which 1690 provides a more explicit way to control execution ordering within a 1691 test, and will often produce more readable tests. 1692 1693 Example: 1694 Here's an example of one way to test that Trio's locks are fair: we 1695 take the lock in the parent, start a child, wait for the child to be 1696 blocked waiting for the lock (!), and then check that we can't 1697 release and immediately re-acquire the lock:: 1698 1699 async def lock_taker(lock): 1700 await lock.acquire() 1701 lock.release() 1702 1703 async def test_lock_fairness(): 1704 lock = trio.Lock() 1705 await lock.acquire() 1706 async with trio.open_nursery() as nursery: 1707 nursery.start_soon(lock_taker, lock) 1708 # child hasn't run yet, we have the lock 1709 assert lock.locked() 1710 assert lock._owner is trio.lowlevel.current_task() 1711 await trio.testing.wait_all_tasks_blocked() 1712 # now the child has run and is blocked on lock.acquire(), we 1713 # still have the lock 1714 assert lock.locked() 1715 assert lock._owner is trio.lowlevel.current_task() 1716 lock.release() 1717 try: 1718 # The child has a prior claim, so we can't have it 1719 lock.acquire_nowait() 1720 except trio.WouldBlock: 1721 assert lock._owner is not trio.lowlevel.current_task() 1722 print("PASS") 1723 else: 1724 print("FAIL") 1725 1726 """ 1727 task = current_task() 1728 key = (cushion, id(task)) 1729 self.waiting_for_idle[key] = task 1730 1731 def abort(_): 1732 del self.waiting_for_idle[key] 1733 return Abort.SUCCEEDED 1734 1735 await wait_task_rescheduled(abort) 1736 1737 1738################################################################ 1739# run 1740################################################################ 1741# 1742# Trio's core task scheduler and coroutine runner is in 'unrolled_run'. It's 1743# called that because it has an unusual feature: it's actually a generator. 1744# Whenever it needs to fetch IO events from the OS, it yields, and waits for 1745# its caller to send the IO events back in. So the loop is "unrolled" into a 1746# sequence of generator send() calls. 1747# 1748# The reason for this unusual design is to support two different modes of 1749# operation, where the IO is handled differently. 1750# 1751# In normal mode using trio.run, the scheduler and IO run in the same thread: 1752# 1753# Main thread: 1754# 1755# +---------------------------+ 1756# | Run tasks | 1757# | (unrolled_run) | 1758# +---------------------------+ 1759# | Block waiting for I/O | 1760# | (io_manager.get_events) | 1761# +---------------------------+ 1762# | Run tasks | 1763# | (unrolled_run) | 1764# +---------------------------+ 1765# | Block waiting for I/O | 1766# | (io_manager.get_events) | 1767# +---------------------------+ 1768# : 1769# 1770# 1771# In guest mode using trio.lowlevel.start_guest_run, the scheduler runs on the 1772# main thread as a host loop callback, but blocking for IO gets pushed into a 1773# worker thread: 1774# 1775# Main thread executing host loop: Trio I/O thread: 1776# 1777# +---------------------------+ 1778# | Run Trio tasks | 1779# | (unrolled_run) | 1780# +---------------------------+ --------------+ 1781# v 1782# +---------------------------+ +----------------------------+ 1783# | Host loop does whatever | | Block waiting for Trio I/O | 1784# | it wants | | (io_manager.get_events) | 1785# +---------------------------+ +----------------------------+ 1786# | 1787# +---------------------------+ <-------------+ 1788# | Run Trio tasks | 1789# | (unrolled_run) | 1790# +---------------------------+ --------------+ 1791# v 1792# +---------------------------+ +----------------------------+ 1793# | Host loop does whatever | | Block waiting for Trio I/O | 1794# | it wants | | (io_manager.get_events) | 1795# +---------------------------+ +----------------------------+ 1796# : : 1797# 1798# Most of Trio's internals don't need to care about this difference. The main 1799# complication it creates is that in guest mode, we might need to wake up not 1800# just due to OS-reported IO events, but also because of code running on the 1801# host loop calling reschedule() or changing task deadlines. Search for 1802# 'is_guest' to see the special cases we need to handle this. 1803 1804 1805def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints): 1806 """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" 1807 # It wouldn't be *hard* to support nested calls to run(), but I can't 1808 # think of a single good reason for it, so let's be conservative for 1809 # now: 1810 if hasattr(GLOBAL_RUN_CONTEXT, "runner"): 1811 raise RuntimeError("Attempted to call run() from inside a run()") 1812 1813 if clock is None: 1814 clock = SystemClock() 1815 instruments = Instruments(instruments) 1816 io_manager = TheIOManager() 1817 system_context = copy_context() 1818 system_context.run(current_async_library_cvar.set, "trio") 1819 ki_manager = KIManager() 1820 1821 runner = Runner( 1822 clock=clock, 1823 instruments=instruments, 1824 io_manager=io_manager, 1825 system_context=system_context, 1826 ki_manager=ki_manager, 1827 ) 1828 runner.asyncgens.install_hooks(runner) 1829 1830 # This is where KI protection gets enabled, so we want to do it early - in 1831 # particular before we start modifying global state like GLOBAL_RUN_CONTEXT 1832 ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints) 1833 1834 GLOBAL_RUN_CONTEXT.runner = runner 1835 return runner 1836 1837 1838def run( 1839 async_fn, 1840 *args, 1841 clock=None, 1842 instruments=(), 1843 restrict_keyboard_interrupt_to_checkpoints=False, 1844): 1845 """Run a Trio-flavored async function, and return the result. 1846 1847 Calling:: 1848 1849 run(async_fn, *args) 1850 1851 is the equivalent of:: 1852 1853 await async_fn(*args) 1854 1855 except that :func:`run` can (and must) be called from a synchronous 1856 context. 1857 1858 This is Trio's main entry point. Almost every other function in Trio 1859 requires that you be inside a call to :func:`run`. 1860 1861 Args: 1862 async_fn: An async function. 1863 1864 args: Positional arguments to be passed to *async_fn*. If you need to 1865 pass keyword arguments, then use :func:`functools.partial`. 1866 1867 clock: ``None`` to use the default system-specific monotonic clock; 1868 otherwise, an object implementing the :class:`trio.abc.Clock` 1869 interface, like (for example) a :class:`trio.testing.MockClock` 1870 instance. 1871 1872 instruments (list of :class:`trio.abc.Instrument` objects): Any 1873 instrumentation you want to apply to this run. This can also be 1874 modified during the run; see :ref:`instrumentation`. 1875 1876 restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the 1877 user hits control-C while :func:`run` is running? If this argument 1878 is False (the default), then you get the standard Python behavior: a 1879 :exc:`KeyboardInterrupt` exception will immediately interrupt 1880 whatever task is running (or if no task is running, then Trio will 1881 wake up a task to be interrupted). Alternatively, if you set this 1882 argument to True, then :exc:`KeyboardInterrupt` delivery will be 1883 delayed: it will be *only* be raised at :ref:`checkpoints 1884 <checkpoints>`, like a :exc:`Cancelled` exception. 1885 1886 The default behavior is nice because it means that even if you 1887 accidentally write an infinite loop that never executes any 1888 checkpoints, then you can still break out of it using control-C. 1889 The alternative behavior is nice if you're paranoid about a 1890 :exc:`KeyboardInterrupt` at just the wrong place leaving your 1891 program in an inconsistent state, because it means that you only 1892 have to worry about :exc:`KeyboardInterrupt` at the exact same 1893 places where you already have to worry about :exc:`Cancelled`. 1894 1895 This setting has no effect if your program has registered a custom 1896 SIGINT handler, or if :func:`run` is called from anywhere but the 1897 main thread (this is a Python limitation), or if you use 1898 :func:`open_signal_receiver` to catch SIGINT. 1899 1900 Returns: 1901 Whatever ``async_fn`` returns. 1902 1903 Raises: 1904 TrioInternalError: if an unexpected error is encountered inside Trio's 1905 internal machinery. This is a bug and you should `let us know 1906 <https://github.com/python-trio/trio/issues>`__. 1907 1908 Anything else: if ``async_fn`` raises an exception, then :func:`run` 1909 propagates it. 1910 1911 """ 1912 1913 __tracebackhide__ = True 1914 1915 runner = setup_runner( 1916 clock, instruments, restrict_keyboard_interrupt_to_checkpoints 1917 ) 1918 1919 gen = unrolled_run(runner, async_fn, args) 1920 next_send = None 1921 while True: 1922 try: 1923 timeout = gen.send(next_send) 1924 except StopIteration: 1925 break 1926 next_send = runner.io_manager.get_events(timeout) 1927 # Inlined copy of runner.main_task_outcome.unwrap() to avoid 1928 # cluttering every single Trio traceback with an extra frame. 1929 if isinstance(runner.main_task_outcome, Value): 1930 return runner.main_task_outcome.value 1931 else: 1932 raise runner.main_task_outcome.error 1933 1934 1935def start_guest_run( 1936 async_fn, 1937 *args, 1938 run_sync_soon_threadsafe, 1939 done_callback, 1940 run_sync_soon_not_threadsafe=None, 1941 host_uses_signal_set_wakeup_fd=False, 1942 clock=None, 1943 instruments=(), 1944 restrict_keyboard_interrupt_to_checkpoints=False, 1945): 1946 """Start a "guest" run of Trio on top of some other "host" event loop. 1947 1948 Each host loop can only have one guest run at a time. 1949 1950 You should always let the Trio run finish before stopping the host loop; 1951 if not, it may leave Trio's internal data structures in an inconsistent 1952 state. You might be able to get away with it if you immediately exit the 1953 program, but it's safest not to go there in the first place. 1954 1955 Generally, the best way to do this is wrap this in a function that starts 1956 the host loop and then immediately starts the guest run, and then shuts 1957 down the host when the guest run completes. 1958 1959 Args: 1960 1961 run_sync_soon_threadsafe: An arbitrary callable, which will be passed a 1962 function as its sole argument:: 1963 1964 def my_run_sync_soon_threadsafe(fn): 1965 ... 1966 1967 This callable should schedule ``fn()`` to be run by the host on its 1968 next pass through its loop. **Must support being called from 1969 arbitrary threads.** 1970 1971 done_callback: An arbitrary callable:: 1972 1973 def my_done_callback(run_outcome): 1974 ... 1975 1976 When the Trio run has finished, Trio will invoke this callback to let 1977 you know. The argument is an `outcome.Outcome`, reporting what would 1978 have been returned or raised by `trio.run`. This function can do 1979 anything you want, but commonly you'll want it to shut down the 1980 host loop, unwrap the outcome, etc. 1981 1982 run_sync_soon_not_threadsafe: Like ``run_sync_soon_threadsafe``, but 1983 will only be called from inside the host loop's main thread. 1984 Optional, but if your host loop allows you to implement this more 1985 efficiently than ``run_sync_soon_threadsafe`` then passing it will 1986 make things a bit faster. 1987 1988 host_uses_signal_set_wakeup_fd (bool): Pass `True` if your host loop 1989 uses `signal.set_wakeup_fd`, and `False` otherwise. For more details, 1990 see :ref:`guest-run-implementation`. 1991 1992 For the meaning of other arguments, see `trio.run`. 1993 1994 """ 1995 runner = setup_runner( 1996 clock, instruments, restrict_keyboard_interrupt_to_checkpoints 1997 ) 1998 runner.is_guest = True 1999 runner.guest_tick_scheduled = True 2000 2001 if run_sync_soon_not_threadsafe is None: 2002 run_sync_soon_not_threadsafe = run_sync_soon_threadsafe 2003 2004 guest_state = GuestState( 2005 runner=runner, 2006 run_sync_soon_threadsafe=run_sync_soon_threadsafe, 2007 run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe, 2008 done_callback=done_callback, 2009 unrolled_run_gen=unrolled_run( 2010 runner, 2011 async_fn, 2012 args, 2013 host_uses_signal_set_wakeup_fd=host_uses_signal_set_wakeup_fd, 2014 ), 2015 ) 2016 run_sync_soon_not_threadsafe(guest_state.guest_tick) 2017 2018 2019# 24 hours is arbitrary, but it avoids issues like people setting timeouts of 2020# 10**20 and then getting integer overflows in the underlying system calls. 2021_MAX_TIMEOUT = 24 * 60 * 60 2022 2023 2024# Weird quirk: this is written as a generator in order to support "guest 2025# mode", where our core event loop gets unrolled into a series of callbacks on 2026# the host loop. If you're doing a regular trio.run then this gets run 2027# straight through. 2028def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): 2029 locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True 2030 __tracebackhide__ = True 2031 2032 try: 2033 if not host_uses_signal_set_wakeup_fd: 2034 runner.entry_queue.wakeup.wakeup_on_signals() 2035 2036 if "before_run" in runner.instruments: 2037 runner.instruments.call("before_run") 2038 runner.clock.start_clock() 2039 runner.init_task = runner.spawn_impl( 2040 runner.init, (async_fn, args), None, "<init>", system_task=True 2041 ) 2042 2043 # You know how people talk about "event loops"? This 'while' loop right 2044 # here is our event loop: 2045 while runner.tasks: 2046 if runner.runq: 2047 timeout = 0 2048 else: 2049 deadline = runner.deadlines.next_deadline() 2050 timeout = runner.clock.deadline_to_sleep_time(deadline) 2051 timeout = min(max(0, timeout), _MAX_TIMEOUT) 2052 2053 idle_primed = None 2054 if runner.waiting_for_idle: 2055 cushion, _ = runner.waiting_for_idle.keys()[0] 2056 if cushion < timeout: 2057 timeout = cushion 2058 idle_primed = IdlePrimedTypes.WAITING_FOR_IDLE 2059 # We use 'elif' here because if there are tasks in 2060 # wait_all_tasks_blocked, then those tasks will wake up without 2061 # jumping the clock, so we don't need to autojump. 2062 elif runner.clock_autojump_threshold < timeout: 2063 timeout = runner.clock_autojump_threshold 2064 idle_primed = IdlePrimedTypes.AUTOJUMP_CLOCK 2065 2066 if "before_io_wait" in runner.instruments: 2067 runner.instruments.call("before_io_wait", timeout) 2068 2069 # Driver will call io_manager.get_events(timeout) and pass it back 2070 # in through the yield 2071 events = yield timeout 2072 runner.io_manager.process_events(events) 2073 2074 if "after_io_wait" in runner.instruments: 2075 runner.instruments.call("after_io_wait", timeout) 2076 2077 # Process cancellations due to deadline expiry 2078 now = runner.clock.current_time() 2079 if runner.deadlines.expire(now): 2080 idle_primed = None 2081 2082 # idle_primed != None means: if the IO wait hit the timeout, and 2083 # still nothing is happening, then we should start waking up 2084 # wait_all_tasks_blocked tasks or autojump the clock. But there 2085 # are some subtleties in defining "nothing is happening". 2086 # 2087 # 'not runner.runq' means that no tasks are currently runnable. 2088 # 'not events' means that the last IO wait call hit its full 2089 # timeout. These are very similar, and if idle_primed != None and 2090 # we're running in regular mode then they always go together. But, 2091 # in *guest* mode, they can happen independently, even when 2092 # idle_primed=True: 2093 # 2094 # - runner.runq=empty and events=True: the host loop adjusted a 2095 # deadline and that forced an IO wakeup before the timeout expired, 2096 # even though no actual tasks were scheduled. 2097 # 2098 # - runner.runq=nonempty and events=False: the IO wait hit its 2099 # timeout, but then some code in the host thread rescheduled a task 2100 # before we got here. 2101 # 2102 # So we need to check both. 2103 if idle_primed is not None and not runner.runq and not events: 2104 if idle_primed is IdlePrimedTypes.WAITING_FOR_IDLE: 2105 while runner.waiting_for_idle: 2106 key, task = runner.waiting_for_idle.peekitem(0) 2107 if key[0] == cushion: 2108 del runner.waiting_for_idle[key] 2109 runner.reschedule(task) 2110 else: 2111 break 2112 else: 2113 assert idle_primed is IdlePrimedTypes.AUTOJUMP_CLOCK 2114 runner.clock._autojump() 2115 2116 # Process all runnable tasks, but only the ones that are already 2117 # runnable now. Anything that becomes runnable during this cycle 2118 # needs to wait until the next pass. This avoids various 2119 # starvation issues by ensuring that there's never an unbounded 2120 # delay between successive checks for I/O. 2121 # 2122 # Also, we randomize the order of each batch to avoid assumptions 2123 # about scheduling order sneaking in. In the long run, I suspect 2124 # we'll either (a) use strict FIFO ordering and document that for 2125 # predictability/determinism, or (b) implement a more 2126 # sophisticated scheduler (e.g. some variant of fair queueing), 2127 # for better behavior under load. For now, this is the worst of 2128 # both worlds - but it keeps our options open. (If we do decide to 2129 # go all in on deterministic scheduling, then there are other 2130 # things that will probably need to change too, like the deadlines 2131 # tie-breaker and the non-deterministic ordering of 2132 # task._notify_queues.) 2133 batch = list(runner.runq) 2134 runner.runq.clear() 2135 if _ALLOW_DETERMINISTIC_SCHEDULING: 2136 # We're running under Hypothesis, and pytest-trio has patched 2137 # this in to make the scheduler deterministic and avoid flaky 2138 # tests. It's not worth the (small) performance cost in normal 2139 # operation, since we'll shuffle the list and _r is only 2140 # seeded for tests. 2141 batch.sort(key=lambda t: t._counter) 2142 _r.shuffle(batch) 2143 else: 2144 # 50% chance of reversing the batch, this way each task 2145 # can appear before/after any other task. 2146 if _r.random() < 0.5: 2147 batch.reverse() 2148 while batch: 2149 task = batch.pop() 2150 GLOBAL_RUN_CONTEXT.task = task 2151 2152 if "before_task_step" in runner.instruments: 2153 runner.instruments.call("before_task_step", task) 2154 2155 next_send_fn = task._next_send_fn 2156 next_send = task._next_send 2157 task._next_send_fn = task._next_send = None 2158 final_outcome = None 2159 try: 2160 # We used to unwrap the Outcome object here and send/throw 2161 # its contents in directly, but it turns out that .throw() 2162 # is buggy, at least on CPython 3.6: 2163 # https://bugs.python.org/issue29587 2164 # https://bugs.python.org/issue29590 2165 # So now we send in the Outcome object and unwrap it on the 2166 # other side. 2167 msg = task.context.run(next_send_fn, next_send) 2168 except StopIteration as stop_iteration: 2169 final_outcome = Value(stop_iteration.value) 2170 except BaseException as task_exc: 2171 # Store for later, removing uninteresting top frames: 1 2172 # frame we always remove, because it's this function 2173 # catching it, and then in addition we remove however many 2174 # more Context.run adds. 2175 tb = task_exc.__traceback__.tb_next 2176 for _ in range(CONTEXT_RUN_TB_FRAMES): 2177 tb = tb.tb_next 2178 final_outcome = Error(task_exc.with_traceback(tb)) 2179 # Remove local refs so that e.g. cancelled coroutine locals 2180 # are not kept alive by this frame until another exception 2181 # comes along. 2182 del tb 2183 2184 if final_outcome is not None: 2185 # We can't call this directly inside the except: blocks 2186 # above, because then the exceptions end up attaching 2187 # themselves to other exceptions as __context__ in 2188 # unwanted ways. 2189 runner.task_exited(task, final_outcome) 2190 # final_outcome may contain a traceback ref. It's not as 2191 # crucial compared to the above, but this will allow more 2192 # prompt release of resources in coroutine locals. 2193 final_outcome = None 2194 else: 2195 task._schedule_points += 1 2196 if msg is CancelShieldedCheckpoint: 2197 runner.reschedule(task) 2198 elif type(msg) is WaitTaskRescheduled: 2199 task._cancel_points += 1 2200 task._abort_func = msg.abort_func 2201 # KI is "outside" all cancel scopes, so check for it 2202 # before checking for regular cancellation: 2203 if runner.ki_pending and task is runner.main_task: 2204 task._attempt_delivery_of_pending_ki() 2205 task._attempt_delivery_of_any_pending_cancel() 2206 elif type(msg) is PermanentlyDetachCoroutineObject: 2207 # Pretend the task just exited with the given outcome 2208 runner.task_exited(task, msg.final_outcome) 2209 else: 2210 exc = TypeError( 2211 "trio.run received unrecognized yield message {!r}. " 2212 "Are you trying to use a library written for some " 2213 "other framework like asyncio? That won't work " 2214 "without some kind of compatibility shim.".format(msg) 2215 ) 2216 # The foreign library probably doesn't adhere to our 2217 # protocol of unwrapping whatever outcome gets sent in. 2218 # Instead, we'll arrange to throw `exc` in directly, 2219 # which works for at least asyncio and curio. 2220 runner.reschedule(task, exc) 2221 task._next_send_fn = task.coro.throw 2222 # prevent long-lived reference 2223 # TODO: develop test for this deletion 2224 del msg 2225 2226 if "after_task_step" in runner.instruments: 2227 runner.instruments.call("after_task_step", task) 2228 del GLOBAL_RUN_CONTEXT.task 2229 # prevent long-lived references 2230 # TODO: develop test for these deletions 2231 del task, next_send, next_send_fn 2232 2233 except GeneratorExit: 2234 # The run-loop generator has been garbage collected without finishing 2235 warnings.warn( 2236 RuntimeWarning( 2237 "Trio guest run got abandoned without properly finishing... " 2238 "weird stuff might happen" 2239 ) 2240 ) 2241 except TrioInternalError: 2242 raise 2243 except BaseException as exc: 2244 raise TrioInternalError("internal error in Trio - please file a bug!") from exc 2245 finally: 2246 GLOBAL_RUN_CONTEXT.__dict__.clear() 2247 runner.close() 2248 # Have to do this after runner.close() has disabled KI protection, 2249 # because otherwise there's a race where ki_pending could get set 2250 # after we check it. 2251 if runner.ki_pending: 2252 ki = KeyboardInterrupt() 2253 if isinstance(runner.main_task_outcome, Error): 2254 ki.__context__ = runner.main_task_outcome.error 2255 runner.main_task_outcome = Error(ki) 2256 2257 2258################################################################ 2259# Other public API functions 2260################################################################ 2261 2262 2263class _TaskStatusIgnored: 2264 def __repr__(self): 2265 return "TASK_STATUS_IGNORED" 2266 2267 def started(self, value=None): 2268 pass 2269 2270 2271TASK_STATUS_IGNORED = _TaskStatusIgnored() 2272 2273 2274def current_task(): 2275 """Return the :class:`Task` object representing the current task. 2276 2277 Returns: 2278 Task: the :class:`Task` that called :func:`current_task`. 2279 2280 """ 2281 2282 try: 2283 return GLOBAL_RUN_CONTEXT.task 2284 except AttributeError: 2285 raise RuntimeError("must be called from async context") from None 2286 2287 2288def current_effective_deadline(): 2289 """Returns the current effective deadline for the current task. 2290 2291 This function examines all the cancellation scopes that are currently in 2292 effect (taking into account shielding), and returns the deadline that will 2293 expire first. 2294 2295 One example of where this might be is useful is if your code is trying to 2296 decide whether to begin an expensive operation like an RPC call, but wants 2297 to skip it if it knows that it can't possibly complete in the available 2298 time. Another example would be if you're using a protocol like gRPC that 2299 `propagates timeout information to the remote peer 2300 <http://www.grpc.io/docs/guides/concepts.html#deadlines>`__; this function 2301 gives a way to fetch that information so you can send it along. 2302 2303 If this is called in a context where a cancellation is currently active 2304 (i.e., a blocking call will immediately raise :exc:`Cancelled`), then 2305 returned deadline is ``-inf``. If it is called in a context where no 2306 scopes have a deadline set, it returns ``inf``. 2307 2308 Returns: 2309 float: the effective deadline, as an absolute time. 2310 2311 """ 2312 return current_task()._cancel_status.effective_deadline() 2313 2314 2315async def checkpoint(): 2316 """A pure :ref:`checkpoint <checkpoints>`. 2317 2318 This checks for cancellation and allows other tasks to be scheduled, 2319 without otherwise blocking. 2320 2321 Note that the scheduler has the option of ignoring this and continuing to 2322 run the current task if it decides this is appropriate (e.g. for increased 2323 efficiency). 2324 2325 Equivalent to ``await trio.sleep(0)`` (which is implemented by calling 2326 :func:`checkpoint`.) 2327 2328 """ 2329 # The scheduler is what checks timeouts and converts them into 2330 # cancellations. So by doing the schedule point first, we ensure that the 2331 # cancel point has the most up-to-date info. 2332 await cancel_shielded_checkpoint() 2333 task = current_task() 2334 task._cancel_points += 1 2335 if task._cancel_status.effectively_cancelled or ( 2336 task is task._runner.main_task and task._runner.ki_pending 2337 ): 2338 with CancelScope(deadline=-inf): 2339 await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED) 2340 2341 2342async def checkpoint_if_cancelled(): 2343 """Issue a :ref:`checkpoint <checkpoints>` if the calling context has been 2344 cancelled. 2345 2346 Equivalent to (but potentially more efficient than):: 2347 2348 if trio.current_deadline() == -inf: 2349 await trio.lowlevel.checkpoint() 2350 2351 This is either a no-op, or else it allow other tasks to be scheduled and 2352 then raises :exc:`trio.Cancelled`. 2353 2354 Typically used together with :func:`cancel_shielded_checkpoint`. 2355 2356 """ 2357 task = current_task() 2358 if task._cancel_status.effectively_cancelled or ( 2359 task is task._runner.main_task and task._runner.ki_pending 2360 ): 2361 await _core.checkpoint() 2362 assert False # pragma: no cover 2363 task._cancel_points += 1 2364 2365 2366if sys.platform == "win32": 2367 from ._io_windows import WindowsIOManager as TheIOManager 2368 from ._generated_io_windows import * 2369elif sys.platform == "linux" or (not TYPE_CHECKING and hasattr(select, "epoll")): 2370 from ._io_epoll import EpollIOManager as TheIOManager 2371 from ._generated_io_epoll import * 2372elif TYPE_CHECKING or hasattr(select, "kqueue"): 2373 from ._io_kqueue import KqueueIOManager as TheIOManager 2374 from ._generated_io_kqueue import * 2375else: # pragma: no cover 2376 raise NotImplementedError("unsupported platform") 2377 2378from ._generated_run import * 2379from ._generated_instrumentation import * 2380