1# curio/kernel.py 2# 3# Main execution kernel. 4# 5# Curio is based on a few overarching design principles that drive the code 6# you'll find here. 7# 8# 1. Environmental Isolation. 9# 10# Curio strictly separates the environment of async and synchronous 11# programming. All functionality related to async operation is 12# placed in async-function definitions. Async functions request 13# the services of the kernel using low-level yield statements 14# (traps). The kernel is an opaque black-box from the perspective 15# of synchronous code. There is only one available 16# operation--run(coro) which runs a new task. There are no other 17# mechanisms available for interacting with the kernel from 18# synchronous code. A good analogy might be the distinction 19# between user and protected mode in an OS. User programs run in 20# user-mode and the operating system kernel runs in protected mode. 21# The same thing happens here. User programs in Curio can only run 22# in async functions. Those programs can request the services of 23# the kernel. However, they're not granted any further access than 24# that (there is no API surface or anything that can be used). 25# 26# 2. Microkernels 27# 28# The low-level kernel is meant to be small, fast, and minimally 29# featureful. In fact, almost nothing interesting happens in the 30# kernel. Instead, almost every useful part of Curio gets 31# implemented in async functions found elsewhere. If you're trying 32# to add new features to Curio, don't add them to the kernel. Think 33# about how to create objects and functions that operate at the 34# async-function level instead. See files such as sync.py or 35# queue.py for examples. 36# 37# 3. Decoupling 38# 39# No part of Curio has direct linkage to the Kernel class (it's 40# not imported or used anywhere else in the code base). If you want, 41# you can make a completely custom Kernel object and have the 42# rest of Curio run on it. You just need to make sure you implement 43# the required traps. This is in contrast to libraries such as 44# asyncio where many parts of the implementation are required to 45# carry a reference to the underlying event loop. 46 47__all__ = [ 'Kernel', 'run' ] 48 49# -- Standard Library 50 51import socket 52import time 53import os 54import errno 55from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE 56from collections import deque 57import threading 58 59# Logger where uncaught exceptions from crashed tasks are logged 60import logging 61log = logging.getLogger(__name__) 62 63# -- Curio 64 65from .errors import * 66from .task import Task 67from .traps import _read_wait 68from . import meta 69from .timequeue import TimeQueue 70 71 72class Kernel(object): 73 ''' 74 Curio run-time kernel. The selector argument specifies a 75 different I/O selector. The debug argument specifies a list of 76 debugger objects to apply. For example: 77 78 from curio.debug import schedtrace, traptrace 79 k = Kernel(debug=[schedtrace, traptrace]) 80 81 Use the kernel run() method to submit work to the kernel. 82 ''' 83 84 def __init__(self, *, selector=None, debug=None, activations=None, taskcls=Task, 85 max_select_timeout=None if os.name != 'nt' else 1.0): 86 87 # Functions to call at shutdown 88 self._shutdown_funcs = [] 89 90 # I/O Selector setup 91 self._selector = selector if selector else DefaultSelector() 92 self._call_at_shutdown(self._selector.close) 93 94 # Task table 95 self._tasks = {} 96 97 # Coroutine runner function (created upon first call to run()) 98 self._runner = None 99 100 # Activations 101 self._activations = activations if activations else [] 102 103 # Debugging (activations in disguise) 104 if debug: 105 from .debug import _create_debuggers 106 self._activations.extend(_create_debuggers(debug)) 107 108 # Task creation class 109 self._taskcls = taskcls 110 111 self._max_select_timeout = max_select_timeout 112 113 114 def __del__(self): 115 if self._shutdown_funcs is not None: 116 raise RuntimeError( 117 'Curio kernel not properly terminated. Please use Kernel.run(shutdown=True)') 118 119 def __enter__(self): 120 return self 121 122 def __exit__(self, ty, val, tb): 123 if self._shutdown_funcs is not None: 124 self.run(shutdown=True) 125 126 def _call_at_shutdown(self, func): 127 self._shutdown_funcs.append(func) 128 129 130 # ---------- 131 # Submit a new task to the kernel 132 133 def run(self, corofunc=None, *args, shutdown=False): 134 if self._shutdown_funcs is None: 135 raise RuntimeError("Can't run a kernel that's been shut down or crashed. Create a new kernel.") 136 137 coro = meta.instantiate_coroutine(corofunc, *args) if corofunc else None 138 with meta.running(self): 139 # Make the kernel runtime environment (if needed) 140 if not self._runner: 141 self._runner = self._make_kernel_runtime() 142 143 ret_val = ret_exc = None 144 # Run the supplied coroutine (if any) 145 if coro or not shutdown: 146 task = self._runner(coro) 147 if task: 148 ret_exc = task.exception 149 ret_val = task.result if not ret_exc else None 150 del task 151 152 # If shutdown has been requested, run the shutdown process 153 if shutdown: 154 # For "reasons" related to task scheduling, the task 155 # of shutting down all remaining tasks is best managed 156 # by a launching a task dedicated to carrying out the task (sic) 157 async def _shutdown_tasks(tocancel): 158 for task in tocancel: 159 await task.cancel() 160 161 tocancel = sorted(self._tasks.values(), key=lambda t: t.id, reverse=True) 162 self._runner(_shutdown_tasks(tocancel)) 163 assert not self._tasks, "New tasks created during shutdown" 164 self._runner = None 165 166 # Call registered shutdown functions 167 for func in self._shutdown_funcs: 168 func() 169 self._shutdown_funcs = None 170 171 if ret_exc: 172 raise ret_exc 173 else: 174 return ret_val 175 176 # ------------------------------------------------------------ 177 # Kernel runtime 178 # 179 # This function creates the kernel execution environment. It 180 # returns a single function (a closure) that executes a coroutine. 181 # 182 # At first glance, this function is going to look giant and 183 # insane. It is implementing the kernel runtime as a self-contained 184 # black box. There is no external API. The only possible 185 # communication is via traps defined in curio/traps.py. 186 # It's best to think of this as a "program within a program". 187 188 def _make_kernel_runtime(kernel): 189 190 # Motto: "What happens in the kernel stays in the kernel" 191 192 # ---- Kernel State 193 current = None # Currently running task 194 selector = kernel._selector # Event selector 195 ready = deque() # Ready queue 196 tasks = kernel._tasks # Task table 197 sleepq = TimeQueue() # Sleeping task queue 198 wake_queue = deque() # Thread wake queue 199 _activations = [] 200 201 # ---- Bound methods 202 selector_register = selector.register 203 selector_unregister = selector.unregister 204 selector_modify = selector.modify 205 selector_select = selector.select 206 selector_getkey = selector.get_key 207 selector_max_timeout = kernel._max_select_timeout 208 209 ready_popleft = ready.popleft 210 ready_append = ready.append 211 time_monotonic = time.monotonic 212 taskcls = kernel._taskcls 213 214 # ------------------------------------------------------------ 215 # In-kernel task used for processing futures. 216 # 217 # Internal task that monitors the loopback socket--allowing the kernel to 218 # awake for non-I/O events. 219 220 # Loop-back sockets 221 notify_sock = None 222 wait_sock = None 223 224 async def _kernel_task(): 225 wake_queue_popleft = wake_queue.popleft 226 while True: 227 await _read_wait(wait_sock) 228 data = wait_sock.recv(1000) 229 230 # Process any waking tasks. These are tasks that have 231 # been awakened externally to the event loop (e.g., by 232 # separate threads, Futures, etc.). 233 while wake_queue: 234 task, future = wake_queue_popleft() 235 # If the future associated with wakeup no longer 236 # matches the future stored on the task, wakeup is 237 # abandoned. It means that a timeout or 238 # cancellation event occurred in the time interval 239 # between the call to wake() and the 240 # subsequent processing of the waking task 241 if future and task.future is not future: 242 continue 243 task.future = None 244 task.state = 'READY' 245 task.cancel_func = None 246 ready_append(task) 247 248 # Force the kernel to wake, possibly scheduling a task to run. 249 # This method is called by threads running concurrently to the 250 # curio kernel. For example, it's triggered upon completion of 251 # Futures created by thread pools and processes. It's inherently 252 # dangerous for any kind of operation on the kernel to be 253 # performed by a separate thread. Thus, the *only* thing that 254 # happens here is that the task gets appended to a deque and a 255 # notification message is written to the kernel notification 256 # socket. append() and pop() operations on deques are thread safe 257 # and do not need additional locking. See 258 # https://docs.python.org/3/library/collections.html#collections.deque 259 # ---------- 260 def wake(task=None, future=None): 261 if task: 262 wake_queue.append((task, future)) 263 264 notify_sock.send(b'\x00') 265 266 def init_loopback(): 267 nonlocal notify_sock, wait_sock 268 notify_sock, wait_sock = socket.socketpair() 269 wait_sock.setblocking(False) 270 notify_sock.setblocking(False) 271 kernel._call_at_shutdown(notify_sock.close) 272 kernel._call_at_shutdown(wait_sock.close) 273 274 # ------------------------------------------------------------ 275 # Task management functions. 276 # 277 278 # Create a new task. Putting it on the ready queue 279 def new_task(coro): 280 task = taskcls(coro) 281 tasks[task.id] = task 282 reschedule_task(task) 283 for a in _activations: 284 a.created(task) 285 return task 286 287 # Reschedule a task, putting it back on the ready queue. 288 def reschedule_task(task): 289 assert task not in ready 290 291 ready_append(task) 292 task.state = 'READY' 293 task.cancel_func = None 294 295 # Suspend the current task 296 def suspend_task(state, cancel_func): 297 nonlocal current 298 current.state = state 299 current.cancel_func = cancel_func 300 301 # Unregister previous I/O request. Discussion follows: 302 # 303 # When a task performs I/O, it registers itself with the underlying 304 # I/O selector. When the task is reawakened, it unregisters itself 305 # and prepares to run. However, in many network applications, the 306 # task will perform a small amount of work and then go to sleep on 307 # exactly the same I/O resource that it was waiting on before. For 308 # example, a client handling task in a server will often spend most 309 # of its time waiting for incoming data on a single socket. 310 # 311 # Instead of always unregistering the task from the selector, we 312 # can defer the unregistration process until after the task goes 313 # back to sleep again. If it happens to be sleeping on the same 314 # resource as before, there's no need to unregister it--it will 315 # still be registered from the last I/O operation. 316 # 317 # The code here performs the unregister step for a task that 318 # ran, but is now sleeping for a *different* reason than repeating the 319 # prior I/O operation. There is coordination with code in trap_io(). 320 321 if current._last_io: 322 unregister_event(*current._last_io) 323 current._last_io = None 324 325 current = None 326 327 # Check if task has pending cancellation 328 def check_cancellation(): 329 if current.allow_cancel and current.cancel_pending: 330 current._trap_result = current.cancel_pending 331 current.cancel_pending = None 332 return True 333 else: 334 return False 335 336 # Set a timeout or sleep event on the current task 337 def set_timeout(clock, sleep_type='timeout'): 338 if clock is None: 339 sleepq.cancel((current.id, sleep_type), getattr(current, sleep_type)) 340 else: 341 sleepq.push((current.id, sleep_type), clock) 342 setattr(current, sleep_type, clock) 343 344 # ------------------------------------------------------------ 345 # I/O Support functions 346 # 347 348 def register_event(fileobj, event, task): 349 try: 350 key = selector_getkey(fileobj) 351 mask, (rtask, wtask) = key.events, key.data 352 if event == EVENT_READ and rtask: 353 raise ReadResourceBusy(f"Multiple tasks can't wait to read on the same file descriptor {fileobj}") 354 if event == EVENT_WRITE and wtask: 355 raise WriteResourceBusy(f"Multiple tasks can't wait to write on the same file descriptor {fileobj}") 356 357 selector_modify(fileobj, mask | event, 358 (task, wtask) if event == EVENT_READ else (rtask, task)) 359 selector_getkey(fileobj) 360 except KeyError: 361 selector_register(fileobj, event, 362 (task, None) if event == EVENT_READ else (None, task)) 363 364 def unregister_event(fileobj, event): 365 key = selector_getkey(fileobj) 366 mask, (rtask, wtask) = key.events, key.data 367 mask &= ~event 368 if not mask: 369 selector_unregister(fileobj) 370 else: 371 selector_modify(fileobj, mask, 372 (None, wtask) if event == EVENT_READ else (rtask, None)) 373 374 # ------------------------------------------------------------ 375 # Traps 376 # 377 # These implement the low-level functionality that is 378 # triggered by user-level code. They are never invoked directly 379 # and there is no public API outside the kernel. Instead, 380 # coroutines use a statement such as 381 # 382 # yield ('trap_io', sock, EVENT_READ, 'READ_WAIT') 383 # 384 # to invoke a specific trap. 385 # ------------------------------------------------------------ 386 387 # ---------------------------------------- 388 # Wait for I/O 389 def trap_io(fileobj, event, state): 390 if check_cancellation(): 391 return 392 393 # See comment about deferred unregister in suspend_task(). If the 394 # requested I/O operation is *different* than the last I/O operation 395 # that was performed by the task, we need to unregister the last I/O 396 # resource used and register a new one with the selector. 397 if current._last_io != (fileobj, event): 398 if current._last_io: 399 unregister_event(*current._last_io) 400 try: 401 register_event(fileobj, event, current) 402 except CurioError as e: 403 current._trap_result = e 404 return 405 406 # This step indicates that we have managed any deferred I/O management 407 # for the task. Otherwise, I/O will be unregistered. 408 current._last_io = None 409 suspend_task(state, lambda: unregister_event(fileobj, event)) 410 411 # ---------------------------------------- 412 # Release any kernel resources associated with fileobj. 413 def trap_io_release(fileobj): 414 if current._last_io: 415 unregister_event(*current._last_io) 416 current._last_io = None 417 current._trap_result = None 418 419 # ---------------------------------------- 420 # Return tasks currently waiting on a file obj. 421 def trap_io_waiting(fileobj): 422 try: 423 key = selector_getkey(fileobj) 424 rtask, wtask = key.data 425 rtask = rtask if rtask and rtask.cancel_func else None 426 wtask = wtask if wtask and wtask.cancel_func else None 427 current._trap_result = (rtask, wtask) 428 except KeyError: 429 current._trap_result = (None, None) 430 431 # ---------------------------------------- 432 # Wait on a Future 433 def trap_future_wait(future, event): 434 if check_cancellation(): 435 return 436 437 current.future = future 438 439 # Discussion: Each task records the future that it is 440 # currently waiting on. The completion callback below only 441 # attempts to wake the task if its stored Future is exactly 442 # the same one that was stored above. Due to support for 443 # cancellation and timeouts, it's possible that a task might 444 # abandon its attempt to wait for a Future and go on to 445 # perform other operations, including waiting for different 446 # Future in the future (got it?). However, a running thread 447 # or process still might go on to eventually complete the 448 # earlier work. In that case, it will trigger the callback, 449 # find that the task's current Future is now different, and 450 # discard the result. 451 452 future.add_done_callback(lambda fut, task=current: wake(task, fut)) 453 454 # An optional threading.Event object can be passed and set to 455 # start a worker thread. This makes it possible to have a lock-free 456 # Future implementation where worker threads only start after the 457 # callback function has been set above. 458 if event: 459 event.set() 460 461 suspend_task('FUTURE_WAIT', 462 lambda task=current: 463 setattr(task, 'future', future.cancel() and None)) 464 465 # ---------------------------------------- 466 # Add a new task to the kernel 467 def trap_spawn(coro): 468 task = new_task(coro) 469 task.parentid = current.id 470 current._trap_result = task 471 472 # ---------------------------------------- 473 # Cancel a task 474 def trap_cancel_task(task, exc=TaskCancelled, val=None): 475 if task.cancelled: 476 return 477 478 task.cancelled = True 479 480 # Cancelling a task also cancels any currently pending timeout. 481 # If a task is being cancelled, the delivery of a timeout is 482 # somewhat immaterial--the task is already being cancelled. 483 task.timeout = None 484 485 # Set the cancellation exception 486 if isinstance(exc, BaseException): 487 task.cancel_pending = exc 488 else: 489 task.cancel_pending = exc(exc.__name__ if val is None else val) 490 491 # If the task doesn't allow the delivery of a cancellation exception right now 492 # we're done. It's up to the task to check for it later 493 if not task.allow_cancel: 494 return 495 496 # If the task doesn't have a cancellation function set, it means the task 497 # is on the ready-queue. It's not safe to deliver a cancellation exception 498 # to it right now. Instead, we simply return. It will get cancelled 499 # the next time it performs a blocking operation 500 if not task.cancel_func: 501 return 502 503 # Cancel and reschedule the task 504 task.cancel_func() 505 task._trap_result = task.cancel_pending 506 reschedule_task(task) 507 task.cancel_pending = None 508 509 # ---------------------------------------- 510 # Wait on a scheduler primitive 511 def trap_sched_wait(sched, state): 512 if check_cancellation(): 513 return 514 suspend_task(state, sched._kernel_suspend(current)) 515 516 # ---------------------------------------- 517 # Reschedule one or more tasks from a scheduler primitive 518 def trap_sched_wake(sched, n): 519 tasks = sched._kernel_wake(n) 520 for task in tasks: 521 reschedule_task(task) 522 523 # ---------------------------------------- 524 # Return the current value of the kernel clock 525 def trap_clock(): 526 current._trap_result = time_monotonic() 527 528 # ---------------------------------------- 529 # Sleep for a specified period. Returns value of monotonic clock. 530 def trap_sleep(clock): 531 nonlocal current 532 if check_cancellation(): 533 return 534 535 if clock <= 0: 536 reschedule_task(current) 537 current._trap_result = time_monotonic() 538 current = None 539 return 540 541 set_timeout(clock + time_monotonic(), 'sleep') 542 suspend_task('TIME_SLEEP', 543 lambda task=current: (sleepq.cancel((task.id, 'sleep'), task.sleep), setattr(task, 'sleep', None))) 544 545 # ---------------------------------------- 546 # Set a timeout to be delivered to the calling task 547 def trap_set_timeout(timeout): 548 old_timeout = current.timeout 549 if timeout is None: 550 # If no timeout period is given, leave the current timeout in effect 551 pass 552 else: 553 set_timeout(timeout) 554 if old_timeout and current.timeout > old_timeout: 555 current.timeout = old_timeout 556 current._trap_result = old_timeout 557 558 # ---------------------------------------- 559 # Clear a previously set timeout 560 def trap_unset_timeout(previous): 561 # Here's an evil corner case. Suppose the previous timeout in effect 562 # has already expired? If so, then we need to arrange for a timeout 563 # to be generated. However, this has to happen on the *next* blocking 564 # call, not on this trap. That's because the "unset" timeout feature 565 # is usually done in the finalization stage of the previous timeout 566 # handling. If we were to raise a TaskTimeout here, it would get mixed 567 # up with the prior timeout handling and all manner of head-explosion 568 # will occur. 569 570 set_timeout(None) 571 current._trap_result = now = time_monotonic() 572 if previous and previous >= 0 and previous < now: 573 # Perhaps create a TaskTimeout pending exception here. 574 set_timeout(previous) 575 else: 576 set_timeout(previous) 577 current.timeout = previous 578 # But there's one other evil corner case. It's possible that 579 # a timeout could be reset while a TaskTimeout exception 580 # is pending. If that happens, it means that the task has 581 # left the timeout block. We should probably take away the 582 # pending exception. 583 if isinstance(current.cancel_pending, TaskTimeout): 584 current.cancel_pending = None 585 586 # ---------------------------------------- 587 # Return the running kernel 588 def trap_get_kernel(): 589 current._trap_result = kernel 590 591 # ---------------------------------------- 592 # Return the currently running task 593 def trap_get_current(): 594 current._trap_result = current 595 596 # ------------------------------------------------------------ 597 # Final setup. 598 # ------------------------------------------------------------ 599 600 # Create the traps tables 601 kernel._traps = traps = { key:value for key, value in locals().items() 602 if key.startswith('trap_') } 603 604 # Initialize activations 605 kernel._activations = _activations = \ 606 [ act() if (isinstance(act, type) and issubclass(act, Activation)) else act 607 for act in kernel._activations ] 608 609 for act in _activations: 610 act.activate(kernel) 611 612 # Initialize the loopback task (if not already initialized) 613 init_loopback() 614 task = new_task(_kernel_task()) 615 task.daemon = True 616 617 # ------------------------------------------------------------ 618 # Main Kernel Loop. Runs the supplied coroutine until it 619 # terminates. If no coroutine is supplied, it runs one cycle 620 # of the kernel. 621 # ------------------------------------------------------------ 622 def kernel_run(coro): 623 nonlocal current 624 main_task = new_task(coro) if coro else None 625 del coro 626 trap = None 627 628 while True: 629 # ------------------------------------------------------------ 630 # I/O Polling/Waiting 631 # ------------------------------------------------------------ 632 633 if ready or not main_task: 634 timeout = 0 635 else: 636 current_time = time_monotonic() 637 timeout = sleepq.next_deadline(current_time) 638 if selector_max_timeout and (timeout is None or timeout > selector_max_timeout): 639 timeout = selector_max_timeout 640 try: 641 events = selector_select(timeout) 642 except OSError as e: 643 # If there is nothing to select, windows throws an 644 # OSError, so just set events to an empty list. 645 if e.errno != getattr(errno, 'WSAEINVAL', None): 646 raise 647 events = [] 648 649 # Reschedule tasks with completed I/O 650 for key, mask in events: 651 rtask, wtask = key.data 652 emask = key.events 653 intfd = isinstance(key.fileobj, int) 654 if mask & EVENT_READ: 655 # Discussion: If the associated fileobj is *not* a 656 # bare integer file descriptor, we keep a record 657 # of the last I/O event in _last_io and leave the 658 # task registered on the event loop. If it 659 # performs the same I/O operation again, it will 660 # get a speed boost from not having to re-register 661 # its event. However, it's not safe to use this 662 # optimization with bare integer fds. These fds 663 # often get reused and there is a possibility that 664 # a fd will get closed and reopened on a different 665 # resource without it being detected by the 666 # kernel. For that case, its critical that we not 667 # leave the fd on the event loop. 668 rtask._last_io = None if intfd else (key.fileobj, EVENT_READ) 669 reschedule_task(rtask) 670 emask &= ~EVENT_READ 671 rtask = None 672 673 if mask & EVENT_WRITE: 674 wtask._last_io = None if intfd else (key.fileobj, EVENT_WRITE) 675 reschedule_task(wtask) 676 emask &= ~EVENT_WRITE 677 wtask = None 678 679 # Unregister the task if fileobj is not an integer fd (see 680 # note above). 681 if intfd: 682 if emask: 683 selector_modify(key.fileobj, emask, (rtask, wtask)) 684 else: 685 selector_unregister(key.fileobj) 686 687 688 # ------------------------------------------------------------ 689 # Time handling (sleep/timeouts) 690 # ------------------------------------------------------------ 691 692 current_time = time_monotonic() 693 for tm, (taskid, sleep_type) in sleepq.expired(current_time): 694 # When a task wakes, verify that the timeout value matches that stored 695 # on the task. If it differs, it means that the task completed its 696 # operation, was cancelled, or is no longer concerned with this 697 # sleep operation. In that case, we do nothing 698 task = tasks.get(taskid) 699 700 if task is None: 701 continue 702 if tm != getattr(task, sleep_type): 703 continue 704 705 setattr(task, sleep_type, None) 706 707 if sleep_type == 'sleep': 708 task._trap_result = current_time 709 reschedule_task(task) 710 711 # If cancellation is allowed and the task is blocked, reschedule it 712 elif task.allow_cancel and task.cancel_func: 713 task.cancel_func() 714 task._trap_result = TaskTimeout(current_time) 715 reschedule_task(task) 716 717 # Task is on the ready queue or can't be cancelled right now; 718 # mark it as pending cancellation 719 else: 720 task.cancel_pending = TaskTimeout(current_time) 721 722 # ------------------------------------------------------------ 723 # Run ready tasks 724 # ------------------------------------------------------------ 725 726 for _ in range(len(ready)): 727 active = current = ready_popleft() 728 for a in _activations: 729 a.running(active) 730 active.state = 'RUNNING' 731 active.cycles += 1 732 733 # The current task runs until it suspends or terminates 734 while current: 735 try: 736 trap = current.send(current._trap_result) 737 except BaseException as e: 738 # If any exception has occurred, the task is done. 739 current = None 740 741 # Wake all joining tasks and enter the terminated state. 742 for wtask in active.joining._kernel_wake(len(active.joining)): 743 reschedule_task(wtask) 744 active.terminated = True 745 active.state = 'TERMINATED' 746 del tasks[active.id] 747 active.timeout = None 748 # Normal termination (set the result) 749 if isinstance(e, StopIteration): 750 active.result = e.value 751 else: 752 # Abnormal termination (set an exception) 753 active.exception = e 754 if (active != main_task and not isinstance(e, (CancelledError, SystemExit))): 755 log.error('Task Crash: %r', active, exc_info=True) 756 if not isinstance(e, (Exception, CancelledError)): 757 raise 758 break 759 760 # Run the trap function. This is never supposed to raise 761 # an exception unless there's a fatal programming error in 762 # the kernel itself. Such errors cause Curio to die. They 763 # are not reported back to tasks. 764 current._trap_result = None 765 try: 766 traps[trap[0]](*trap[1:]) 767 except: 768 # Disable any further use of the kernel on fatal crash. 769 kernel._shutdown_funcs = None 770 raise 771 772 # --- The active task has suspended 773 774 # Unregister any prior I/O listening 775 if active._last_io: 776 unregister_event(*active._last_io) 777 active._last_io = None 778 779 # Trigger scheduler activations (if any) 780 for a in _activations: 781 a.suspended(active, trap) 782 if active.terminated: 783 a.terminated(active) 784 current = active = trap = None 785 786 # If the main task has terminated, we're done. 787 if main_task: 788 if main_task.terminated: 789 main_task.joined = True 790 return main_task 791 else: 792 return None 793 794 return kernel_run 795 796 797def run(corofunc, *args, with_monitor=False, selector=None, 798 debug=None, activations=None, **kernel_extra): 799 ''' 800 Run the curio kernel with an initial task and execute until all 801 tasks terminate. Returns the task's final result (if any). This 802 is a convenience function that should primarily be used for 803 launching the top-level task of a curio-based application. It 804 creates an entirely new kernel, runs the given task to completion, 805 and concludes by shutting down the kernel, releasing all resources used. 806 807 Don't use this function if you're repeatedly launching a lot of 808 new tasks to run in curio. Instead, create a Kernel instance and 809 use its run() method instead. 810 ''' 811 kernel = Kernel(selector=selector, debug=debug, activations=activations, 812 **kernel_extra) 813 814 # Check if a monitor has been requested 815 if with_monitor or 'CURIOMONITOR' in os.environ: 816 from .monitor import Monitor 817 m = Monitor(kernel) 818 kernel._call_at_shutdown(m.close) 819 kernel.run(m.start) 820 821 with kernel: 822 return kernel.run(corofunc, *args) 823 824# An Activation is used to monitor and effect what happens 825# during task execution in the Curio kernel. They are often used to 826# implement tracers, debuggers, and other diagonistic tools. 827# See curio/debug.py for some specific examples. 828 829class Activation: 830 831 def activate(self, kernel): 832 ''' 833 Called each time the kernel sets up its environment and is ready to run. 834 kernel is an instance of the kernel that's executing. 835 ''' 836 837 def created(self, task): 838 ''' 839 Called immediately after a task has been created. 840 ''' 841 842 def running(self, task): 843 ''' 844 Called right before the next execution cycle of a task. 845 ''' 846 847 def suspended(self, task, trap): 848 ''' 849 Called after the task has suspended due to a trap. 850 ''' 851 852 def terminated(self, task): 853 ''' 854 Called after a task has terminated, but prior to the task 855 being collected by any associated join() operation. 856 ''' 857