1# coding: utf-8
2
3import functools
4import itertools
5import logging
6import os
7import random
8import select
9import sys
10import threading
11from collections import deque
12import collections.abc
13from contextlib import contextmanager
14import warnings
15import weakref
16import enum
17
18from contextvars import copy_context
19from math import inf
20from time import perf_counter
21from typing import Callable, TYPE_CHECKING
22
23from sniffio import current_async_library_cvar
24
25import attr
26from heapq import heapify, heappop, heappush
27from sortedcontainers import SortedDict
28from outcome import Error, Outcome, Value, capture
29
30from ._entry_queue import EntryQueue, TrioToken
31from ._exceptions import TrioInternalError, RunFinishedError, Cancelled
32from ._ki import (
33    LOCALS_KEY_KI_PROTECTION_ENABLED,
34    KIManager,
35    enable_ki_protection,
36)
37from ._multierror import MultiError
38from ._traps import (
39    Abort,
40    wait_task_rescheduled,
41    cancel_shielded_checkpoint,
42    CancelShieldedCheckpoint,
43    PermanentlyDetachCoroutineObject,
44    WaitTaskRescheduled,
45)
46from ._asyncgens import AsyncGenerators
47from ._thread_cache import start_thread_soon
48from ._instrumentation import Instruments
49from .. import _core
50from .._deprecate import warn_deprecated
51from .._util import Final, NoPublicConstructor, coroutine_or_error
52
# Slack of stale heap entries tolerated by Deadlines.expire() before it
# prunes: pruning only happens once the heap holds more than
# (2 * active entries + this threshold) items.
DEADLINE_HEAP_MIN_PRUNE_THRESHOLD = 1000

# Unique sentinel object (compared by identity).
# NOTE(review): its uses are outside this view; presumably it marks "nothing
# to send" when resuming a task's coroutine -- confirm against the run loop.
_NO_SEND = object()
56
57
58# Decorator to mark methods public. This does nothing by itself, but
59# trio/_tools/gen_exports.py looks for it.
60def _public(fn):
61    return fn
62
63
64# When running under Hypothesis, we want examples to be reproducible and
65# shrinkable.  pytest-trio's Hypothesis integration monkeypatches this
66# variable to True, and registers the Random instance _r for Hypothesis
67# to manage for each test case, which together should make Trio's task
68# scheduling loop deterministic.  We have a test for that, of course.
# Off by default; see the note above for who is expected to flip it.
_ALLOW_DETERMINISTIC_SCHEDULING = False
# Module-wide Random instance; SystemClock draws its clock offset from it.
_r = random.Random()
71
72
73# On 3.7+, Context.run() is implemented in C and doesn't show up in
74# tracebacks. On 3.6, we use the contextvars backport, which is
75# currently implemented in Python and adds 1 frame to tracebacks. So this
76# function is a super-overkill version of "0 if sys.version_info >= (3, 7)
77# else 1". But if Context.run ever changes, we'll be ready!
78#
79# This can all be removed once we drop support for 3.6.
80def _count_context_run_tb_frames():
81    def function_with_unique_name_xyzzy():
82        1 / 0
83
84    ctx = copy_context()
85    try:
86        ctx.run(function_with_unique_name_xyzzy)
87    except ZeroDivisionError as exc:
88        tb = exc.__traceback__
89        # Skip the frame where we caught it
90        tb = tb.tb_next
91        count = 0
92        while tb.tb_frame.f_code.co_name != "function_with_unique_name_xyzzy":
93            tb = tb.tb_next
94            count += 1
95        return count
96
97
# Measured once, when this module is imported.
CONTEXT_RUN_TB_FRAMES = _count_context_run_tb_frames()
99
100
@attr.s(frozen=True, slots=True)
class SystemClock:
    """Clock that reports ``time.perf_counter()`` shifted by a fixed offset.

    The offset is chosen randomly per instance, so readings from this clock
    are not interchangeable with raw ``perf_counter()`` values or with
    readings taken from another instance.
    """

    # The offset is deliberately large and random: anyone who accidentally
    # calls time.perf_counter() directly, or compares clocks from different
    # runs, should notice the mismatch quickly.
    offset = attr.ib(factory=lambda: _r.uniform(10000, 200000))

    def start_clock(self):
        """Do nothing; this clock needs no start-up work."""

    def current_time(self):
        """Return the current clock reading: perf_counter() plus the offset."""
        # On CPython 3, perf_counter is exactly time.monotonic on every
        # platform except Windows, where it uses QueryPerformanceCounter
        # instead of GetTickCount64.
        return perf_counter() + self.offset

    def deadline_to_sleep_time(self, deadline):
        """Return how far *deadline* lies ahead of the current clock reading.

        The result is negative when *deadline* is already in the past.
        """
        now = self.current_time()
        return deadline - now
119
120
class IdlePrimedTypes(enum.Enum):
    """Closed set of two tags: WAITING_FOR_IDLE and AUTOJUMP_CLOCK.

    NOTE(review): the code that consumes these members is outside this
    view; presumably they record why the run loop was primed for an idle
    wakeup -- confirm against the run loop.
    """

    # auto() numbers members from 1 in definition order, so the values stay
    # 1 and 2 as long as this order is kept.
    WAITING_FOR_IDLE = enum.auto()
    AUTOJUMP_CLOCK = enum.auto()
124
125
126################################################################
127# CancelScope and friends
128################################################################
129
130
@attr.s(eq=False, slots=True)
class Deadlines:
    """A container of deadlined cancel scopes.

    Only contains scopes with non-infinite deadlines that are currently
    attached to at least one task.

    Removal is lazy: ``remove()`` only adjusts a counter, and the outdated
    heap entry is thrown away later, when it is found not to match the
    scope's ``_registered_deadline`` any more.

    """

    # Min-heap of (deadline, id(CancelScope), CancelScope) tuples. The id()
    # in the middle breaks ties between equal deadlines, so that the scope
    # objects themselves never have to be compared.
    _heap = attr.ib(factory=list)
    # Number of entries that are still current, i.e. added and not removed.
    _active = attr.ib(default=0)

    def add(self, deadline, cancel_scope):
        """Register *cancel_scope* as expiring at *deadline*."""
        entry = (deadline, id(cancel_scope), cancel_scope)
        heappush(self._heap, entry)
        self._active += 1

    def remove(self, deadline, cancel_scope):
        """Unregister a scope; its heap entry is left behind as stale."""
        self._active -= 1

    def next_deadline(self):
        """Return the earliest current deadline, or ``inf`` if there is none.

        Stale entries found at the top of the heap are discarded on the way.
        """
        heap = self._heap
        while heap:
            earliest, _, scope = heap[0]
            if earliest == scope._registered_deadline:
                return earliest
            # Outdated entry: drop it and look at the next one.
            heappop(heap)
        return inf

    def _prune(self):
        """Rebuild the heap so that it holds only current, unique entries."""
        # A cancel scope can flip back and forth between the same two
        # deadlines, leaving behind entries whose deadline *looks* current
        # but which are really duplicates. So besides dropping entries with
        # an outdated deadline, we also keep at most one entry per scope.
        kept_scopes = set()
        survivors = []
        for entry in self._heap:
            deadline, _, scope = entry
            if deadline == scope._registered_deadline and scope not in kept_scopes:
                kept_scopes.add(scope)
                survivors.append(entry)
        # test_cancel_scope_deadline_duplicates exercises this assert:
        assert len(survivors) == self._active
        heapify(survivors)
        self._heap = survivors

    def expire(self, now):
        """Cancel every registered scope whose deadline is at or before *now*.

        Returns True if at least one scope was cancelled, False otherwise.
        """
        cancelled_any = False
        while self._heap and self._heap[0][0] <= now:
            deadline, _, scope = heappop(self._heap)
            if deadline == scope._registered_deadline:
                cancelled_any = True
                # cancel() ends up calling self.remove() for us, so _active
                # must not be decremented here.
                scope.cancel()
        # Pruning is done in occasional batches, only once stale entries
        # have piled up, which keeps its amortized cost low.
        prune_limit = self._active * 2 + DEADLINE_HEAP_MIN_PRUNE_THRESHOLD
        if len(self._heap) > prune_limit:
            self._prune()
        return cancelled_any
198
199
@attr.s(eq=False, slots=True)
class CancelStatus:
    """Tracks the cancellation status for a contiguous extent
    of code that will become cancelled, or not, as a unit.

    Each task has at all times a single "active" CancelStatus whose
    cancellation state determines whether checkpoints executed in that
    task raise Cancelled. Each 'with CancelScope(...)' context is
    associated with a particular CancelStatus.  When a task enters
    such a context, a CancelStatus is created which becomes the active
    CancelStatus for that task; when the 'with' block is exited, the
    active CancelStatus for that task goes back to whatever it was
    before.

    CancelStatus objects are arranged in a tree whose structure
    mirrors the lexical nesting of the cancel scope contexts.  When a
    CancelStatus becomes cancelled, it notifies all of its direct
    children, who become cancelled in turn (and continue propagating
    the cancellation down the tree) unless they are shielded. (There
    will be at most one such child except in the case of a
    CancelStatus that immediately encloses a nursery.) At the leaves
    of this tree are the tasks themselves, which get woken up to deliver
    an abort when their direct parent CancelStatus becomes cancelled.

    You can think of CancelStatus as being responsible for the
    "plumbing" of cancellations as opposed to CancelScope which is
    responsible for the origination of them.

    """

    # Our associated cancel scope. Can be any object with attributes
    # `deadline`, `shield`, and `cancel_called`, but in current usage
    # is always a CancelScope object. Must not be None.
    _scope = attr.ib()

    # True iff the tasks in self._tasks should receive cancellations
    # when they checkpoint. Always True when scope.cancel_called is True;
    # may also be True due to a cancellation propagated from our
    # parent.  Unlike scope.cancel_called, this does not necessarily stay
    # true once it becomes true. For example, we might become
    # effectively cancelled due to the cancel scope two levels out
    # becoming cancelled, but then the cancel scope one level out
    # becomes shielded so we're not effectively cancelled anymore.
    effectively_cancelled = attr.ib(default=False)

    # The CancelStatus whose cancellations can propagate to us; we
    # become effectively cancelled when they do, unless scope.shield
    # is True.  May be None (for the outermost CancelStatus in a call
    # to trio.run(), briefly during TaskStatus.started(), or during
    # recovery from mis-nesting of cancel scopes).
    _parent = attr.ib(default=None, repr=False)

    # All of the CancelStatuses that have this CancelStatus as their parent.
    _children = attr.ib(factory=set, init=False, repr=False)

    # Tasks whose cancellation state is currently tied directly to
    # the cancellation state of this CancelStatus object. Don't modify
    # this directly; instead, use Task._activate_cancel_status().
    # Invariant: all(task._cancel_status is self for task in self._tasks)
    _tasks = attr.ib(factory=set, init=False, repr=False)

    # Set to True on still-active cancel statuses that are children
    # of a cancel status that's been closed. This is used to permit
    # recovery from mis-nested cancel scopes (well, at least enough
    # recovery to show a useful traceback).
    abandoned_by_misnesting = attr.ib(default=False, init=False, repr=False)

    def __attrs_post_init__(self):
        # When constructed with a parent, register with it and pick up
        # whatever cancellation state it currently propagates to us.
        if self._parent is not None:
            self._parent._children.add(self)
            self.recalculate()

    # parent/children/tasks accessors are used by TaskStatus.started()

    @property
    def parent(self):
        """The CancelStatus directly enclosing this one, or None."""
        return self._parent

    @parent.setter
    def parent(self, parent):
        # Unlink from the old parent (if any), link to the new one (if any),
        # and recompute our state only when there is a new parent. Detaching
        # (parent=None) leaves effectively_cancelled as it was.
        if self._parent is not None:
            self._parent._children.remove(self)
        self._parent = parent
        if self._parent is not None:
            self._parent._children.add(self)
            self.recalculate()

    @property
    def children(self):
        """Immutable snapshot of the CancelStatuses parented to this one."""
        return frozenset(self._children)

    @property
    def tasks(self):
        """Immutable snapshot of the tasks tied directly to this status."""
        return frozenset(self._tasks)

    def encloses(self, other):
        """Returns true if this cancel status is a direct or indirect
        parent of cancel status *other*, or if *other* is *self*.
        """
        while other is not None:
            if other is self:
                return True
            other = other.parent
        return False

    def close(self):
        """Detach this status from its parent.

        If tasks or child statuses are still attached (mis-nesting), the
        remaining subtree is marked abandoned and left cancelled.
        """
        self.parent = None  # now we're not a child of self.parent anymore
        if self._tasks or self._children:
            # Cancel scopes weren't exited in opposite order of being
            # entered. CancelScope._close() deals with raising an error
            # if appropriate; our job is to leave things in a reasonable
            # state for unwinding our dangling children. We choose to leave
            # this part of the CancelStatus tree unlinked from everyone
            # else, cancelled, and marked so that exiting a CancelScope
            # within the abandoned subtree doesn't affect the active
            # CancelStatus. Note that it's possible for us to get here
            # without CancelScope._close() raising an error, if a
            # nursery's cancel scope is closed within the nursery's
            # nested child and no other cancel scopes are involved,
            # but in that case task_exited() will deal with raising
            # the error.
            self._mark_abandoned()

            # Since our CancelScope is about to forget about us, and we
            # have no parent anymore, there's nothing left to call
            # recalculate(). So, we can stay cancelled by setting
            # effectively_cancelled and updating our children.
            self.effectively_cancelled = True
            for task in self._tasks:
                task._attempt_delivery_of_any_pending_cancel()
            for child in self._children:
                child.recalculate()

    @property
    def parent_cancellation_is_visible_to_us(self):
        """True iff we have a parent that is effectively cancelled and our
        scope is not shielded."""
        return (
            self._parent is not None
            and not self._scope.shield
            and self._parent.effectively_cancelled
        )

    def recalculate(self):
        """Bring ``effectively_cancelled`` up to date for this status and,
        wherever a value changes, for the statuses below it.

        Tasks attached to a status that has just become cancelled are asked
        to attempt delivery of the pending cancellation.
        """
        # This does a depth-first traversal over this and descendent cancel
        # statuses, to ensure their state is up-to-date. It's basically a
        # recursive algorithm, but we use an explicit stack to avoid any
        # issues with stack overflow.
        todo = [self]
        while todo:
            current = todo.pop()
            new_state = (
                current._scope.cancel_called
                or current.parent_cancellation_is_visible_to_us
            )
            # Only descend when this node's state actually changed; an
            # unchanged node cannot change what its children see.
            if new_state != current.effectively_cancelled:
                current.effectively_cancelled = new_state
                if new_state:
                    for task in current._tasks:
                        task._attempt_delivery_of_any_pending_cancel()
                todo.extend(current._children)

    def _mark_abandoned(self):
        # Recursively flag this status and every descendant as abandoned.
        self.abandoned_by_misnesting = True
        for child in self._children:
            child._mark_abandoned()

    def effective_deadline(self):
        """Return the deadline that actually applies to code under this
        status.

        This is ``-inf`` if we are already effectively cancelled. Otherwise
        it is our scope's own deadline, combined (via ``min``) with the
        parent's effective deadline unless we have no parent or our scope
        is shielded.
        """
        if self.effectively_cancelled:
            return -inf
        if self._parent is None or self._scope.shield:
            return self._scope.deadline
        return min(self._scope.deadline, self._parent.effective_deadline())
371
372
# Explanatory text appended to the "Cancel scope stack corrupted"
# RuntimeError messages built in CancelScope._close().
MISNESTING_ADVICE = """
This is probably a bug in your code, that has caused Trio's internal state to
become corrupted. We'll do our best to recover, but from now on there are
no guarantees.

Typically this is caused by one of the following:
  - yielding within a generator or async generator that's opened a cancel
    scope or nursery (unless the generator is a @contextmanager or
    @asynccontextmanager); see https://github.com/python-trio/trio/issues/638
  - manually calling __enter__ or __exit__ on a trio.CancelScope, or
    __aenter__ or __aexit__ on the object returned by trio.open_nursery();
    doing so correctly is difficult and you should use @[async]contextmanager
    instead, or maybe [Async]ExitStack
  - using [Async]ExitStack to interleave the entries/exits of cancel scopes
    and/or nurseries in a way that couldn't be achieved by some nesting of
    'with' and 'async with' blocks
  - using the low-level coroutine object protocol to execute some parts of
    an async function in a different cancel scope/nursery context than
    other parts
If you don't believe you're doing any of these things, please file a bug:
https://github.com/python-trio/trio/issues/new
"""
395
396
@attr.s(eq=False, repr=False, slots=True)
class CancelScope(metaclass=Final):
    """A *cancellation scope*: the link between a unit of cancellable
    work and Trio's cancellation system.

    A :class:`CancelScope` becomes associated with some cancellable work
    when it is used as a context manager surrounding that work::

        cancel_scope = trio.CancelScope()
        ...
        with cancel_scope:
            await long_running_operation()

    Inside the ``with`` block, a cancellation of ``cancel_scope`` (via
    a call to its :meth:`cancel` method or via the expiry of its
    :attr:`deadline`) will immediately interrupt the
    ``long_running_operation()`` by raising :exc:`Cancelled` at its
    next :ref:`checkpoint <checkpoints>`.

    The context manager ``__enter__`` returns the :class:`CancelScope`
    object itself, so you can also write ``with trio.CancelScope() as
    cancel_scope:``.

    If a cancel scope becomes cancelled before entering its ``with`` block,
    the :exc:`Cancelled` exception will be raised at the first
    checkpoint inside the ``with`` block. This allows a
    :class:`CancelScope` to be created in one :ref:`task <tasks>` and
    passed to another, so that the first task can later cancel some work
    inside the second.

    Cancel scopes are not reusable or reentrant; that is, each cancel
    scope can be used for at most one ``with`` block.  (You'll get a
    :exc:`RuntimeError` if you violate this rule.)

    The :class:`CancelScope` constructor takes initial values for the
    cancel scope's :attr:`deadline` and :attr:`shield` attributes; these
    may be freely modified after construction, whether or not the scope
    has been entered yet, and changes take immediate effect.
    """

    # The CancelStatus created by __enter__; reset to None by _close(), so
    # it is non-None exactly while the scope is active.
    _cancel_status = attr.ib(default=None, init=False)
    # Set once by __enter__ and never cleared; enforces single use.
    _has_been_entered = attr.ib(default=False, init=False)
    # The deadline currently registered with the runner's Deadlines
    # container, or inf when nothing is registered. Maintained exclusively
    # by _might_change_registered_deadline().
    _registered_deadline = attr.ib(default=inf, init=False)
    # Backing field for the cancel_called property; set by cancel().
    _cancel_called = attr.ib(default=False, init=False)
    # Set to True by _exc_filter() when this scope absorbs a Cancelled.
    cancelled_caught = attr.ib(default=False, init=False)

    # Constructor arguments:
    _deadline = attr.ib(default=inf, kw_only=True)
    _shield = attr.ib(default=False, kw_only=True)

    @enable_ki_protection
    def __enter__(self):
        """Activate this scope for the current task and return ``self``.

        Raises RuntimeError if the scope has been entered before. If the
        deadline has already passed, the scope is cancelled right away.
        """
        task = _core.current_task()
        if self._has_been_entered:
            raise RuntimeError(
                "Each CancelScope may only be used for a single 'with' block"
            )
        self._has_been_entered = True
        if current_time() >= self._deadline:
            self.cancel()
        # Becoming active may require registering our deadline; the context
        # manager takes care of that once _cancel_status has been set.
        with self._might_change_registered_deadline():
            self._cancel_status = CancelStatus(scope=self, parent=task._cancel_status)
            task._activate_cancel_status(self._cancel_status)
        return self

    def _exc_filter(self, exc):
        # Handler passed to MultiError.filter() by _close(): absorb Cancelled
        # (recording that we caught one), pass anything else through.
        if isinstance(exc, Cancelled):
            self.cancelled_caught = True
            return None
        return exc

    def _close(self, exc):
        """Deactivate this scope and decide what exception should propagate.

        *exc* is the exception leaving the scope's block, or None. Returns
        the exception that should continue propagating: *exc* itself, a
        filtered version of it, a new RuntimeError describing cancel scope
        mis-nesting (with *exc* as its ``__context__``), or None.
        """
        if self._cancel_status is None:
            new_exc = RuntimeError(
                "Cancel scope stack corrupted: attempted to exit {!r} "
                "which had already been exited".format(self)
            )
            new_exc.__context__ = exc
            return new_exc
        scope_task = current_task()
        if scope_task._cancel_status is not self._cancel_status:
            # Cancel scope mis-nesting: this cancel scope isn't the most
            # recently opened by this task (that's still open). That is,
            # our assumptions about context managers forming a stack
            # have been violated. Try and make the best of it.
            if self._cancel_status.abandoned_by_misnesting:
                # We are an inner cancel scope that was still active when
                # some outer scope was closed. The closure of that outer
                # scope threw an error, so we don't need to throw another
                # one; it would just confuse the traceback.
                pass
            elif not self._cancel_status.encloses(scope_task._cancel_status):
                # This task isn't even indirectly contained within the
                # cancel scope it's trying to close. Raise an error
                # without changing any state.
                new_exc = RuntimeError(
                    "Cancel scope stack corrupted: attempted to exit {!r} "
                    "from unrelated {!r}\n{}".format(
                        self, scope_task, MISNESTING_ADVICE
                    )
                )
                new_exc.__context__ = exc
                return new_exc
            else:
                # Otherwise, there's some inner cancel scope(s) that
                # we're abandoning by closing this outer one.
                # CancelStatus.close() will take care of the plumbing;
                # we just need to make sure we don't let the error
                # pass silently.
                new_exc = RuntimeError(
                    "Cancel scope stack corrupted: attempted to exit {!r} "
                    "in {!r} that's still within its child {!r}\n{}".format(
                        self,
                        scope_task,
                        scope_task._cancel_status._scope,
                        MISNESTING_ADVICE,
                    )
                )
                new_exc.__context__ = exc
                exc = new_exc
                scope_task._activate_cancel_status(self._cancel_status.parent)
        else:
            scope_task._activate_cancel_status(self._cancel_status.parent)
        # Only absorb Cancelled exceptions if the cancellation is our own,
        # i.e. not one that is (also) visible from an enclosing status.
        if (
            exc is not None
            and self._cancel_status.effectively_cancelled
            and not self._cancel_status.parent_cancellation_is_visible_to_us
        ):
            exc = MultiError.filter(self._exc_filter, exc)
        self._cancel_status.close()
        # Going inactive resets the registered deadline to inf (see
        # _might_change_registered_deadline).
        with self._might_change_registered_deadline():
            self._cancel_status = None
        return exc

    @enable_ki_protection
    def __exit__(self, etype, exc, tb):
        """Context-manager exit: adapt the result of _close() to the
        ``__exit__`` protocol.

        Returns True to swallow the exception, False to let the original
        one propagate unchanged, and otherwise raises the replacement
        exception returned by _close().
        """
        # NB: NurseryManager calls _close() directly rather than __exit__(),
        # so __exit__() must be just _close() plus this logic for adapting
        # the exception-filtering result to the context manager API.

        # Tracebacks show the 'raise' line below out of context, so let's give
        # this variable a name that makes sense out of context.
        remaining_error_after_cancel_scope = self._close(exc)
        if remaining_error_after_cancel_scope is None:
            return True
        elif remaining_error_after_cancel_scope is exc:
            return False
        else:
            # Copied verbatim from MultiErrorCatcher.  Python doesn't
            # allow us to encapsulate this __context__ fixup.
            old_context = remaining_error_after_cancel_scope.__context__
            try:
                raise remaining_error_after_cancel_scope
            finally:
                # Raising overwrites __context__; restore the one that
                # _close() had set up.
                _, value, _ = sys.exc_info()
                assert value is remaining_error_after_cancel_scope
                value.__context__ = old_context

    def __repr__(self):
        # Rendered as "<trio.CancelScope at 0x..., BINDING[, STATE]>", where
        # BINDING is active/exited/unbound and STATE describes cancellation
        # or the distance to the deadline.
        if self._cancel_status is not None:
            binding = "active"
        elif self._has_been_entered:
            binding = "exited"
        else:
            binding = "unbound"

        if self._cancel_called:
            state = ", cancelled"
        elif self._deadline == inf:
            state = ""
        else:
            try:
                now = current_time()
            except RuntimeError:  # must be called from async context
                state = ""
            else:
                state = ", deadline is {:.2f} seconds {}".format(
                    abs(self._deadline - now),
                    "from now" if self._deadline >= now else "ago",
                )

        return "<trio.CancelScope at {:#x}, {}{}>".format(id(self), binding, state)

    @contextmanager
    @enable_ki_protection
    def _might_change_registered_deadline(self):
        """Context manager to wrap any change that could affect which
        deadline (if any) this scope should have registered with the runner.

        On exit, even if the body raised, the wanted registration is
        recomputed: ``inf`` if the scope is inactive or already cancelled,
        else ``_deadline``. If it differs from the current one, the
        runner's Deadlines container is updated to match.
        """
        try:
            yield
        finally:
            old = self._registered_deadline
            if self._cancel_status is None or self._cancel_called:
                new = inf
            else:
                new = self._deadline
            if old != new:
                self._registered_deadline = new
                runner = GLOBAL_RUN_CONTEXT.runner
                # In guest mode, compare the runner's next deadline before
                # and after the update and request a tick if it moved.
                if runner.is_guest:
                    old_next_deadline = runner.deadlines.next_deadline()
                if old != inf:
                    runner.deadlines.remove(old, self)
                if new != inf:
                    runner.deadlines.add(new, self)
                if runner.is_guest:
                    new_next_deadline = runner.deadlines.next_deadline()
                    if old_next_deadline != new_next_deadline:
                        runner.force_guest_tick_asap()

    @property
    def deadline(self):
        """Read-write, :class:`float`. An absolute time on the current
        run's clock at which this scope will automatically become
        cancelled. You can adjust the deadline by modifying this
        attribute, e.g.::

           # I need a little more time!
           cancel_scope.deadline += 30

        Note that for efficiency, the core run loop only checks for
        expired deadlines every once in a while. This means that in
        certain cases there may be a short delay between when the clock
        says the deadline should have expired, and when checkpoints
        start raising :exc:`~trio.Cancelled`. This is a very obscure
        corner case that you're unlikely to notice, but we document it
        for completeness. (If this *does* cause problems for you, of
        course, then `we want to know!
        <https://github.com/python-trio/trio/issues>`__)

        Defaults to :data:`math.inf`, which means "no deadline", though
        this can be overridden by the ``deadline=`` argument to
        the :class:`~trio.CancelScope` constructor.
        """
        return self._deadline

    @deadline.setter
    def deadline(self, new_deadline):
        # The value is coerced to float; the surrounding context manager
        # re-registers the scope under the new deadline if needed.
        with self._might_change_registered_deadline():
            self._deadline = float(new_deadline)

    @property
    def shield(self):
        """Read-write, :class:`bool`, default :data:`False`. So long as
        this is set to :data:`True`, then the code inside this scope
        will not receive :exc:`~trio.Cancelled` exceptions from scopes
        that are outside this scope. They can still receive
        :exc:`~trio.Cancelled` exceptions from (1) this scope, or (2)
        scopes inside this scope. You can modify this attribute::

           with trio.CancelScope() as cancel_scope:
               cancel_scope.shield = True
               # This cannot be interrupted by any means short of
               # killing the process:
               await sleep(10)

               cancel_scope.shield = False
               # Now this can be cancelled normally:
               await sleep(10)

        Defaults to :data:`False`, though this can be overridden by the
        ``shield=`` argument to the :class:`~trio.CancelScope` constructor.
        """
        return self._shield

    @shield.setter  # type: ignore  # "decorated property not supported"
    @enable_ki_protection
    def shield(self, new_value):
        # Only real bools are accepted (TypeError otherwise). If the scope
        # is active, the change is propagated through the CancelStatus tree
        # right away.
        if not isinstance(new_value, bool):
            raise TypeError("shield must be a bool")
        self._shield = new_value
        if self._cancel_status is not None:
            self._cancel_status.recalculate()

    @enable_ki_protection
    def cancel(self):
        """Cancels this scope immediately.

        This method is idempotent, i.e., if the scope was already
        cancelled then this method silently does nothing.
        """
        if self._cancel_called:
            return
        # Once cancelled, the scope no longer needs a registered deadline;
        # the context manager unregisters it.
        with self._might_change_registered_deadline():
            self._cancel_called = True
        if self._cancel_status is not None:
            self._cancel_status.recalculate()

    @property
    def cancel_called(self):
        """Readonly :class:`bool`. Records whether cancellation has been
        requested for this scope, either by an explicit call to
        :meth:`cancel` or by the deadline expiring.

        This attribute being True does *not* necessarily mean that the
        code within the scope has been, or will be, affected by the
        cancellation. For example, if :meth:`cancel` was called after
        the last checkpoint in the ``with`` block, when it's too late to
        deliver a :exc:`~trio.Cancelled` exception, then this attribute
        will still be True.

        This attribute is mostly useful for debugging and introspection.
        If you want to know whether or not a chunk of code was actually
        cancelled, then :attr:`cancelled_caught` is usually more
        appropriate.
        """
        if self._cancel_status is not None or not self._has_been_entered:
            # Scope is active or not yet entered: make sure cancel_called
            # is true if the deadline has passed. This shouldn't
            # be able to actually change behavior, since we check for
            # deadline expiry on scope entry and at every checkpoint,
            # but it makes the value returned by cancel_called more
            # closely match expectations.
            if not self._cancel_called and current_time() >= self._deadline:
                self.cancel()
        return self._cancel_called
711
712
713################################################################
714# Nursery and friends
715################################################################
716
717
718# This code needs to be read alongside the code from Nursery.start to make
719# sense.
@attr.s(eq=False, hash=False, repr=False)
class _TaskStatus:
    # Holds the two nurseries involved in a start() call: started() moves
    # everything from _old_nursery over to _new_nursery.
    _old_nursery = attr.ib()
    _new_nursery = attr.ib()
    # Guards against started() being called more than once.
    _called_started = attr.ib(default=False)
    # The value passed to started().
    _value = attr.ib(default=None)

    def __repr__(self):
        return "<Task status object at {:#x}>".format(id(self))

    def started(self, value=None):
        """Record *value* and hand the old nursery's tasks and cancel
        statuses over to the new nursery.

        Raises RuntimeError if called more than once. If the old nursery is
        already effectively cancelled, only the value is recorded and
        nothing is moved.
        """
        if self._called_started:
            raise RuntimeError("called 'started' twice on the same task status")
        self._called_started = True
        self._value = value

        # If the old nursery is cancelled, then quietly quit now; the child
        # will eventually exit on its own, and we don't want to risk moving
        # children that might have propagating Cancelled exceptions into
        # a place with no cancelled cancel scopes to catch them.
        if self._old_nursery._cancel_status.effectively_cancelled:
            return

        # Can't be closed, b/c we checked in start() and then _pending_starts
        # should keep it open.
        assert not self._new_nursery._closed

        # Move tasks from the old nursery to the new
        tasks = self._old_nursery._children
        self._old_nursery._children = set()
        for task in tasks:
            task._parent_nursery = self._new_nursery
            task._eventual_parent_nursery = None
            self._new_nursery._children.add(task)

        # Move all children of the old nursery's cancel status object
        # to be underneath the new nursery instead. This includes both
        # tasks and child cancel status objects.
        # NB: If the new nursery is cancelled, reparenting a cancel
        # status to be underneath it can invoke an abort_fn, which might
        # do something evil like cancel the old nursery. We thus break
        # everything off from the old nursery before we start attaching
        # anything to the new.
        cancel_status_children = self._old_nursery._cancel_status.children
        cancel_status_tasks = set(self._old_nursery._cancel_status.tasks)
        # The old nursery's own parent task stays where it is.
        cancel_status_tasks.discard(self._old_nursery._parent_task)
        # Phase 1: detach everything from the old nursery's cancel status.
        for cancel_status in cancel_status_children:
            cancel_status.parent = None
        for task in cancel_status_tasks:
            task._activate_cancel_status(None)
        # Phase 2: attach everything to the new nursery's cancel status.
        for cancel_status in cancel_status_children:
            cancel_status.parent = self._new_nursery._cancel_status
        for task in cancel_status_tasks:
            task._activate_cancel_status(self._new_nursery._cancel_status)

        # That should have removed all the children from the old nursery
        assert not self._old_nursery._children

        # And finally, poke the old nursery so it notices that all its
        # children have disappeared and can exit.
        self._old_nursery._check_nursery_closed()
781
782
class NurseryManager:
    """Nursery context manager.

    This is deliberately a hand-written class rather than something built
    on @asynccontextmanager or @async_generator: those add a lot of
    extraneous stack frames to exceptions, and they behave problematically
    when StopIteration or StopAsyncIteration is involved.

    """

    @enable_ki_protection
    async def __aenter__(self):
        # The cancel scope is entered first, then the nursery is created
        # for the current task with that scope.
        scope = self._scope = CancelScope()
        scope.__enter__()
        nursery = self._nursery = Nursery._create(current_task(), scope)
        return nursery

    @enable_ki_protection
    async def __aexit__(self, etype, exc, tb):
        new_exc = await self._nursery._nested_child_finished(exc)
        # The 'raise' line below is shown out of context in tracebacks, so
        # this variable carries a name that still makes sense there.
        combined_error_from_nursery = self._scope._close(new_exc)
        if combined_error_from_nursery is None:
            # Nothing left to raise: suppress whatever came in.
            return True
        if combined_error_from_nursery is exc:
            # Same exception we were given: let Python re-raise it.
            return False
        # Copied verbatim from MultiErrorCatcher.  Python doesn't
        # allow us to encapsulate this __context__ fixup.
        old_context = combined_error_from_nursery.__context__
        try:
            raise combined_error_from_nursery
        finally:
            value = sys.exc_info()[1]
            assert value is combined_error_from_nursery
            # Put back the __context__ the exception had before the raise.
            value.__context__ = old_context

    def __enter__(self):
        raise RuntimeError(
            "use 'async with open_nursery(...)', not 'with open_nursery(...)'"
        )

    def __exit__(self):  # pragma: no cover
        assert False, """Never called, but should be defined"""
828
829
def open_nursery():
    """Returns an async context manager which must be used to create a
    new `Nursery`.

    Use it as ``async with open_nursery() as nursery:``; entering the
    returned object with a plain ``with`` raises `RuntimeError`.

    It does not block on entry; on exit it blocks until all child tasks
    have exited.

    """
    return NurseryManager()
839
840
class Nursery(metaclass=NoPublicConstructor):
    """A context which may be used to spawn (or cancel) child tasks.

    Instances are never constructed directly; use `open_nursery` instead.

    A nursery stays open until every child task has completed, or until
    it is cancelled -- in which case it cancels whatever children are
    still running and then closes.

    Since every running task belongs to an open Nursery, nurseries
    ensure that there are never any orphaned Tasks.

    Attributes:
        cancel_scope:
            The cancellation scope that is implicitly created together
            with the nursery. Internally it implements the logic where
            ``__aexit__`` cancels all children if an error occurs, but
            you can use it for other things too, e.g. to explicitly
            cancel all children in response to some external event.
    """

    def __init__(self, parent_task, cancel_scope):
        self._parent_task = parent_task
        parent_task._child_nurseries.append(self)
        # Children inherit this cancel status. It is a snapshot taken
        # now, so later changes in the parent don't affect it.
        self._cancel_status = parent_task._cancel_status
        # The cancel scope directly surrounding us; cancelling it is how
        # all children get cancelled.
        self.cancel_scope = cancel_scope
        assert cancel_scope._cancel_status is self._cancel_status
        self._children = set()
        self._pending_excs = []
        # "Nested child" is this code's name for the body of the nursery's
        # 'async with' block, which is treated like a child Task in every
        # way we can manage.
        self._nested_child_running = True
        self._parent_waiting_in_aexit = False
        self._pending_starts = 0
        self._closed = False

    @property
    def child_tasks(self):
        """(`frozenset`): Contains all the child :class:`~trio.lowlevel.Task`
        objects which are still running."""
        return frozenset(self._children)

    @property
    def parent_task(self):
        "(`~trio.lowlevel.Task`):  The Task that opened this nursery."
        return self._parent_task

    def _add_exc(self, exc):
        # Remember the exception and cancel the nursery's scope.
        self._pending_excs.append(exc)
        self.cancel_scope.cancel()

    def _check_nursery_closed(self):
        # The nursery stays open while the nested child is running, while
        # any child task remains, or while a start() call is pending.
        if self._nested_child_running or self._children or self._pending_starts:
            return
        self._closed = True
        if self._parent_waiting_in_aexit:
            # The parent is parked in _nested_child_finished; wake it up.
            self._parent_waiting_in_aexit = False
            GLOBAL_RUN_CONTEXT.runner.reschedule(self._parent_task)

    def _child_finished(self, task, outcome):
        # Drop the finished task, keep its error (if any), and see whether
        # that was the last thing holding the nursery open.
        self._children.remove(task)
        if isinstance(outcome, Error):
            self._add_exc(outcome.error)
        self._check_nursery_closed()

    async def _nested_child_finished(self, nested_child_exc):
        """Wait for the nursery to close once the nested child is done.

        Returns ``MultiError(self._pending_excs)`` if any exceptions were
        collected, otherwise ``None``.
        """
        if nested_child_exc is not None:
            self._add_exc(nested_child_exc)
        self._nested_child_running = False
        self._check_nursery_closed()

        if self._closed:
            # Nothing to wait for, so a plain checkpoint is enough -- but
            # any exception it raises (e.g. from an external cancellation)
            # still has to be mixed in with the rest of our exceptions.
            try:
                await checkpoint()
            except BaseException as exc:
                self._add_exc(exc)
        else:
            # If we get cancelled (or have an exception injected, like
            # KeyboardInterrupt), save that, but keep waiting until our
            # children finish.
            def aborted(raise_cancel):
                self._add_exc(capture(raise_cancel).error)
                return Abort.FAILED

            self._parent_waiting_in_aexit = True
            await wait_task_rescheduled(aborted)

        innermost = self._parent_task._child_nurseries.pop()
        assert innermost is self
        if not self._pending_excs:
            return None
        try:
            return MultiError(self._pending_excs)
        finally:
            # avoid a garbage cycle
            # (see test_nursery_cancel_doesnt_create_cyclic_garbage)
            del self._pending_excs

    def start_soon(self, async_fn, *args, name=None):
        """Creates a child task, scheduling ``await async_fn(*args)``.

        This and :meth:`start` are the two fundamental methods for
        creating concurrent tasks in Trio.

        Note that this is *not* an async function, so you don't use
        await when calling it. It sets the new task up and then returns
        immediately, *before* the task has had a chance to run. The new
        task won't actually do anything until some later point when you
        execute a checkpoint and the scheduler decides to run it.
        If you want to run a function and immediately wait for its result,
        then you don't need a nursery; just use ``await async_fn(*args)``.
        If you want to wait for the task to initialize itself before
        continuing, see :meth:`start`.

        A nursery object may be passed into another task, which then
        allows that task to start new child tasks in the first task's
        nursery.

        The child task inherits its parent nursery's cancel scopes.

        Args:
            async_fn: An async callable.
            args: Positional arguments for ``async_fn``. If you want
                  to pass keyword arguments, use
                  :func:`functools.partial`.
            name: The name for this task. Only used for
                  debugging/introspection
                  (e.g. ``repr(task_obj)``). If this isn't a string,
                  :meth:`start_soon` will try to make it one. A
                  common use case is if you're wrapping a function
                  before spawning a new task, you might pass the
                  original function as the ``name=`` to make
                  debugging easier.

        Raises:
            RuntimeError: If this nursery is no longer open
                          (i.e. its ``async with`` block has
                          exited).
        """
        GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)

    async def start(self, async_fn, *args, name=None):
        r"""Creates and initializes a child task.

        Like :meth:`start_soon`, but blocks until the new task has
        finished initializing itself, and optionally returns some
        information from it.

        The ``async_fn`` must accept a ``task_status`` keyword argument,
        and it must make sure that it (or someone) eventually calls
        ``task_status.started()``.

        The conventional way to define ``async_fn`` is like::

            async def async_fn(arg1, arg2, *, task_status=trio.TASK_STATUS_IGNORED):
                ...
                task_status.started()
                ...

        :attr:`trio.TASK_STATUS_IGNORED` is a special global object with
        a do-nothing ``started`` method. This way your function supports
        being called either like ``await nursery.start(async_fn, arg1,
        arg2)`` or directly like ``await async_fn(arg1, arg2)``, and
        either way it can call ``task_status.started()`` without
        worrying about which mode it's in. Defining your function like
        this will make it obvious to readers that it supports being used
        in both modes.

        Before the child calls ``task_status.started()``, it's
        effectively run underneath the call to :meth:`start`: if it
        raises an exception then that exception is reported by
        :meth:`start`, and does *not* propagate out of the nursery. If
        :meth:`start` is cancelled, then the child task is also
        cancelled.

        When the child calls ``task_status.started()``, it's moved out
        from underneath :meth:`start` and into the given nursery.

        If the child task passes a value to
        ``task_status.started(value)``, then :meth:`start` returns this
        value. Otherwise it returns ``None``.

        Raises:
            RuntimeError: if this nursery is already closed, or if the
                child exits without calling ``task_status.started()``.
        """
        if self._closed:
            raise RuntimeError("Nursery is closed to new arrivals")
        try:
            # A pending start keeps this nursery open (see
            # _check_nursery_closed) until the 'finally' below runs.
            self._pending_starts += 1
            async with open_nursery() as staging_nursery:
                status = _TaskStatus(staging_nursery, self)
                bound_fn = functools.partial(async_fn, task_status=status)
                child = GLOBAL_RUN_CONTEXT.runner.spawn_impl(
                    bound_fn, args, staging_nursery, name
                )
                child._eventual_parent_nursery = self
                # Leaving this block waits for either _TaskStatus.started
                # or an exception to cancel the staging nursery.
            # Reaching this point means the child either got reparented or
            # exited normally; the complicated part lives in
            # _TaskStatus.started(). (Any exceptions propagate directly
            # out of the block above.)
            if not status._called_started:
                raise RuntimeError("child exited without calling task_status.started()")
            return status._value
        finally:
            self._pending_starts -= 1
            self._check_nursery_closed()

    def __del__(self):
        assert not self._children
1056
1057
1058################################################################
1059# Task and friends
1060################################################################
1061
1062
@attr.s(eq=False, hash=False, repr=False, slots=True)
class Task(metaclass=NoPublicConstructor):
    # One scheduled unit of work: a coroutine object plus the bookkeeping
    # the runner, nurseries and cancel scopes need for it.
    _parent_nursery = attr.ib()
    coro = attr.ib()
    _runner = attr.ib()
    name = attr.ib()
    # PEP 567 contextvars context
    context = attr.ib()
    _counter = attr.ib(init=False, factory=itertools.count().__next__)

    # Invariant:
    # - an unscheduled task has both _next_send_fn and _next_send set to
    #   None
    # - a scheduled task is resumed by calling _next_send_fn(_next_send);
    #   usually _next_send_fn is self.coro.send and _next_send is an
    #   Outcome. When recovering from a foreign await, _next_send_fn is
    #   self.coro.throw and _next_send is an exception. _next_send_fn
    #   will effectively be at the top of every task's call stack, so
    #   it should be written in C if you don't want to pollute Trio
    #   tracebacks with extraneous frames.
    # - a scheduled task has custom_sleep_data set to None
    # Every task starts out unscheduled.
    _next_send_fn = attr.ib(default=None)
    _next_send = attr.ib(default=None)
    _abort_func = attr.ib(default=None)
    custom_sleep_data = attr.ib(default=None)

    # For introspection and nursery.start()
    _child_nurseries = attr.ib(factory=list)
    _eventual_parent_nursery = attr.ib(default=None)

    # Counters of how many cancel/schedule points this task has executed,
    # used by assert{_no,}_checkpoints
    # XX maybe these should be exposed as part of a statistics() method?
    _cancel_points = attr.ib(default=0)
    _schedule_points = attr.ib(default=0)

    def __repr__(self):
        return "<Task {!r} at {:#x}>".format(self.name, id(self))

    @property
    def parent_nursery(self):
        """The nursery this task is inside (or None if this is the "init"
        task).

        Example use case: drawing a visualization of the task tree in a
        debugger.

        """
        return self._parent_nursery

    @property
    def eventual_parent_nursery(self):
        """The nursery this task will be inside after it calls
        ``task_status.started()``.

        If this task has already called ``started()``, or if it was not
        spawned using `nursery.start() <trio.Nursery.start>`, then
        its `eventual_parent_nursery` is ``None``.

        """
        return self._eventual_parent_nursery

    @property
    def child_nurseries(self):
        """The nurseries this task contains.

        This is a list, with outer nurseries before inner nurseries.

        """
        return list(self._child_nurseries)

    ################
    # Cancellation
    ################

    # The CancelStatus object that is currently active for this task.
    # Don't change this directly; instead, use _activate_cancel_status().
    _cancel_status = attr.ib(default=None, repr=False)

    def _activate_cancel_status(self, cancel_status):
        # Unregister from the previous status (if any), switch over, then
        # register with the new status (if any).
        previous = self._cancel_status
        if previous is not None:
            previous._tasks.remove(self)
        self._cancel_status = cancel_status
        if cancel_status is None:
            return
        cancel_status._tasks.add(self)
        if cancel_status.effectively_cancelled:
            # The new status is already cancelled: try to deliver now.
            self._attempt_delivery_of_any_pending_cancel()

    def _attempt_abort(self, raise_cancel):
        # The abort either succeeds, and then we reschedule the task
        # ourselves, or it fails, and then the task is responsible for
        # rescheduling itself (hopefully eventually calling reraise to
        # raise the given exception, but not necessarily).
        result = self._abort_func(raise_cancel)
        if type(result) is not Abort:
            raise TrioInternalError("abort function must return Abort enum")
        # Only one abort attempt is made per blocking call, no matter
        # whether it succeeded or failed.
        self._abort_func = None
        if result is Abort.SUCCEEDED:
            self._runner.reschedule(self, capture(raise_cancel))

    def _attempt_delivery_of_any_pending_cancel(self):
        # Nothing to do unless there is an abort function to call and the
        # active cancel status is effectively cancelled.
        if self._abort_func is None or not self._cancel_status.effectively_cancelled:
            return

        def raise_cancel():
            raise Cancelled._create()

        self._attempt_abort(raise_cancel)

    def _attempt_delivery_of_pending_ki(self):
        assert self._runner.ki_pending
        if self._abort_func is None:
            return

        def raise_cancel():
            # The pending flag is cleared only when the exception is
            # actually raised.
            self._runner.ki_pending = False
            raise KeyboardInterrupt

        self._attempt_abort(raise_cancel)
1186
1187
1188################################################################
1189# The central Runner object
1190################################################################
1191
1192
class RunContext(threading.local):
    """Thread-local holder for the state of the run active in this thread.

    The attributes are only declared here (annotations without values);
    this class never assigns them.
    """

    # The Runner for the current run; read throughout this file as
    # GLOBAL_RUN_CONTEXT.runner.
    runner: "Runner"
    # NOTE(review): presumably the task that is currently executing --
    # confirm against the code that assigns it.
    task: Task


# Module-wide instance; because RunContext subclasses threading.local, each
# thread sees its own attribute values on it.
GLOBAL_RUN_CONTEXT = RunContext()
1199
1200
@attr.s(frozen=True)
class _RunStatistics:
    # Frozen record built by Runner.current_statistics(); that method's
    # docstring describes what each field means.

    # len(runner.tasks)
    tasks_living = attr.ib()
    # len(runner.runq)
    tasks_runnable = attr.ib()
    # deadlines.next_deadline() minus the runner's current time
    seconds_to_next_deadline = attr.ib()
    # whatever io_manager.statistics() returned
    io_statistics = attr.ib()
    # entry_queue.size()
    run_sync_soon_queue_size = attr.ib()
1208
1209
1210# This holds all the state that gets trampolined back and forth between
1211# callbacks when we're running in guest mode.
1212#
1213# It has to be a separate object from Runner, and Runner *cannot* hold
1214# references to it (directly or indirectly)!
1215#
1216# The idea is that we want a chance to detect if our host loop quits and stops
1217# driving us forward. We detect that by unrolled_run_gen being garbage
1218# collected, and hitting its 'except GeneratorExit:' block. So this only
1219# happens if unrolled_run_gen is GCed.
1220#
1221# The Runner state is referenced from the global GLOBAL_RUN_CONTEXT. The only
1222# way it gets *un*referenced is by unrolled_run_gen completing, e.g. by being
1223# GCed. But if Runner has a direct or indirect reference to it, and the host
1224# loop has abandoned it, then this will never happen!
1225#
1226# So this object can reference Runner, but Runner can't reference it. The only
1227# references to it are the "in flight" callback chain on the host loop /
1228# worker thread.
@attr.s(eq=False, hash=False, slots=True)
class GuestState:
    # State carried from one guest_tick() callback to the next while
    # running in guest mode. It may reference the Runner, but the Runner
    # must never reference it.
    runner = attr.ib()
    run_sync_soon_threadsafe = attr.ib()
    run_sync_soon_not_threadsafe = attr.ib()
    done_callback = attr.ib()
    unrolled_run_gen = attr.ib()
    _value_factory: Callable[[], Value] = lambda: Value(None)
    # The outcome that the next tick sends into unrolled_run_gen; starts
    # out as Value(None).
    unrolled_run_next_send = attr.ib(factory=_value_factory, type=Outcome)

    def guest_tick(self):
        """Advance the unrolled run generator by one step.

        Sends the stored outcome into ``unrolled_run_gen`` and then
        arranges for the next tick: directly on the host loop when there
        is no reason to wait, otherwise after a worker thread has waited
        for I/O events. When the generator finishes (or raises
        `TrioInternalError`), ``done_callback`` is invoked instead.
        """
        runner = self.runner
        try:
            timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen)
        except StopIteration:
            self.done_callback(runner.main_task_outcome)
            return
        except TrioInternalError as exc:
            self.done_callback(Error(exc))
            return

        # Optimization: poll with a zero timeout first, so that we can
        # skip going into the thread whenever that's avoidable.
        polled = capture(runner.io_manager.get_events, 0)
        if timeout <= 0 or isinstance(polled, Error) or polled.value:
            # No need to go into the thread
            self.unrolled_run_next_send = polled
            runner.guest_tick_scheduled = True
            self.run_sync_soon_not_threadsafe(self.guest_tick)
            return

        # Need to go into the thread and call get_events() there
        runner.guest_tick_scheduled = False

        def get_events():
            return self.runner.io_manager.get_events(timeout)

        def deliver(events_outcome):
            def in_main_thread():
                self.unrolled_run_next_send = events_outcome
                self.runner.guest_tick_scheduled = True
                self.guest_tick()

            self.run_sync_soon_threadsafe(in_main_thread)

        start_thread_soon(get_events, deliver)
1272
1273
1274@attr.s(eq=False, hash=False, slots=True)
1275class Runner:
1276    clock = attr.ib()
1277    instruments: Instruments = attr.ib()
1278    io_manager = attr.ib()
1279    ki_manager = attr.ib()
1280
1281    # Run-local values, see _local.py
1282    _locals = attr.ib(factory=dict)
1283
1284    runq = attr.ib(factory=deque)
1285    tasks = attr.ib(factory=set)
1286
1287    deadlines = attr.ib(factory=Deadlines)
1288
1289    init_task = attr.ib(default=None)
1290    system_nursery = attr.ib(default=None)
1291    system_context = attr.ib(default=None)
1292    main_task = attr.ib(default=None)
1293    main_task_outcome = attr.ib(default=None)
1294
1295    entry_queue = attr.ib(factory=EntryQueue)
1296    trio_token = attr.ib(default=None)
1297    asyncgens = attr.ib(factory=AsyncGenerators)
1298
1299    # If everything goes idle for this long, we call clock._autojump()
1300    clock_autojump_threshold = attr.ib(default=inf)
1301
1302    # Guest mode stuff
1303    is_guest = attr.ib(default=False)
1304    guest_tick_scheduled = attr.ib(default=False)
1305
1306    def force_guest_tick_asap(self):
1307        if self.guest_tick_scheduled:
1308            return
1309        self.guest_tick_scheduled = True
1310        self.io_manager.force_wakeup()
1311
1312    def close(self):
1313        self.io_manager.close()
1314        self.entry_queue.close()
1315        self.asyncgens.close()
1316        if "after_run" in self.instruments:
1317            self.instruments.call("after_run")
1318        # This is where KI protection gets disabled, so we do it last
1319        self.ki_manager.close()
1320
1321    @_public
1322    def current_statistics(self):
1323        """Returns an object containing run-loop-level debugging information.
1324
1325        Currently the following fields are defined:
1326
1327        * ``tasks_living`` (int): The number of tasks that have been spawned
1328          and not yet exited.
1329        * ``tasks_runnable`` (int): The number of tasks that are currently
1330          queued on the run queue (as opposed to blocked waiting for something
1331          to happen).
1332        * ``seconds_to_next_deadline`` (float): The time until the next
1333          pending cancel scope deadline. May be negative if the deadline has
1334          expired but we haven't yet processed cancellations. May be
1335          :data:`~math.inf` if there are no pending deadlines.
1336        * ``run_sync_soon_queue_size`` (int): The number of
1337          unprocessed callbacks queued via
1338          :meth:`trio.lowlevel.TrioToken.run_sync_soon`.
1339        * ``io_statistics`` (object): Some statistics from Trio's I/O
1340          backend. This always has an attribute ``backend`` which is a string
1341          naming which operating-system-specific I/O backend is in use; the
1342          other attributes vary between backends.
1343
1344        """
1345        seconds_to_next_deadline = self.deadlines.next_deadline() - self.current_time()
1346        return _RunStatistics(
1347            tasks_living=len(self.tasks),
1348            tasks_runnable=len(self.runq),
1349            seconds_to_next_deadline=seconds_to_next_deadline,
1350            io_statistics=self.io_manager.statistics(),
1351            run_sync_soon_queue_size=self.entry_queue.size(),
1352        )
1353
    @_public
    def current_time(self):
        """Returns the current time according to Trio's internal clock.

        Returns:
            float: The current time.

        Raises:
            RuntimeError: if not inside a call to :func:`trio.run`.

        """
        # Pure delegation to the configured clock object.
        # NOTE(review): nothing in this method raises the documented
        # RuntimeError itself; presumably it comes from the public wrapper
        # generated for @_public methods -- confirm.
        return self.clock.current_time()
1366
    @_public
    def current_clock(self):
        """Returns the current :class:`~trio.abc.Clock`."""
        # Plain accessor: hands back the same object stored in ``clock``.
        return self.clock
1371
    @_public
    def current_root_task(self):
        """Returns the current root :class:`Task`.

        This is the task that is the ultimate parent of all other tasks.

        """
        # ``init_task`` defaults to None on a fresh Runner, so this returns
        # None until that attribute has been assigned.
        return self.init_task
1380
1381    ################
1382    # Core task handling primitives
1383    ################
1384
1385    @_public
1386    def reschedule(self, task, next_send=_NO_SEND):
1387        """Reschedule the given task with the given
1388        :class:`outcome.Outcome`.
1389
1390        See :func:`wait_task_rescheduled` for the gory details.
1391
1392        There must be exactly one call to :func:`reschedule` for every call to
1393        :func:`wait_task_rescheduled`. (And when counting, keep in mind that
1394        returning :data:`Abort.SUCCEEDED` from an abort callback is equivalent
1395        to calling :func:`reschedule` once.)
1396
1397        Args:
1398          task (trio.lowlevel.Task): the task to be rescheduled. Must be blocked
1399              in a call to :func:`wait_task_rescheduled`.
1400          next_send (outcome.Outcome): the value (or error) to return (or
1401              raise) from :func:`wait_task_rescheduled`.
1402
1403        """
1404        if next_send is _NO_SEND:
1405            next_send = Value(None)
1406
1407        assert task._runner is self
1408        assert task._next_send_fn is None
1409        task._next_send_fn = task.coro.send
1410        task._next_send = next_send
1411        task._abort_func = None
1412        task.custom_sleep_data = None
1413        if not self.runq and self.is_guest:
1414            self.force_guest_tick_asap()
1415        self.runq.append(task)
1416        if "task_scheduled" in self.instruments:
1417            self.instruments.call("task_scheduled", task)
1418
1419    def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False):
1420
1421        ######
1422        # Make sure the nursery is in working order
1423        ######
1424
1425        # This sorta feels like it should be a method on nursery, except it
1426        # has to handle nursery=None for init. And it touches the internals of
1427        # all kinds of objects.
1428        if nursery is not None and nursery._closed:
1429            raise RuntimeError("Nursery is closed to new arrivals")
1430        if nursery is None:
1431            assert self.init_task is None
1432
1433        ######
1434        # Call the function and get the coroutine object, while giving helpful
1435        # errors for common mistakes.
1436        ######
1437        coro = coroutine_or_error(async_fn, *args)
1438
1439        if name is None:
1440            name = async_fn
1441        if isinstance(name, functools.partial):
1442            name = name.func
1443        if not isinstance(name, str):
1444            try:
1445                name = "{}.{}".format(name.__module__, name.__qualname__)
1446            except AttributeError:
1447                name = repr(name)
1448
1449        if system_task:
1450            context = self.system_context.copy()
1451        else:
1452            context = copy_context()
1453
1454        if not hasattr(coro, "cr_frame"):
1455            # This async function is implemented in C or Cython
1456            async def python_wrapper(orig_coro):
1457                return await orig_coro
1458
1459            coro = python_wrapper(coro)
1460        coro.cr_frame.f_locals.setdefault(LOCALS_KEY_KI_PROTECTION_ENABLED, system_task)
1461
1462        ######
1463        # Set up the Task object
1464        ######
1465        task = Task._create(
1466            coro=coro, parent_nursery=nursery, runner=self, name=name, context=context
1467        )
1468
1469        self.tasks.add(task)
1470        if nursery is not None:
1471            nursery._children.add(task)
1472            task._activate_cancel_status(nursery._cancel_status)
1473
1474        if "task_spawned" in self.instruments:
1475            self.instruments.call("task_spawned", task)
1476        # Special case: normally next_send should be an Outcome, but for the
1477        # very first send we have to send a literal unboxed None.
1478        self.reschedule(task, None)
1479        return task
1480
1481    def task_exited(self, task, outcome):
1482        if (
1483            task._cancel_status is not None
1484            and task._cancel_status.abandoned_by_misnesting
1485            and task._cancel_status.parent is None
1486        ):
1487            # The cancel scope surrounding this task's nursery was closed
1488            # before the task exited. Force the task to exit with an error,
1489            # since the error might not have been caught elsewhere. See the
1490            # comments in CancelStatus.close().
1491            try:
1492                # Raise this, rather than just constructing it, to get a
1493                # traceback frame included
1494                raise RuntimeError(
1495                    "Cancel scope stack corrupted: cancel scope surrounding "
1496                    "{!r} was closed before the task exited\n{}".format(
1497                        task, MISNESTING_ADVICE
1498                    )
1499                )
1500            except RuntimeError as new_exc:
1501                if isinstance(outcome, Error):
1502                    new_exc.__context__ = outcome.error
1503                outcome = Error(new_exc)
1504
1505        task._activate_cancel_status(None)
1506        self.tasks.remove(task)
1507        if task is self.init_task:
1508            # If the init task crashed, then something is very wrong and we
1509            # let the error propagate. (It'll eventually be wrapped in a
1510            # TrioInternalError.)
1511            outcome.unwrap()
1512            # the init task should be the last task to exit. If not, then
1513            # something is very wrong.
1514            if self.tasks:  # pragma: no cover
1515                raise TrioInternalError
1516        else:
1517            if task is self.main_task:
1518                self.main_task_outcome = outcome
1519                outcome = Value(None)
1520            task._parent_nursery._child_finished(task, outcome)
1521
1522        if "task_exited" in self.instruments:
1523            self.instruments.call("task_exited", task)
1524
1525    ################
1526    # System tasks and init
1527    ################
1528
1529    @_public
1530    def spawn_system_task(self, async_fn, *args, name=None):
1531        """Spawn a "system" task.
1532
1533        System tasks have a few differences from regular tasks:
1534
1535        * They don't need an explicit nursery; instead they go into the
1536          internal "system nursery".
1537
1538        * If a system task raises an exception, then it's converted into a
1539          :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. If you
1540          write a system task, you should be careful to make sure it doesn't
1541          crash.
1542
1543        * System tasks are automatically cancelled when the main task exits.
1544
1545        * By default, system tasks have :exc:`KeyboardInterrupt` protection
1546          *enabled*. If you want your task to be interruptible by control-C,
1547          then you need to use :func:`disable_ki_protection` explicitly (and
1548          come up with some plan for what to do with a
1549          :exc:`KeyboardInterrupt`, given that system tasks aren't allowed to
1550          raise exceptions).
1551
1552        * System tasks do not inherit context variables from their creator.
1553
1554        Towards the end of a call to :meth:`trio.run`, after the main
1555        task and all system tasks have exited, the system nursery
1556        becomes closed. At this point, new calls to
1557        :func:`spawn_system_task` will raise ``RuntimeError("Nursery
1558        is closed to new arrivals")`` instead of creating a system
1559        task. It's possible to encounter this state either in
1560        a ``finally`` block in an async generator, or in a callback
1561        passed to :meth:`TrioToken.run_sync_soon` at the right moment.
1562
1563        Args:
1564          async_fn: An async callable.
1565          args: Positional arguments for ``async_fn``. If you want to pass
1566              keyword arguments, use :func:`functools.partial`.
1567          name: The name for this task. Only used for debugging/introspection
1568              (e.g. ``repr(task_obj)``). If this isn't a string,
1569              :func:`spawn_system_task` will try to make it one. A common use
1570              case is if you're wrapping a function before spawning a new
1571              task, you might pass the original function as the ``name=`` to
1572              make debugging easier.
1573
1574        Returns:
1575          Task: the newly spawned task
1576
1577        """
1578        return self.spawn_impl(
1579            async_fn, args, self.system_nursery, name, system_task=True
1580        )
1581
1582    async def init(self, async_fn, args):
1583        # run_sync_soon task runs here:
1584        async with open_nursery() as run_sync_soon_nursery:
1585            # All other system tasks run here:
1586            async with open_nursery() as self.system_nursery:
1587                # Only the main task runs here:
1588                async with open_nursery() as main_task_nursery:
1589                    try:
1590                        self.main_task = self.spawn_impl(
1591                            async_fn, args, main_task_nursery, None
1592                        )
1593                    except BaseException as exc:
1594                        self.main_task_outcome = Error(exc)
1595                        return
1596                    self.spawn_impl(
1597                        self.entry_queue.task,
1598                        (),
1599                        run_sync_soon_nursery,
1600                        "<TrioToken.run_sync_soon task>",
1601                        system_task=True,
1602                    )
1603
1604                # Main task is done; start shutting down system tasks
1605                self.system_nursery.cancel_scope.cancel()
1606
1607            # System nursery is closed; finalize remaining async generators
1608            await self.asyncgens.finalize_remaining(self)
1609
1610            # There are no more asyncgens, which means no more user-provided
1611            # code except possibly run_sync_soon callbacks. It's finally safe
1612            # to stop the run_sync_soon task and exit run().
1613            run_sync_soon_nursery.cancel_scope.cancel()
1614
1615    ################
1616    # Outside context problems
1617    ################
1618
1619    @_public
1620    def current_trio_token(self):
1621        """Retrieve the :class:`TrioToken` for the current call to
1622        :func:`trio.run`.
1623
1624        """
1625        if self.trio_token is None:
1626            self.trio_token = TrioToken._create(self.entry_queue)
1627        return self.trio_token
1628
1629    ################
1630    # KI handling
1631    ################
1632
    # Set to True by deliver_ki() (which is called from signal context), and
    # checked by _deliver_ki_cb() and by the scheduler loop before they try
    # to deliver a pending KeyboardInterrupt to the main task.
    ki_pending = attr.ib(default=False)

    # TODO: deliver_ki is broken. Maybe move all the actual logic and state
    # into RunToken, and we'll only have one instance per runner? But then we
    # can't have a public constructor. Eh, but current_run_token() returning a
    # unique object per run feels pretty nice. Maybe let's just go for it. And
    # keep the class public so people can isinstance() it if they want.
1640
1641    # This gets called from signal context
1642    def deliver_ki(self):
1643        self.ki_pending = True
1644        try:
1645            self.entry_queue.run_sync_soon(self._deliver_ki_cb)
1646        except RunFinishedError:
1647            pass
1648
1649    def _deliver_ki_cb(self):
1650        if not self.ki_pending:
1651            return
1652        # Can't happen because main_task and run_sync_soon_task are created at
1653        # the same time -- so even if KI arrives before main_task is created,
1654        # we won't get here until afterwards.
1655        assert self.main_task is not None
1656        if self.main_task_outcome is not None:
1657            # We're already in the process of exiting -- leave ki_pending set
1658            # and we'll check it again on our way out of run().
1659            return
1660        self.main_task._attempt_delivery_of_pending_ki()
1661
1662    ################
1663    # Quiescing
1664    ################
1665
    # Tasks currently blocked in wait_all_tasks_blocked(), keyed by
    # (cushion, id(task)). Kept in a SortedDict so that the scheduler loop
    # can read the smallest cushion from the first key.
    waiting_for_idle = attr.ib(factory=SortedDict)
1667
1668    @_public
1669    async def wait_all_tasks_blocked(self, cushion=0.0):
1670        """Block until there are no runnable tasks.
1671
1672        This is useful in testing code when you want to give other tasks a
1673        chance to "settle down". The calling task is blocked, and doesn't wake
1674        up until all other tasks are also blocked for at least ``cushion``
1675        seconds. (Setting a non-zero ``cushion`` is intended to handle cases
1676        like two tasks talking to each other over a local socket, where we
1677        want to ignore the potential brief moment between a send and receive
1678        when all tasks are blocked.)
1679
1680        Note that ``cushion`` is measured in *real* time, not the Trio clock
1681        time.
1682
1683        If there are multiple tasks blocked in :func:`wait_all_tasks_blocked`,
1684        then the one with the shortest ``cushion`` is the one woken (and
1685        this task becoming unblocked resets the timers for the remaining
1686        tasks). If there are multiple tasks that have exactly the same
1687        ``cushion``, then all are woken.
1688
1689        You should also consider :class:`trio.testing.Sequencer`, which
1690        provides a more explicit way to control execution ordering within a
1691        test, and will often produce more readable tests.
1692
1693        Example:
1694          Here's an example of one way to test that Trio's locks are fair: we
1695          take the lock in the parent, start a child, wait for the child to be
1696          blocked waiting for the lock (!), and then check that we can't
1697          release and immediately re-acquire the lock::
1698
1699             async def lock_taker(lock):
1700                 await lock.acquire()
1701                 lock.release()
1702
1703             async def test_lock_fairness():
1704                 lock = trio.Lock()
1705                 await lock.acquire()
1706                 async with trio.open_nursery() as nursery:
1707                     nursery.start_soon(lock_taker, lock)
1708                     # child hasn't run yet, we have the lock
1709                     assert lock.locked()
1710                     assert lock._owner is trio.lowlevel.current_task()
1711                     await trio.testing.wait_all_tasks_blocked()
1712                     # now the child has run and is blocked on lock.acquire(), we
1713                     # still have the lock
1714                     assert lock.locked()
1715                     assert lock._owner is trio.lowlevel.current_task()
1716                     lock.release()
1717                     try:
1718                         # The child has a prior claim, so we can't have it
1719                         lock.acquire_nowait()
1720                     except trio.WouldBlock:
1721                         assert lock._owner is not trio.lowlevel.current_task()
1722                         print("PASS")
1723                     else:
1724                         print("FAIL")
1725
1726        """
1727        task = current_task()
1728        key = (cushion, id(task))
1729        self.waiting_for_idle[key] = task
1730
1731        def abort(_):
1732            del self.waiting_for_idle[key]
1733            return Abort.SUCCEEDED
1734
1735        await wait_task_rescheduled(abort)
1736
1737
1738################################################################
1739# run
1740################################################################
1741#
1742# Trio's core task scheduler and coroutine runner is in 'unrolled_run'. It's
1743# called that because it has an unusual feature: it's actually a generator.
1744# Whenever it needs to fetch IO events from the OS, it yields, and waits for
1745# its caller to send the IO events back in. So the loop is "unrolled" into a
1746# sequence of generator send() calls.
1747#
1748# The reason for this unusual design is to support two different modes of
1749# operation, where the IO is handled differently.
1750#
1751# In normal mode using trio.run, the scheduler and IO run in the same thread:
1752#
1753# Main thread:
1754#
1755# +---------------------------+
1756# | Run tasks                 |
1757# | (unrolled_run)            |
1758# +---------------------------+
1759# | Block waiting for I/O     |
1760# | (io_manager.get_events)   |
1761# +---------------------------+
1762# | Run tasks                 |
1763# | (unrolled_run)            |
1764# +---------------------------+
1765# | Block waiting for I/O     |
1766# | (io_manager.get_events)   |
1767# +---------------------------+
1768# :
1769#
1770#
1771# In guest mode using trio.lowlevel.start_guest_run, the scheduler runs on the
1772# main thread as a host loop callback, but blocking for IO gets pushed into a
1773# worker thread:
1774#
1775# Main thread executing host loop:           Trio I/O thread:
1776#
1777# +---------------------------+
1778# | Run Trio tasks            |
1779# | (unrolled_run)            |
1780# +---------------------------+ --------------+
1781#                                             v
1782# +---------------------------+              +----------------------------+
1783# | Host loop does whatever   |              | Block waiting for Trio I/O |
1784# | it wants                  |              | (io_manager.get_events)    |
1785# +---------------------------+              +----------------------------+
1786#                                             |
1787# +---------------------------+ <-------------+
1788# | Run Trio tasks            |
1789# | (unrolled_run)            |
1790# +---------------------------+ --------------+
1791#                                             v
1792# +---------------------------+              +----------------------------+
1793# | Host loop does whatever   |              | Block waiting for Trio I/O |
1794# | it wants                  |              | (io_manager.get_events)    |
1795# +---------------------------+              +----------------------------+
1796# :                                            :
1797#
1798# Most of Trio's internals don't need to care about this difference. The main
1799# complication it creates is that in guest mode, we might need to wake up not
1800# just due to OS-reported IO events, but also because of code running on the
1801# host loop calling reschedule() or changing task deadlines. Search for
1802# 'is_guest' to see the special cases we need to handle this.
1803
1804
def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints):
    """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.

    Args:
      clock: The clock for the new runner, or ``None`` to have a
          ``SystemClock`` created.
      instruments: Passed to ``Instruments`` to build the runner's
          instrument collection.
      restrict_keyboard_interrupt_to_checkpoints: Forwarded to
          ``KIManager.install``.

    Returns:
      The newly created Runner.

    Raises:
      RuntimeError: if ``GLOBAL_RUN_CONTEXT`` already has a runner, i.e.
          this was called from inside a run.

    """
    # Supporting nested calls to run() wouldn't be *hard*, but there's no
    # known good reason to want them, so stay conservative for now:
    if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
        raise RuntimeError("Attempted to call run() from inside a run()")

    run_clock = SystemClock() if clock is None else clock
    instrument_set = Instruments(instruments)
    io_manager = TheIOManager()
    # The context handed to the Runner as its system_context is a copy of
    # the caller's context with sniffio's library variable set to "trio".
    sys_context = copy_context()
    sys_context.run(current_async_library_cvar.set, "trio")
    ki_manager = KIManager()

    runner = Runner(
        clock=run_clock,
        instruments=instrument_set,
        io_manager=io_manager,
        system_context=sys_context,
        ki_manager=ki_manager,
    )
    runner.asyncgens.install_hooks(runner)

    # Installing the KI manager is what enables KI protection, so it has to
    # happen early - in particular before any global state such as
    # GLOBAL_RUN_CONTEXT gets modified.
    ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints)

    GLOBAL_RUN_CONTEXT.runner = runner
    return runner
1836
1837
def run(
    async_fn,
    *args,
    clock=None,
    instruments=(),
    restrict_keyboard_interrupt_to_checkpoints=False,
):
    """Run a Trio-flavored async function, and return the result.

    The call::

       run(async_fn, *args)

    behaves like::

       await async_fn(*args)

    with the difference that :func:`run` can (and must) be called from a
    synchronous context.

    This is the main entry point into Trio: almost every other function in
    Trio requires that you be inside a call to :func:`run`.

    Args:
      async_fn: An async function.

      args: Positional arguments to pass to *async_fn*. To pass keyword
          arguments, use :func:`functools.partial`.

      clock: ``None`` selects the default system-specific monotonic clock;
          otherwise, pass an object implementing the
          :class:`trio.abc.Clock` interface, like (for example) a
          :class:`trio.testing.MockClock` instance.

      instruments (list of :class:`trio.abc.Instrument` objects): The
          instrumentation to apply to this run. It can also be modified
          while the run is in progress; see :ref:`instrumentation`.

      restrict_keyboard_interrupt_to_checkpoints (bool): Controls what
          happens if the user hits control-C while :func:`run` is running.
          With False (the default), you get the standard Python behavior:
          a :exc:`KeyboardInterrupt` exception immediately interrupts
          whatever task is running (or if no task is running, Trio wakes
          up a task to be interrupted). With True,
          :exc:`KeyboardInterrupt` delivery is delayed: it will *only* be
          raised at :ref:`checkpoints <checkpoints>`, like a
          :exc:`Cancelled` exception.

          The default is nice because even if you accidentally write an
          infinite loop that never executes any checkpoints, you can
          still break out of it with control-C. The alternative is nice if
          you're paranoid about a :exc:`KeyboardInterrupt` arriving at
          just the wrong place and leaving your program in an inconsistent
          state, because then you only have to worry about
          :exc:`KeyboardInterrupt` at the exact same places where you
          already have to worry about :exc:`Cancelled`.

          This setting has no effect if your program has registered a
          custom SIGINT handler, or if :func:`run` is called from anywhere
          but the main thread (this is a Python limitation), or if you use
          :func:`open_signal_receiver` to catch SIGINT.

    Returns:
      Whatever ``async_fn`` returns.

    Raises:
      TrioInternalError: if an unexpected error is encountered inside Trio's
          internal machinery. This is a bug and you should `let us know
          <https://github.com/python-trio/trio/issues>`__.

      Anything else: if ``async_fn`` raises an exception, then :func:`run`
          propagates it.

    """

    __tracebackhide__ = True

    runner = setup_runner(
        clock, instruments, restrict_keyboard_interrupt_to_checkpoints
    )

    # Drive the scheduler generator: each value it yields is a timeout for
    # the next IO wait, and the events collected by that wait are sent back
    # in. The very first send has to be None.
    scheduler = unrolled_run(runner, async_fn, args)
    io_events = None
    while True:
        try:
            io_timeout = scheduler.send(io_events)
        except StopIteration:
            break
        io_events = runner.io_manager.get_events(io_timeout)

    # This is runner.main_task_outcome.unwrap(), written out by hand so that
    # Trio tracebacks don't all carry an extra frame.
    outcome = runner.main_task_outcome
    if isinstance(outcome, Value):
        return outcome.value
    raise outcome.error
1933
1934
def start_guest_run(
    async_fn,
    *args,
    run_sync_soon_threadsafe,
    done_callback,
    run_sync_soon_not_threadsafe=None,
    host_uses_signal_set_wakeup_fd=False,
    clock=None,
    instruments=(),
    restrict_keyboard_interrupt_to_checkpoints=False,
):
    """Start a "guest" run of Trio on top of some other "host" event loop.

    A host loop can only have one guest run at a time.

    Always let the Trio run finish before you stop the host loop; otherwise
    Trio's internal data structures may be left in an inconsistent state.
    You might get away with it if you exit the program immediately, but
    it's safest not to go there in the first place.

    Generally, the best approach is to wrap this in a function that starts
    the host loop and then immediately starts the guest run, and then shuts
    down the host when the guest run completes.

    Args:

      run_sync_soon_threadsafe: An arbitrary callable, which will be passed a
         function as its sole argument::

            def my_run_sync_soon_threadsafe(fn):
                ...

         The callable should arrange for the host to run ``fn()`` on its
         next pass through its loop. **Must support being called from
         arbitrary threads.**

      done_callback: An arbitrary callable::

            def my_done_callback(run_outcome):
                ...

         Trio invokes this callback once the Trio run has finished. Its
         argument is an `outcome.Outcome`, reporting what `trio.run` would
         have returned or raised. The function can do anything you want,
         but commonly you'll want it to shut down the host loop, unwrap
         the outcome, etc.

      run_sync_soon_not_threadsafe: Like ``run_sync_soon_threadsafe``, but
         it will only be called from inside the host loop's main thread.
         Optional; if your host loop lets you implement this more
         efficiently than ``run_sync_soon_threadsafe``, passing it will
         make things a bit faster.

      host_uses_signal_set_wakeup_fd (bool): Pass `True` if your host loop
         uses `signal.set_wakeup_fd`, and `False` otherwise. For more details,
         see :ref:`guest-run-implementation`.

    For the meaning of other arguments, see `trio.run`.

    """
    runner = setup_runner(
        clock, instruments, restrict_keyboard_interrupt_to_checkpoints
    )
    runner.is_guest = True
    # The first guest tick is queued on the host loop at the end of this
    # function.
    runner.guest_tick_scheduled = True

    # When no dedicated same-thread scheduler was supplied, the threadsafe
    # one is used in its place.
    schedule_on_host = (
        run_sync_soon_threadsafe
        if run_sync_soon_not_threadsafe is None
        else run_sync_soon_not_threadsafe
    )

    scheduler = unrolled_run(
        runner,
        async_fn,
        args,
        host_uses_signal_set_wakeup_fd=host_uses_signal_set_wakeup_fd,
    )
    guest_state = GuestState(
        runner=runner,
        run_sync_soon_threadsafe=run_sync_soon_threadsafe,
        run_sync_soon_not_threadsafe=schedule_on_host,
        done_callback=done_callback,
        unrolled_run_gen=scheduler,
    )
    schedule_on_host(guest_state.guest_tick)
2017
2018
# Upper bound, in seconds, that unrolled_run applies to every IO-wait timeout
# it computes.
# 24 hours is arbitrary, but it avoids issues like people setting timeouts of
# 10**20 and then getting integer overflows in the underlying system calls.
_MAX_TIMEOUT = 24 * 60 * 60
2022
2023
2024# Weird quirk: this is written as a generator in order to support "guest
2025# mode", where our core event loop gets unrolled into a series of callbacks on
2026# the host loop. If you're doing a regular trio.run then this gets run
2027# straight through.
2028def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
2029    locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
2030    __tracebackhide__ = True
2031
2032    try:
2033        if not host_uses_signal_set_wakeup_fd:
2034            runner.entry_queue.wakeup.wakeup_on_signals()
2035
2036        if "before_run" in runner.instruments:
2037            runner.instruments.call("before_run")
2038        runner.clock.start_clock()
2039        runner.init_task = runner.spawn_impl(
2040            runner.init, (async_fn, args), None, "<init>", system_task=True
2041        )
2042
2043        # You know how people talk about "event loops"? This 'while' loop right
2044        # here is our event loop:
2045        while runner.tasks:
2046            if runner.runq:
2047                timeout = 0
2048            else:
2049                deadline = runner.deadlines.next_deadline()
2050                timeout = runner.clock.deadline_to_sleep_time(deadline)
2051            timeout = min(max(0, timeout), _MAX_TIMEOUT)
2052
2053            idle_primed = None
2054            if runner.waiting_for_idle:
2055                cushion, _ = runner.waiting_for_idle.keys()[0]
2056                if cushion < timeout:
2057                    timeout = cushion
2058                    idle_primed = IdlePrimedTypes.WAITING_FOR_IDLE
2059            # We use 'elif' here because if there are tasks in
2060            # wait_all_tasks_blocked, then those tasks will wake up without
2061            # jumping the clock, so we don't need to autojump.
2062            elif runner.clock_autojump_threshold < timeout:
2063                timeout = runner.clock_autojump_threshold
2064                idle_primed = IdlePrimedTypes.AUTOJUMP_CLOCK
2065
2066            if "before_io_wait" in runner.instruments:
2067                runner.instruments.call("before_io_wait", timeout)
2068
2069            # Driver will call io_manager.get_events(timeout) and pass it back
2070            # in through the yield
2071            events = yield timeout
2072            runner.io_manager.process_events(events)
2073
2074            if "after_io_wait" in runner.instruments:
2075                runner.instruments.call("after_io_wait", timeout)
2076
2077            # Process cancellations due to deadline expiry
2078            now = runner.clock.current_time()
2079            if runner.deadlines.expire(now):
2080                idle_primed = None
2081
2082            # idle_primed != None means: if the IO wait hit the timeout, and
2083            # still nothing is happening, then we should start waking up
2084            # wait_all_tasks_blocked tasks or autojump the clock. But there
2085            # are some subtleties in defining "nothing is happening".
2086            #
2087            # 'not runner.runq' means that no tasks are currently runnable.
2088            # 'not events' means that the last IO wait call hit its full
2089            # timeout. These are very similar, and if idle_primed != None and
2090            # we're running in regular mode then they always go together. But,
2091            # in *guest* mode, they can happen independently, even when
2092            # idle_primed=True:
2093            #
2094            # - runner.runq=empty and events=True: the host loop adjusted a
2095            #   deadline and that forced an IO wakeup before the timeout expired,
2096            #   even though no actual tasks were scheduled.
2097            #
2098            # - runner.runq=nonempty and events=False: the IO wait hit its
2099            #   timeout, but then some code in the host thread rescheduled a task
2100            #   before we got here.
2101            #
2102            # So we need to check both.
2103            if idle_primed is not None and not runner.runq and not events:
2104                if idle_primed is IdlePrimedTypes.WAITING_FOR_IDLE:
2105                    while runner.waiting_for_idle:
2106                        key, task = runner.waiting_for_idle.peekitem(0)
2107                        if key[0] == cushion:
2108                            del runner.waiting_for_idle[key]
2109                            runner.reschedule(task)
2110                        else:
2111                            break
2112                else:
2113                    assert idle_primed is IdlePrimedTypes.AUTOJUMP_CLOCK
2114                    runner.clock._autojump()
2115
2116            # Process all runnable tasks, but only the ones that are already
2117            # runnable now. Anything that becomes runnable during this cycle
2118            # needs to wait until the next pass. This avoids various
2119            # starvation issues by ensuring that there's never an unbounded
2120            # delay between successive checks for I/O.
2121            #
2122            # Also, we randomize the order of each batch to avoid assumptions
2123            # about scheduling order sneaking in. In the long run, I suspect
2124            # we'll either (a) use strict FIFO ordering and document that for
2125            # predictability/determinism, or (b) implement a more
2126            # sophisticated scheduler (e.g. some variant of fair queueing),
2127            # for better behavior under load. For now, this is the worst of
2128            # both worlds - but it keeps our options open. (If we do decide to
2129            # go all in on deterministic scheduling, then there are other
2130            # things that will probably need to change too, like the deadlines
2131            # tie-breaker and the non-deterministic ordering of
2132            # task._notify_queues.)
2133            batch = list(runner.runq)
2134            runner.runq.clear()
2135            if _ALLOW_DETERMINISTIC_SCHEDULING:
2136                # We're running under Hypothesis, and pytest-trio has patched
2137                # this in to make the scheduler deterministic and avoid flaky
2138                # tests. It's not worth the (small) performance cost in normal
2139                # operation, since we'll shuffle the list and _r is only
2140                # seeded for tests.
2141                batch.sort(key=lambda t: t._counter)
2142                _r.shuffle(batch)
2143            else:
2144                # 50% chance of reversing the batch, this way each task
2145                # can appear before/after any other task.
2146                if _r.random() < 0.5:
2147                    batch.reverse()
2148            while batch:
2149                task = batch.pop()
2150                GLOBAL_RUN_CONTEXT.task = task
2151
2152                if "before_task_step" in runner.instruments:
2153                    runner.instruments.call("before_task_step", task)
2154
2155                next_send_fn = task._next_send_fn
2156                next_send = task._next_send
2157                task._next_send_fn = task._next_send = None
2158                final_outcome = None
2159                try:
2160                    # We used to unwrap the Outcome object here and send/throw
2161                    # its contents in directly, but it turns out that .throw()
2162                    # is buggy, at least on CPython 3.6:
2163                    #   https://bugs.python.org/issue29587
2164                    #   https://bugs.python.org/issue29590
2165                    # So now we send in the Outcome object and unwrap it on the
2166                    # other side.
2167                    msg = task.context.run(next_send_fn, next_send)
2168                except StopIteration as stop_iteration:
2169                    final_outcome = Value(stop_iteration.value)
2170                except BaseException as task_exc:
2171                    # Store for later, removing uninteresting top frames: 1
2172                    # frame we always remove, because it's this function
2173                    # catching it, and then in addition we remove however many
2174                    # more Context.run adds.
2175                    tb = task_exc.__traceback__.tb_next
2176                    for _ in range(CONTEXT_RUN_TB_FRAMES):
2177                        tb = tb.tb_next
2178                    final_outcome = Error(task_exc.with_traceback(tb))
2179                    # Remove local refs so that e.g. cancelled coroutine locals
2180                    # are not kept alive by this frame until another exception
2181                    # comes along.
2182                    del tb
2183
2184                if final_outcome is not None:
2185                    # We can't call this directly inside the except: blocks
2186                    # above, because then the exceptions end up attaching
2187                    # themselves to other exceptions as __context__ in
2188                    # unwanted ways.
2189                    runner.task_exited(task, final_outcome)
2190                    # final_outcome may contain a traceback ref. It's not as
2191                    # crucial compared to the above, but this will allow more
2192                    # prompt release of resources in coroutine locals.
2193                    final_outcome = None
2194                else:
2195                    task._schedule_points += 1
2196                    if msg is CancelShieldedCheckpoint:
2197                        runner.reschedule(task)
2198                    elif type(msg) is WaitTaskRescheduled:
2199                        task._cancel_points += 1
2200                        task._abort_func = msg.abort_func
2201                        # KI is "outside" all cancel scopes, so check for it
2202                        # before checking for regular cancellation:
2203                        if runner.ki_pending and task is runner.main_task:
2204                            task._attempt_delivery_of_pending_ki()
2205                        task._attempt_delivery_of_any_pending_cancel()
2206                    elif type(msg) is PermanentlyDetachCoroutineObject:
2207                        # Pretend the task just exited with the given outcome
2208                        runner.task_exited(task, msg.final_outcome)
2209                    else:
2210                        exc = TypeError(
2211                            "trio.run received unrecognized yield message {!r}. "
2212                            "Are you trying to use a library written for some "
2213                            "other framework like asyncio? That won't work "
2214                            "without some kind of compatibility shim.".format(msg)
2215                        )
2216                        # The foreign library probably doesn't adhere to our
2217                        # protocol of unwrapping whatever outcome gets sent in.
2218                        # Instead, we'll arrange to throw `exc` in directly,
2219                        # which works for at least asyncio and curio.
2220                        runner.reschedule(task, exc)
2221                        task._next_send_fn = task.coro.throw
2222                    # prevent long-lived reference
2223                    # TODO: develop test for this deletion
2224                    del msg
2225
2226                if "after_task_step" in runner.instruments:
2227                    runner.instruments.call("after_task_step", task)
2228                del GLOBAL_RUN_CONTEXT.task
2229                # prevent long-lived references
2230                # TODO: develop test for these deletions
2231                del task, next_send, next_send_fn
2232
2233    except GeneratorExit:
2234        # The run-loop generator has been garbage collected without finishing
2235        warnings.warn(
2236            RuntimeWarning(
2237                "Trio guest run got abandoned without properly finishing... "
2238                "weird stuff might happen"
2239            )
2240        )
2241    except TrioInternalError:
2242        raise
2243    except BaseException as exc:
2244        raise TrioInternalError("internal error in Trio - please file a bug!") from exc
2245    finally:
2246        GLOBAL_RUN_CONTEXT.__dict__.clear()
2247        runner.close()
2248        # Have to do this after runner.close() has disabled KI protection,
2249        # because otherwise there's a race where ki_pending could get set
2250        # after we check it.
2251        if runner.ki_pending:
2252            ki = KeyboardInterrupt()
2253            if isinstance(runner.main_task_outcome, Error):
2254                ki.__context__ = runner.main_task_outcome.error
2255            runner.main_task_outcome = Error(ki)
2256
2257
2258################################################################
2259# Other public API functions
2260################################################################
2261
2262
class _TaskStatusIgnored:
    """Task-status object whose :meth:`started` call is a no-op.

    Its ``repr`` is the name of the module-level singleton,
    ``TASK_STATUS_IGNORED``.
    """

    def started(self, value=None):
        """Accept an optional *value* and discard it; always returns None."""
        return None

    def __repr__(self):
        return "TASK_STATUS_IGNORED"


# The single shared instance of the do-nothing task-status object.
TASK_STATUS_IGNORED = _TaskStatusIgnored()
2272
2273
def current_task():
    """Return the :class:`Task` object representing the current task.

    Returns:
      Task: the :class:`Task` that called :func:`current_task`.

    Raises:
      RuntimeError: if ``GLOBAL_RUN_CONTEXT`` has no ``task`` attribute,
          i.e. the caller is not in async context.

    """
    try:
        running = GLOBAL_RUN_CONTEXT.task
    except AttributeError:
        # ``from None`` keeps the AttributeError (an implementation detail)
        # out of the traceback the caller sees.
        raise RuntimeError("must be called from async context") from None
    else:
        return running
2286
2287
def current_effective_deadline():
    """Returns the current effective deadline for the current task.

    This function examines all the cancellation scopes that are currently in
    effect (taking into account shielding), and returns the deadline that will
    expire first.

    One example of where this might be useful is if your code is trying to
    decide whether to begin an expensive operation like an RPC call, but wants
    to skip it if it knows that it can't possibly complete in the available
    time. Another example would be if you're using a protocol like gRPC that
    `propagates timeout information to the remote peer
    <http://www.grpc.io/docs/guides/concepts.html#deadlines>`__; this function
    gives a way to fetch that information so you can send it along.

    If this is called in a context where a cancellation is currently active
    (i.e., a blocking call will immediately raise :exc:`Cancelled`), then the
    returned deadline is ``-inf``. If it is called in a context where no
    scopes have a deadline set, it returns ``inf``.

    Returns:
        float: the effective deadline, as an absolute time.

    Raises:
        RuntimeError: propagated from :func:`current_task` when called
            outside async context.

    """
    # The per-task cancel status does the actual computation; this function
    # only looks it up on the running task.
    cancel_status = current_task()._cancel_status
    deadline = cancel_status.effective_deadline()
    return deadline
2313
2314
async def checkpoint():
    """A pure :ref:`checkpoint <checkpoints>`.

    This checks for cancellation and allows other tasks to be scheduled,
    without otherwise blocking.

    Note that the scheduler has the option of ignoring this and continuing to
    run the current task if it decides this is appropriate (e.g. for increased
    efficiency).

    Equivalent to ``await trio.sleep(0)`` (which is implemented by calling
    :func:`checkpoint`.)

    """
    # Schedule point first, cancel point second: the scheduler is what checks
    # timeouts and converts them into cancellations, so yielding to it before
    # we look guarantees the cancel check below sees up-to-date state.
    await cancel_shielded_checkpoint()

    this_task = current_task()
    this_task._cancel_points += 1

    if not this_task._cancel_status.effectively_cancelled:
        # Not cancelled; the only other thing that can be pending is a
        # KeyboardInterrupt, and that only concerns the main task.
        runner = this_task._runner
        if this_task is not runner.main_task or not runner.ki_pending:
            return

    # Something is pending for this task. Block inside a scope whose deadline
    # is already in the past, with an abort function that always reports
    # success, so the wait can be interrupted as soon as it begins.
    with CancelScope(deadline=-inf):
        await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
2340
2341
async def checkpoint_if_cancelled():
    """Issue a :ref:`checkpoint <checkpoints>` if the calling context has been
    cancelled.

    Equivalent to (but potentially more efficient than)::

        if trio.current_effective_deadline() == -inf:
            await trio.lowlevel.checkpoint()

    This is either a no-op, or else it allows other tasks to be scheduled and
    then raises :exc:`trio.Cancelled`.

    The main task is additionally treated as cancelled while the runner has a
    ``KeyboardInterrupt`` pending.

    Typically used together with :func:`cancel_shielded_checkpoint`.

    """
    this_task = current_task()

    # Work out whether anything is waiting to be delivered to this task:
    # either a regular cancellation, or (main task only) a pending KI.
    must_checkpoint = this_task._cancel_status.effectively_cancelled
    if not must_checkpoint:
        runner = this_task._runner
        must_checkpoint = this_task is runner.main_task and runner.ki_pending

    if must_checkpoint:
        # The full checkpoint is expected to raise, so control should never
        # come back here.
        await _core.checkpoint()
        assert False  # pragma: no cover

    # Nothing pending: count this as a cancel point without yielding.
    this_task._cancel_points += 1
2364
2365
# Pick the I/O manager backend for this platform, binding it to the common
# name ``TheIOManager``, and star-import the matching generated module into
# this namespace.
#
# The TYPE_CHECKING terms change how the branches read when TYPE_CHECKING is
# true: the epoll branch then depends only on ``sys.platform == "linux"``, and
# every remaining non-Windows platform falls into the kqueue branch. At
# runtime (TYPE_CHECKING false) the ``hasattr(select, ...)`` probes decide.
# NOTE(review): presumably this is so static type checkers, which understand
# ``sys.platform`` comparisons, always resolve exactly one branch -- confirm.
if sys.platform == "win32":
    from ._io_windows import WindowsIOManager as TheIOManager
    from ._generated_io_windows import *
elif sys.platform == "linux" or (not TYPE_CHECKING and hasattr(select, "epoll")):
    from ._io_epoll import EpollIOManager as TheIOManager
    from ._generated_io_epoll import *
elif TYPE_CHECKING or hasattr(select, "kqueue"):
    from ._io_kqueue import KqueueIOManager as TheIOManager
    from ._generated_io_kqueue import *
else:  # pragma: no cover
    # None of the win32 / epoll / kqueue branches matched.
    raise NotImplementedError("unsupported platform")

# Generated wrapper modules, imported at the very end of the file.
# NOTE(review): presumably they rely on names defined earlier in this module,
# so their position matters -- confirm before moving these imports.
from ._generated_run import *
from ._generated_instrumentation import *
2380