# curio/kernel.py
#
# Main execution kernel.
#
# Curio is based on a few overarching design principles that drive the code
# you'll find here.
#
# 1. Environmental Isolation.
#
#    Curio strictly separates the environment of async and synchronous
#    programming.  All functionality related to async operation is
#    placed in async-function definitions.  Async functions request
#    the services of the kernel using low-level yield statements
#    (traps).  The kernel is an opaque black-box from the perspective
#    of synchronous code.  There is only one available
#    operation--run(coro) which runs a new task.  There are no other
#    mechanisms available for interacting with the kernel from
#    synchronous code.  A good analogy might be the distinction
#    between user and protected mode in an OS.  User programs run in
#    user-mode and the operating system kernel runs in protected mode.
#    The same thing happens here.  User programs in Curio can only run
#    in async functions. Those programs can request the services of
#    the kernel. However, they're not granted any further access than
#    that (there is no API surface or anything that can be used).
#    See the sketch following these principles for an illustration.
#
# 2. Microkernels
#
#    The low-level kernel is meant to be small, fast, and minimally
#    featureful.  In fact, almost nothing interesting happens in the
#    kernel. Instead, almost every useful part of Curio gets
#    implemented in async functions found elsewhere.  If you're trying
#    to add new features to Curio, don't add them to the kernel. Think
#    about how to create objects and functions that operate at the
#    async-function level instead.  See files such as sync.py or
#    queue.py for examples.
#
# 3. Decoupling
#
#    No part of Curio has direct linkage to the Kernel class (it's
#    not imported or used anywhere else in the code base).   If you want,
#    you can make a completely custom Kernel object and have the
#    rest of Curio run on it.  You just need to make sure you implement
#    the required traps.   This is in contrast to libraries such as
#    asyncio where many parts of the implementation are required to
#    carry a reference to the underlying event loop.
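#
# As an illustrative sketch (not part of the kernel itself), a user-level
# coroutine requests kernel services through awaitable traps, while
# synchronous code interacts with Curio only through run():
#
#     from curio.traps import _read_wait
#
#     async def reader(sock):
#         await _read_wait(sock)          # trap: suspend until sock is readable
#         return sock.recv(1000)
#
#     Kernel().run(reader(some_socket))   # the only synchronous entry point
#
# Here, reader() and some_socket are hypothetical; application code would
# normally use the higher-level curio.socket wrappers rather than raw traps.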

__all__ = [ 'Kernel', 'run' ]

# -- Standard Library

import socket
import time
import os
import errno
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from collections import deque
import threading

# Logger where uncaught exceptions from crashed tasks are logged
import logging
log = logging.getLogger(__name__)

# -- Curio

from .errors import *
from .task import Task
from .traps import _read_wait
from . import meta
from .timequeue import TimeQueue


class Kernel(object):
    '''
    Curio run-time kernel.  The selector argument specifies a
    different I/O selector. The debug argument specifies a list of
    debugger objects to apply. For example:

        from curio.debug import schedtrace, traptrace
        k = Kernel(debug=[schedtrace, traptrace])

    Use the kernel run() method to submit work to the kernel.
    '''

    def __init__(self, *, selector=None, debug=None, activations=None, taskcls=Task,
                 max_select_timeout=None if os.name != 'nt' else 1.0):

        # Functions to call at shutdown
        self._shutdown_funcs = []

        # I/O Selector setup
        self._selector = selector if selector else DefaultSelector()
        self._call_at_shutdown(self._selector.close)

        # Task table
        self._tasks = {}

        # Coroutine runner function (created upon first call to run())
        self._runner = None

        # Activations
        self._activations = activations if activations else []

        # Debugging (activations in disguise)
        if debug:
            from .debug import _create_debuggers
            self._activations.extend(_create_debuggers(debug))

        # Task creation class
        self._taskcls = taskcls

        self._max_select_timeout = max_select_timeout


    def __del__(self):
        if self._shutdown_funcs is not None:
            raise RuntimeError(
                'Curio kernel not properly terminated.  Please use Kernel.run(shutdown=True)')

    def __enter__(self):
        return self

    def __exit__(self, ty, val, tb):
        if self._shutdown_funcs is not None:
            self.run(shutdown=True)

    def _call_at_shutdown(self, func):
        self._shutdown_funcs.append(func)


    # ----------
    # Submit a new task to the kernel

    def run(self, corofunc=None, *args, shutdown=False):
        if self._shutdown_funcs is None:
            raise RuntimeError("Can't run a kernel that's been shut down or crashed. Create a new kernel.")

        coro = meta.instantiate_coroutine(corofunc, *args) if corofunc else None
        with meta.running(self):
            # Make the kernel runtime environment (if needed)
            if not self._runner:
                self._runner = self._make_kernel_runtime()

            ret_val = ret_exc = None
            # Run the supplied coroutine (if any)
            if coro or not shutdown:
                task = self._runner(coro)
                if task:
                    ret_exc = task.exception
                    ret_val = task.result if not ret_exc else None
                del task

            # If shutdown has been requested, run the shutdown process
            if shutdown:
                # For "reasons" related to task scheduling, the task
                # of shutting down all remaining tasks is best managed
                # by launching a task dedicated to carrying out the task (sic)
                async def _shutdown_tasks(tocancel):
                    for task in tocancel:
                        await task.cancel()

                tocancel = sorted(self._tasks.values(), key=lambda t: t.id, reverse=True)
                self._runner(_shutdown_tasks(tocancel))
                assert not self._tasks, "New tasks created during shutdown"
                self._runner = None

                # Call registered shutdown functions
                for func in self._shutdown_funcs:
                    func()
                self._shutdown_funcs = None

            if ret_exc:
                raise ret_exc
            else:
                return ret_val

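    # A minimal usage sketch (main below is a hypothetical user-supplied
    # coroutine function, not part of this module):
    #
    #     k = Kernel()
    #     result = k.run(main)          # run a task to completion
    #     k.run(shutdown=True)          # cancel remaining tasks, run shutdown funcs
    #
    # or, equivalently, using the context-manager protocol (which performs
    # the shutdown step in __exit__):
    #
    #     with Kernel() as k:
    #         result = k.run(main)
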
    # ------------------------------------------------------------
    # Kernel runtime
    #
    # This function creates the kernel execution environment. It
    # returns a single function (a closure) that executes a coroutine.
    #
    # At first glance, this function is going to look giant and
    # insane. It is implementing the kernel runtime as a self-contained
    # black box.  There is no external API.  The only possible
    # communication is via traps defined in curio/traps.py.
    # It's best to think of this as a "program within a program".

    def _make_kernel_runtime(kernel):

        # Motto:  "What happens in the kernel stays in the kernel"

        # ---- Kernel State
        current = None                          # Currently running task
        selector = kernel._selector             # Event selector
        ready = deque()                         # Ready queue
        tasks = kernel._tasks                   # Task table
        sleepq = TimeQueue()                    # Sleeping task queue
        wake_queue = deque()                    # Thread wake queue
        _activations = []

        # ---- Bound methods
        selector_register = selector.register
        selector_unregister = selector.unregister
        selector_modify = selector.modify
        selector_select = selector.select
        selector_getkey = selector.get_key
        selector_max_timeout = kernel._max_select_timeout

        ready_popleft = ready.popleft
        ready_append = ready.append
        time_monotonic = time.monotonic
        taskcls = kernel._taskcls

        # ------------------------------------------------------------
        # In-kernel task used for processing futures.
        #
        # Internal task that monitors the loopback socket--allowing the kernel to
        # awake for non-I/O events.

        # Loop-back sockets
        notify_sock = None
        wait_sock = None

        async def _kernel_task():
            wake_queue_popleft = wake_queue.popleft
            while True:
                await _read_wait(wait_sock)
                data = wait_sock.recv(1000)

                # Process any waking tasks.  These are tasks that have
                # been awakened externally to the event loop (e.g., by
                # separate threads, Futures, etc.).
                while wake_queue:
                    task, future = wake_queue_popleft()
                    # If the future associated with wakeup no longer
                    # matches the future stored on the task, wakeup is
                    # abandoned.  It means that a timeout or
                    # cancellation event occurred in the time interval
                    # between the call to wake() and the
                    # subsequent processing of the waking task
                    if future and task.future is not future:
                        continue
                    task.future = None
                    task.state = 'READY'
                    task.cancel_func = None
                    ready_append(task)

        # Force the kernel to wake, possibly scheduling a task to run.
        # This method is called by threads running concurrently to the
        # curio kernel.  For example, it's triggered upon completion of
        # Futures created by thread pools and processes. It's inherently
        # dangerous for any kind of operation on the kernel to be
        # performed by a separate thread.  Thus, the *only* thing that
        # happens here is that the task gets appended to a deque and a
        # notification message is written to the kernel notification
        # socket.  append() and pop() operations on deques are thread safe
        # and do not need additional locking.  See
        # https://docs.python.org/3/library/collections.html#collections.deque
        # ----------
        def wake(task=None, future=None):
            if task:
                wake_queue.append((task, future))

            notify_sock.send(b'\x00')

        def init_loopback():
            nonlocal notify_sock, wait_sock
            notify_sock, wait_sock = socket.socketpair()
            wait_sock.setblocking(False)
            notify_sock.setblocking(False)
            kernel._call_at_shutdown(notify_sock.close)
            kernel._call_at_shutdown(wait_sock.close)
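
        # Illustrative wakeup path (a sketch, not executed here): when a task
        # waits on a concurrent.futures Future via trap_future_wait(), a done
        # callback of the form
        #
        #     future.add_done_callback(lambda fut, task=current: wake(task, fut))
        #
        # is registered.  When a worker thread completes the future, wake()
        # runs in that thread, appends (task, future) to wake_queue, and writes
        # a byte to notify_sock.  _kernel_task(), blocked in _read_wait() on
        # wait_sock, then drains wake_queue and puts the task back on the
        # ready queue.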

        # ------------------------------------------------------------
        # Task management functions.
        #

        # Create a new task, putting it on the ready queue
        def new_task(coro):
            task = taskcls(coro)
            tasks[task.id] = task
            reschedule_task(task)
            for a in _activations:
                a.created(task)
            return task

        # Reschedule a task, putting it back on the ready queue.
        def reschedule_task(task):
            assert task not in ready

            ready_append(task)
            task.state = 'READY'
            task.cancel_func = None

        # Suspend the current task
        def suspend_task(state, cancel_func):
            nonlocal current
            current.state = state
            current.cancel_func = cancel_func

            # Unregister previous I/O request. Discussion follows:
            #
            # When a task performs I/O, it registers itself with the underlying
            # I/O selector.  When the task is reawakened, it unregisters itself
            # and prepares to run.  However, in many network applications, the
            # task will perform a small amount of work and then go to sleep on
            # exactly the same I/O resource that it was waiting on before. For
            # example, a client handling task in a server will often spend most
            # of its time waiting for incoming data on a single socket.
            #
            # Instead of always unregistering the task from the selector, we
            # can defer the unregistration process until after the task goes
            # back to sleep again.  If it happens to be sleeping on the same
            # resource as before, there's no need to unregister it--it will
            # still be registered from the last I/O operation.
            #
            # The code here performs the unregister step for a task that
            # ran, but is now sleeping for a *different* reason than repeating the
            # prior I/O operation.  There is coordination with code in trap_io().

            if current._last_io:
                unregister_event(*current._last_io)
                current._last_io = None

            current = None

        # Check if task has pending cancellation
        def check_cancellation():
            if current.allow_cancel and current.cancel_pending:
                current._trap_result = current.cancel_pending
                current.cancel_pending = None
                return True
            else:
                return False

        # Set a timeout or sleep event on the current task
        def set_timeout(clock, sleep_type='timeout'):
            if clock is None:
                sleepq.cancel((current.id, sleep_type), getattr(current, sleep_type))
            else:
                sleepq.push((current.id, sleep_type), clock)
            setattr(current, sleep_type, clock)

        # ------------------------------------------------------------
        # I/O Support functions
        #

        def register_event(fileobj, event, task):
            try:
                key = selector_getkey(fileobj)
                mask, (rtask, wtask) = key.events, key.data
                if event == EVENT_READ and rtask:
                    raise ReadResourceBusy(f"Multiple tasks can't wait to read on the same file descriptor {fileobj}")
                if event == EVENT_WRITE and wtask:
                    raise WriteResourceBusy(f"Multiple tasks can't wait to write on the same file descriptor {fileobj}")

                selector_modify(fileobj, mask | event,
                                (task, wtask) if event == EVENT_READ else (rtask, task))
                selector_getkey(fileobj)
            except KeyError:
                selector_register(fileobj, event,
                                  (task, None) if event == EVENT_READ else (None, task))

        def unregister_event(fileobj, event):
            key = selector_getkey(fileobj)
            mask, (rtask, wtask) = key.events, key.data
            mask &= ~event
            if not mask:
                selector_unregister(fileobj)
            else:
                selector_modify(fileobj, mask,
                                (None, wtask) if event == EVENT_READ else (rtask, None))

        # ------------------------------------------------------------
        # Traps
        #
        # These implement the low-level functionality that is
        # triggered by user-level code.  They are never invoked directly
        # and there is no public API outside the kernel.  Instead,
        # coroutines use a statement such as
        #
        #   yield ('trap_io', sock, EVENT_READ, 'READ_WAIT')
        #
        # to invoke a specific trap.
        # ------------------------------------------------------------

        # ----------------------------------------
        # Wait for I/O
        def trap_io(fileobj, event, state):
            if check_cancellation():
                return

            # See comment about deferred unregister in suspend_task(). If the
            # requested I/O operation is *different* than the last I/O operation
            # that was performed by the task, we need to unregister the last I/O
            # resource used and register a new one with the selector.
            if current._last_io != (fileobj, event):
                if current._last_io:
                    unregister_event(*current._last_io)
                try:
                    register_event(fileobj, event, current)
                except CurioError as e:
                    current._trap_result = e
                    return

            # At this point, any deferred I/O management for the task has been
            # handled.  Clearing _last_io prevents suspend_task() from
            # unregistering the event that was just set up.
            current._last_io = None
            suspend_task(state, lambda: unregister_event(fileobj, event))

        # ----------------------------------------
        # Release any kernel resources associated with fileobj.
        def trap_io_release(fileobj):
            if current._last_io:
                unregister_event(*current._last_io)
                current._last_io = None
            current._trap_result = None

        # ----------------------------------------
        # Return tasks currently waiting on a file obj.
        def trap_io_waiting(fileobj):
            try:
                key = selector_getkey(fileobj)
                rtask, wtask = key.data
                rtask = rtask if rtask and rtask.cancel_func else None
                wtask = wtask if wtask and wtask.cancel_func else None
                current._trap_result = (rtask, wtask)
            except KeyError:
                current._trap_result = (None, None)

        # ----------------------------------------
        # Wait on a Future
        def trap_future_wait(future, event):
            if check_cancellation():
                return

            current.future = future

            # Discussion: Each task records the future that it is
            # currently waiting on.  The completion callback below only
            # attempts to wake the task if its stored Future is exactly
            # the same one that was stored above.  Due to support for
            # cancellation and timeouts, it's possible that a task might
            # abandon its attempt to wait for a Future and go on to
            # perform other operations, including waiting for a different
            # Future in the future (got it?).  However, a running thread
            # or process still might go on to eventually complete the
            # earlier work.  In that case, it will trigger the callback,
            # find that the task's current Future is now different, and
            # discard the result.

            future.add_done_callback(lambda fut, task=current: wake(task, fut))

            # An optional threading.Event object can be passed and set to
            # start a worker thread.   This makes it possible to have a lock-free
            # Future implementation where worker threads only start after the
            # callback function has been set above.
            if event:
                event.set()

            suspend_task('FUTURE_WAIT',
                          lambda task=current:
                              setattr(task, 'future', future.cancel() and None))

        # ----------------------------------------
        # Add a new task to the kernel
        def trap_spawn(coro):
            task = new_task(coro)
            task.parentid = current.id
            current._trap_result = task

        # ----------------------------------------
        # Cancel a task
        def trap_cancel_task(task, exc=TaskCancelled, val=None):
            if task.cancelled:
                return

            task.cancelled = True

            # Cancelling a task also cancels any currently pending timeout.
            # If a task is being cancelled, the delivery of a timeout is
            # somewhat immaterial--the task is already being cancelled.
            task.timeout = None

            # Set the cancellation exception
            if isinstance(exc, BaseException):
                task.cancel_pending = exc
            else:
                task.cancel_pending = exc(exc.__name__ if val is None else val)

            # If the task doesn't allow the delivery of a cancellation exception right now
            # we're done.  It's up to the task to check for it later
            if not task.allow_cancel:
                return

            # If the task doesn't have a cancellation function set, it means the task
            # is on the ready-queue.  It's not safe to deliver a cancellation exception
            # to it right now.  Instead, we simply return.  It will get cancelled
            # the next time it performs a blocking operation
            if not task.cancel_func:
                return

            # Cancel and reschedule the task
            task.cancel_func()
            task._trap_result = task.cancel_pending
            reschedule_task(task)
            task.cancel_pending = None

        # ----------------------------------------
        # Wait on a scheduler primitive
        def trap_sched_wait(sched, state):
            if check_cancellation():
                return
            suspend_task(state, sched._kernel_suspend(current))

        # ----------------------------------------
        # Reschedule one or more tasks from a scheduler primitive
        def trap_sched_wake(sched, n):
            tasks = sched._kernel_wake(n)
            for task in tasks:
                reschedule_task(task)

        # ----------------------------------------
        # Return the current value of the kernel clock
        def trap_clock():
            current._trap_result = time_monotonic()

        # ----------------------------------------
        # Sleep for a specified period. Returns value of monotonic clock.
        def trap_sleep(clock):
            nonlocal current
            if check_cancellation():
                return

            if clock <= 0:
                reschedule_task(current)
                current._trap_result = time_monotonic()
                current = None
                return

            set_timeout(clock + time_monotonic(), 'sleep')
            suspend_task('TIME_SLEEP',
                          lambda task=current: (sleepq.cancel((task.id, 'sleep'), task.sleep), setattr(task, 'sleep', None)))

        # ----------------------------------------
        # Set a timeout to be delivered to the calling task
        def trap_set_timeout(timeout):
            old_timeout = current.timeout
            if timeout is None:
                # If no timeout period is given, leave the current timeout in effect
                pass
            else:
                set_timeout(timeout)
                if old_timeout and current.timeout > old_timeout:
                    current.timeout = old_timeout
            current._trap_result = old_timeout

        # ----------------------------------------
        # Clear a previously set timeout
        def trap_unset_timeout(previous):
            # Here's an evil corner case.  Suppose the previous timeout in effect
            # has already expired?  If so, then we need to arrange for a timeout
            # to be generated.  However, this has to happen on the *next* blocking
            # call, not on this trap.  That's because the "unset" timeout feature
            # is usually done in the finalization stage of the previous timeout
            # handling.  If we were to raise a TaskTimeout here, it would get mixed
            # up with the prior timeout handling and all manner of head-explosion
            # will occur.

            set_timeout(None)
            current._trap_result = now = time_monotonic()
            if previous and previous >= 0 and previous < now:
                # Perhaps create a TaskTimeout pending exception here.
                set_timeout(previous)
            else:
                set_timeout(previous)
                current.timeout = previous
                # But there's one other evil corner case.  It's possible that
                # a timeout could be reset while a TaskTimeout exception
                # is pending.  If that happens, it means that the task has
                # left the timeout block.   We should probably take away the
                # pending exception.
                if isinstance(current.cancel_pending, TaskTimeout):
                    current.cancel_pending = None

        # ----------------------------------------
        # Return the running kernel
        def trap_get_kernel():
            current._trap_result = kernel

        # ----------------------------------------
        # Return the currently running task
        def trap_get_current():
            current._trap_result = current

        # ------------------------------------------------------------
        # Final setup.
        # ------------------------------------------------------------

        # Create the traps tables
        kernel._traps = traps = { key:value for key, value in locals().items()
                                  if key.startswith('trap_') }

        # Initialize activations
        kernel._activations = _activations = \
            [ act() if (isinstance(act, type) and issubclass(act, Activation)) else act
                    for act in kernel._activations ]

        for act in _activations:
            act.activate(kernel)

        # Initialize the loopback task (if not already initialized)
        init_loopback()
        task = new_task(_kernel_task())
        task.daemon = True

        # ------------------------------------------------------------
        # Main Kernel Loop.  Runs the supplied coroutine until it
        # terminates. If no coroutine is supplied, it runs one cycle
        # of the kernel.
        # ------------------------------------------------------------
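        # Each pass through the loop below performs three phases:
        #
        #   1. Poll/wait for I/O and reschedule tasks whose file objects
        #      are ready.
        #   2. Expire entries from the sleep/timeout queue, rescheduling
        #      sleepers and delivering TaskTimeout to timed-out tasks.
        #   3. Run every task currently on the ready queue until it
        #      suspends or terminates, dispatching any traps it yields.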
        def kernel_run(coro):
            nonlocal current
            main_task = new_task(coro) if coro else None
            del coro
            trap = None

            while True:
                # ------------------------------------------------------------
                # I/O Polling/Waiting
                # ------------------------------------------------------------

                if ready or not main_task:
                    timeout = 0
                else:
                    current_time = time_monotonic()
                    timeout = sleepq.next_deadline(current_time)
                    if selector_max_timeout and (timeout is None or timeout > selector_max_timeout):
                        timeout = selector_max_timeout
                try:
                    events = selector_select(timeout)
                except OSError as e:
                    # If there is nothing to select, Windows throws an
                    # OSError, so just set events to an empty list.
                    if e.errno != getattr(errno, 'WSAEINVAL', None):
                        raise
                    events = []

                # Reschedule tasks with completed I/O
                for key, mask in events:
                    rtask, wtask = key.data
                    emask = key.events
                    intfd = isinstance(key.fileobj, int)
                    if mask & EVENT_READ:
                        # Discussion: If the associated fileobj is *not* a
                        # bare integer file descriptor, we keep a record
                        # of the last I/O event in _last_io and leave the
                        # task registered on the event loop.  If it
                        # performs the same I/O operation again, it will
                        # get a speed boost from not having to re-register
                        # its event. However, it's not safe to use this
                        # optimization with bare integer fds.  These fds
                        # often get reused and there is a possibility that
                        # a fd will get closed and reopened on a different
                        # resource without it being detected by the
                        # kernel.  For that case, it's critical that we not
                        # leave the fd on the event loop.
                        rtask._last_io = None if intfd else (key.fileobj, EVENT_READ)
                        reschedule_task(rtask)
                        emask &= ~EVENT_READ
                        rtask = None

                    if mask & EVENT_WRITE:
                        wtask._last_io = None if intfd else (key.fileobj, EVENT_WRITE)
                        reschedule_task(wtask)
                        emask &= ~EVENT_WRITE
                        wtask = None

                    # Unregister the task if fileobj is not an integer fd (see
                    # note above).
                    if intfd:
                        if emask:
                            selector_modify(key.fileobj, emask, (rtask, wtask))
                        else:
                            selector_unregister(key.fileobj)


                # ------------------------------------------------------------
                # Time handling (sleep/timeouts)
                # ------------------------------------------------------------

                current_time = time_monotonic()
                for tm, (taskid, sleep_type) in sleepq.expired(current_time):
                    # When a task wakes, verify that the timeout value matches that stored
                    # on the task. If it differs, it means that the task completed its
                    # operation, was cancelled, or is no longer concerned with this
                    # sleep operation.  In that case, we do nothing
                    task = tasks.get(taskid)

                    if task is None:
                        continue
                    if tm != getattr(task, sleep_type):
                        continue

                    setattr(task, sleep_type, None)

                    if sleep_type == 'sleep':
                        task._trap_result = current_time
                        reschedule_task(task)

                    # If cancellation is allowed and the task is blocked, reschedule it
                    elif task.allow_cancel and task.cancel_func:
                        task.cancel_func()
                        task._trap_result = TaskTimeout(current_time)
                        reschedule_task(task)

                    # Task is on the ready queue or can't be cancelled right now;
                    # mark it as pending cancellation
                    else:
                        task.cancel_pending = TaskTimeout(current_time)

                # ------------------------------------------------------------
                # Run ready tasks
                # ------------------------------------------------------------

                for _ in range(len(ready)):
                    active = current = ready_popleft()
                    for a in _activations:
                        a.running(active)
                    active.state = 'RUNNING'
                    active.cycles += 1

                    # The current task runs until it suspends or terminates
                    while current:
                        try:
                            trap = current.send(current._trap_result)
                        except BaseException as e:
                            # If any exception has occurred, the task is done.
                            current = None

                            # Wake all joining tasks and enter the terminated state.
                            for wtask in active.joining._kernel_wake(len(active.joining)):
                                reschedule_task(wtask)
                            active.terminated = True
                            active.state = 'TERMINATED'
                            del tasks[active.id]
                            active.timeout = None
                            # Normal termination (set the result)
                            if isinstance(e, StopIteration):
                                active.result = e.value
                            else:
                                # Abnormal termination (set an exception)
                                active.exception = e
                                if (active != main_task and not isinstance(e, (CancelledError, SystemExit))):
                                    log.error('Task Crash: %r', active, exc_info=True)
                                if not isinstance(e, (Exception, CancelledError)):
                                    raise
                            break

                        # Run the trap function.  This is never supposed to raise
                        # an exception unless there's a fatal programming error in
                        # the kernel itself.  Such errors cause Curio to die. They
                        # are not reported back to tasks.
                        current._trap_result = None
                        try:
                            traps[trap[0]](*trap[1:])
                        except:
                            # Disable any further use of the kernel on fatal crash.
                            kernel._shutdown_funcs = None
                            raise

                    # --- The active task has suspended

                    # Unregister any prior I/O listening
                    if active._last_io:
                        unregister_event(*active._last_io)
                        active._last_io = None

                    # Trigger scheduler activations (if any)
                    for a in _activations:
                        a.suspended(active, trap)
                        if active.terminated:
                            a.terminated(active)
                    current = active = trap = None

                # If the main task has terminated, we're done.
                if main_task:
                    if main_task.terminated:
                        main_task.joined = True
                        return main_task
                else:
                    return None

        return kernel_run


def run(corofunc, *args, with_monitor=False, selector=None,
        debug=None, activations=None, **kernel_extra):
    '''
    Run the curio kernel with an initial task and execute until all
    tasks terminate.  Returns the task's final result (if any). This
    is a convenience function that should primarily be used for
    launching the top-level task of a curio-based application.  It
    creates an entirely new kernel, runs the given task to completion,
    and concludes by shutting down the kernel, releasing all resources used.

    Don't use this function if you're repeatedly launching a lot of
    new tasks to run in curio. Instead, create a Kernel instance and
    use its run() method.
    '''
    kernel = Kernel(selector=selector, debug=debug, activations=activations,
                    **kernel_extra)

    # Check if a monitor has been requested
    if with_monitor or 'CURIOMONITOR' in os.environ:
        from .monitor import Monitor
        m = Monitor(kernel)
        kernel._call_at_shutdown(m.close)
        kernel.run(m.start)

    with kernel:
        return kernel.run(corofunc, *args)

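# A typical top-level usage sketch (main() below is a hypothetical
# user-defined coroutine, not part of this module):
#
#     import curio
#
#     async def main():
#         ...
#
#     if __name__ == '__main__':
#         curio.run(main)
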
# An Activation is used to monitor and effect what happens
# during task execution in the Curio kernel. They are often used to
# implement tracers, debuggers, and other diagnostic tools.
# See curio/debug.py for some specific examples.
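#
# As a sketch, a minimal custom activation (hypothetical, not part of this
# module) only needs to override the hooks it cares about and is passed to
# the kernel via Kernel(activations=[...]):
#
#     class TaskCounter(Activation):
#         def __init__(self):
#             self.created_count = 0
#         def created(self, task):
#             self.created_count += 1
#
#     counter = TaskCounter()
#     with Kernel(activations=[counter]) as k:
#         k.run(main)          # main is a user-supplied coroutine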

class Activation:

    def activate(self, kernel):
        '''
        Called each time the kernel sets up its environment and is ready to run.
        kernel is an instance of the kernel that's executing.
        '''

    def created(self, task):
        '''
        Called immediately after a task has been created.
        '''

    def running(self, task):
        '''
        Called right before the next execution cycle of a task.
        '''

    def suspended(self, task, trap):
        '''
        Called after the task has suspended due to a trap.
        '''

    def terminated(self, task):
        '''
        Called after a task has terminated, but prior to the task
        being collected by any associated join() operation.
        '''