Reference Manual
================

Coroutines
----------

Curio executes coroutines. A coroutine is a function defined using
``async def``::

    async def hello(name):
        return 'Hello ' + name

Coroutines call other coroutines using ``await``::

    async def main(name):
        s = await hello(name)
        print(s)

Coroutines never run on their own.
They always execute under the supervision of a manager (e.g., an
event-loop, a kernel, etc.). In Curio, the initial coroutine is
executed using ``run()``::

    import curio
    curio.run(main, 'Guido')

When executing, a coroutine is encapsulated by a "Task."

Basic Execution
---------------

The following function runs an initial coroutine:

.. function:: run(corofunc, *args, debug=None, selector=None, with_monitor=False, taskcls=Task)

   Run *corofunc* and return its result. *args* are the arguments
   provided to *corofunc*. *with_monitor* enables the task monitor.
   *selector* is an optional selector from the :mod:`selectors
   <python:selectors>` standard library. *debug* is a list of
   debugging features (see the section on debugging). *taskcls* is
   the class used to encapsulate coroutines. If ``run()`` is called
   when a task is already running, a ``RuntimeError`` is raised.

If you are going to repeatedly execute coroutines one after the other, it
is more efficient to create a ``Kernel`` instance and submit
them using the ``run()`` method.

.. class:: Kernel(selector=None, debug=None, taskcls=Task)

   Create a runtime kernel. The arguments are the same
   as described above for :func:`run()`.

There is only one method that may be used on a :class:`Kernel` instance.

.. method:: Kernel.run(corofunc=None, *args, shutdown=False)

   Run *corofunc* and return its result.
   *args* are the arguments given to *corofunc*. If
   *shutdown* is ``True``, the kernel cancels all remaining tasks
   and performs a clean shutdown upon return.
   Calling this method with *corofunc*
   set to ``None`` executes a single scheduling cycle of background tasks
   before returning immediately. Raises a
   ``RuntimeError`` if called on an already running kernel or if an attempt is
   made to run more than one kernel in the same thread.

A kernel is commonly used as a context manager. For example::

    with Kernel() as kernel:
        kernel.run(corofunc1)
        kernel.run(corofunc2)
        ...
    # Kernel shuts down here

When submitting work, you can either provide an async
function and arguments or you can provide an already instantiated
coroutine. Both of these ``run()`` invocations work::

    async def hello(name):
        print('hello', name)

    run(hello, 'Guido')    # Preferred
    run(hello('Guido'))    # Ok

This convention is observed by nearly all other functions that accept
coroutines (e.g., spawning tasks, waiting for timeouts, etc.).

Tasks
-----

The following functions manage the execution of concurrent tasks.

.. asyncfunction:: spawn(corofunc, *args, daemon=False)

   Create a new task that concurrently executes the async function *corofunc*. *args*
   are the arguments provided to *corofunc*. Returns a :class:`Task`
   instance as a result. The *daemon* option specifies
   that the task is never joined and that its result may be
   disregarded.

.. asyncfunction:: current_task()

   Returns the :class:`Task` instance corresponding to the caller.

:func:`spawn` and :func:`current_task` return a :class:`Task` instance ``t``
with the following methods and attributes:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await t.join()``
     - Wait for the task to terminate and return its result.
       Raises :exc:`curio.TaskError` if the task failed with an
       exception. The ``__cause__`` attribute
       contains the actual exception raised by the task when it crashed.
   * - ``await t.wait()``
     - Wait for the task to terminate, but return no value.

   * - ``await t.cancel(*, blocking=True, exc=TaskCancelled)``
     - Cancels the task by raising a :exc:`curio.TaskCancelled` exception
       (or the exception specified by *exc*). If ``blocking=True`` (the
       default), waits for the task to actually terminate. A task may
       only be cancelled once. If invoked more than once, the second
       request waits until the task is cancelled from the first request.
       If the task has already terminated, this method does nothing and
       returns immediately. Note: uncaught exceptions that occur as a
       result of cancellation are logged, but not propagated out of the
       ``Task.cancel()`` method.

   * - ``t.traceback()``
     - Creates a stack traceback string. Useful for debugging.
   * - ``t.where()``
     - Return (filename, lineno) where the task is executing.
   * - ``t.id``
     - The task's integer id. Monotonically increases.
   * - ``t.coro``
     - The coroutine associated with the task.
   * - ``t.daemon``
     - Boolean flag that indicates whether or not a task is daemonic.
   * - ``t.state``
     - The name of the task's current state. Useful for debugging.
   * - ``t.cycles``
     - The number of scheduling cycles the task has completed.
   * - ``t.result``
     - A property holding the task result. If accessed before the task terminates,
       a ``RuntimeError`` exception is raised. If a task crashed with an exception,
       that exception is reraised on access.
   * - ``t.exception``
     - Exception raised by a task, if any. ``None`` otherwise.
   * - ``t.cancelled``
     - A boolean flag that indicates whether or not the task was cancelled.
   * - ``t.terminated``
     - A boolean flag that indicates whether or not the task has terminated.

Task Groups
-----------

Tasks may be grouped together to better manage their execution and
collect results. To do this, create a ``TaskGroup`` instance.

.. class:: TaskGroup(tasks=(), *, wait=all)

   A class representing a group of executing tasks. *tasks* is an
   optional set of existing tasks to put into the group.
   *wait* specifies the policy used by ``join()`` to wait for tasks. If *wait* is
   ``all``, then wait for all tasks to complete. If *wait* is
   ``any`` then wait for any task to terminate and cancel any
   remaining tasks. If *wait* is ``object``, then wait for any task
   to return a non-None object, cancelling all remaining
   tasks afterwards. If *wait* is ``None``, then immediately cancel all running tasks.
   Task groups do not form a hierarchy or have any kind of relationship to
   other previously created task groups or tasks. Moreover, tasks created by
   the top level ``spawn()`` function are not placed into any task group.
   To create a task in a group, it should be created using ``TaskGroup.spawn()``
   or explicitly added using ``TaskGroup.add_task()``.

The following methods and attributes are supported on a ``TaskGroup`` instance ``g``:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await g.spawn(corofunc, *args, daemon=False)``
     - Create a new task in the group. Returns a ``Task`` instance.
       *daemon* specifies whether or not the result of the task is
       disregarded. Daemonic tasks are both ignored and cancelled by the ``join()``
       method.
   * - ``await g.add_task(task)``
     - Add an already existing task to the group.
   * - ``await g.next_done()``
     - Wait for and return the next completed task. Return ``None`` if no more tasks remain.
   * - ``await g.next_result()``
     - Wait for and return the result of the next completed task.
       If the task failed with an exception, the exception is raised.
   * - ``await g.join()``
     - Wait for all tasks in the group to terminate according to the wait policy
       set for the group.
       If any of the monitored tasks exits with an exception or
       if the ``join()`` operation itself is cancelled, all remaining tasks in the
       group are cancelled. If a ``TaskGroup`` is used as a
       context manager, the ``join()`` method is called on block exit.
   * - ``await g.cancel_remaining()``
     - Cancel and remove all remaining non-daemonic tasks from the group.
   * - ``g.completed``
     - The first task that completed with a valid result after calling ``join()``.
   * - ``g.result``
     - The result of the first task that completed after calling ``join()``.
       May raise an exception if the task exited with an exception.
   * - ``g.exception``
     - Exception raised by the first task that completed (if any).
   * - ``g.results``
     - A list of all results collected by ``join()``,
       ordered by task id. May raise an exception if any task
       exited with an exception.
   * - ``g.exceptions``
     - A list of all exceptions collected by ``join()``.
   * - ``g.tasks``
     - A list of all non-daemonic tasks managed by the group, ordered by task id.
       Does not include tasks where ``Task.join()`` or ``Task.cancel()``
       has been directly called already.

The preferred way to use a ``TaskGroup`` is as a context manager.
Here are a few common usage patterns::

    # Spawn multiple tasks and collect all of their results
    async with TaskGroup(wait=all) as g:
        await g.spawn(coro1)
        await g.spawn(coro2)
        await g.spawn(coro3)
    print('Results:', g.results)

    # Spawn multiple tasks and collect the result of the first one
    # that completes--cancelling other tasks
    async with TaskGroup(wait=any) as g:
        await g.spawn(coro1)
        await g.spawn(coro2)
        await g.spawn(coro3)
    print('Result:', g.result)

    # Spawn multiple tasks and collect their results as they complete
    async with TaskGroup() as g:
        await g.spawn(coro1)
        await g.spawn(coro2)
        await g.spawn(coro3)
        async for task in g:
            print(task, 'completed.', task.result)

In these examples, access to the ``result`` or ``results`` attribute
may raise an exception if a task failed for some reason.

If an exception is raised inside the task group context, all managed
tasks are cancelled and the exception is propagated. For example::

    try:
        async with TaskGroup() as g:
            t1 = await g.spawn(func1)
            t2 = await g.spawn(func2)
            t3 = await g.spawn(func3)
            raise RuntimeError()
    except RuntimeError:
        # All launched tasks will have terminated or been cancelled here
        assert t1.terminated
        assert t2.terminated
        assert t3.terminated

It is important to emphasize that no tasks placed in a task group survive past
the ``join()`` operation or exit from a context manager. This includes
any daemonic tasks running in the background.

Time
----

Curio manages time with an internal monotonic clock. The following functions
are provided:

.. asyncfunction:: sleep(seconds)

   Sleep for a specified number of seconds. If the number of seconds is 0,
   execution switches to the next ready task (if any). Returns the current clock value.

.. asyncfunction:: clock()

   Returns the current value of the monotonic clock. Use this to get a
   base clock value for the ``wake_at()`` function.

Timeouts
--------

Any blocking operation can be cancelled by a timeout.

.. asyncfunction:: timeout_after(seconds, corofunc=None, *args)

   Execute ``corofunc(*args)`` and return its result. If no result is
   returned before *seconds* have elapsed, a
   :py:exc:`curio.TaskTimeout` exception is raised on the current
   blocking operation. If *corofunc* is ``None``, the function
   returns an asynchronous context manager that applies a timeout to a
   block of statements.

Every call to ``timeout_after()`` must have a matching exception handler
to catch the resulting timeout. For example::

    try:
        result = await timeout_after(10, coro, arg1, arg2)
    except TaskTimeout:
        ...

    # Alternative (context-manager)
    try:
        async with timeout_after(10):
            result = await coro(arg1, arg2)
            ...
    except TaskTimeout:
        ...

When timeout operations are nested, the resulting ``TaskTimeout``
exception is paired to the matching ``timeout_after()`` operation that
produced it. Consider this subtle example::

    async def main():
        try:
            async with timeout_after(1):           # Expires first
                try:
                    async with timeout_after(5):
                        await sleep(1000)
                except TaskTimeout:                # (a) Does NOT match
                    print("Inner timeout")
        except TaskTimeout:                        # (b) Matches!
            print("Outer timeout")

    run(main)

If you run this, you will see output of "Outer timeout" from the
exception handler at (b). This is because the outer timeout is the one
that expired. The exception handler at (a) does not match (at that
point, the exception being reported is
:py:exc:`curio.TimeoutCancellationError` which indicates
that a timeout/cancellation has occurred somewhere, but that it is NOT
due to the inner-most timeout).

If a nested ``timeout_after()`` is used without a matching except
clause, a timeout is reported as a
:py:exc:`curio.UncaughtTimeoutError` exception. Remember that all
timeouts should have a matching exception handler.

If you don't care about exception handling, you can also use the following
function:

.. asyncfunction:: ignore_after(seconds, corofunc=None, *args, timeout_result=None)

   Execute ``corofunc(*args)`` and return its result. If *seconds* elapse, the
   operation is cancelled with a :py:exc:`curio.TaskTimeout` exception, but
   the exception is discarded and the value of *timeout_result* is returned.
   If *corofunc* is ``None``, returns an asynchronous context manager that
   applies a timeout to a block of statements. For this case, the resulting
   context manager object has an ``expired`` attribute set to ``True`` if
   time expired.

Here are some examples::

    result = await ignore_after(5, coro, args)
    if result is None:
        # Timeout occurred (if you care)
        ...

    # Execute multiple statements with a 5 second timeout
    async with ignore_after(5) as s:
        await coro1(args)
        await coro2(args)

    if s.expired:
        # Timeout occurred
        ...

The ``ignore_after()`` function is just a convenience layer to simplify exception
handling. All of the timeout-related functions can be composed and layered
together in any configuration and it should still work.

Cancellation Control
--------------------

Sometimes it is necessary to disable or control cancellation on critical operations. The
following functions can control this:

.. asyncfunction:: disable_cancellation(corofunc=None, *args)

   Disables the delivery of cancellation-related exceptions while
   executing *corofunc*. *args* are the arguments to *corofunc*.
   The result of *corofunc* is returned.
   Any pending cancellation
   is delivered to the first blocking operation after
   cancellation is reenabled. If *corofunc* is ``None``, a
   context manager is returned that shields a block of
   statements from cancellation.

.. asyncfunction:: check_cancellation(exc=None)

   Explicitly check if a cancellation is pending for the calling task. If
   cancellation is enabled, any pending exception is raised
   immediately. If cancellation is disabled, it returns the pending
   cancellation exception instance (if any) or ``None``. If ``exc``
   is supplied and it matches the type of the pending exception, the
   exception is returned and any pending cancellation exception is
   cleared.

.. asyncfunction:: set_cancellation(exc)

   Set the pending cancellation exception for the calling task to ``exc``.
   If cancellation is enabled, it will be raised immediately on the next
   blocking operation. Returns any previously set, but pending cancellation
   exception.

A common use of these functions is to more precisely control cancellation
points. Here is an example that shows how to check for cancellation at
a specific code location (a)::

    async def coro():
        async with disable_cancellation():
            while True:
                await coro1()
                await coro2()
                if await check_cancellation():    # (a)
                    break                         # Bail out!

        await check_cancellation()  # Cancellation (if any) delivered here

If you only need to shield a single operation, you can write statements like this::

    async def coro():
        ...
        await disable_cancellation(some_operation, x, y, z)
        ...

Note: It is not possible for cancellation to be reenabled inside code
where it has been disabled.

Synchronization Primitives
--------------------------

.. currentmodule:: None

The following synchronization primitives are available. Their behavior
is identical to their equivalents in the :mod:`threading` module.
However, none
of these primitives are safe to use with threads.

.. class:: Event()

   An event object.

An :class:`Event` instance ``e`` supports the following methods:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``e.is_set()``
     - Return ``True`` if set.
   * - ``e.clear()``
     - Clear the event value.
   * - ``await e.wait()``
     - Wait for the event to be set.
   * - ``await e.set()``
     - Set the event. Wake all waiting tasks (if any).


.. class:: Result()

   A synchronized result object. This is like an ``Event`` except
   that it additionally carries a value or exception.

A :class:`Result` instance ``r`` supports the following methods:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``r.is_set()``
     - Return ``True`` if a result value or exception has been set.
   * - ``await r.set_value(value)``
     - Set the result value, waking any waiters.
   * - ``await r.set_exception(exc)``
     - Set the result exception, waking any waiters.
   * - ``await r.unwrap()``
     - Wait and return the set value. If an exception was set, it is reraised.

The ``Lock``, ``RLock``, and ``Semaphore`` classes allow for mutual exclusion and
inter-task coordination.

.. class:: Lock()

   A mutual exclusion lock.

.. class:: RLock()

   A recursive mutual-exclusion lock that can be acquired multiple times within the
   same task.

.. class:: Semaphore(value=1)

   Semaphores are based on a counter. ``acquire()`` and ``release()``
   decrement and increment the counter respectively. If the counter is 0,
   ``acquire()`` blocks until the value is incremented by another task. The ``value``
   attribute of a semaphore is a read-only property holding the current value of the internal
   counter.

An instance ``lock`` of any of the above classes supports the following methods:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await lock.acquire()``
     - Acquire the lock.
   * - ``await lock.release()``
     - Release the lock.
   * - ``lock.locked()``
     - Return ``True`` if the lock is currently held.

The preferred way to use a lock is as an asynchronous context manager. For example::

    import curio
    lock = curio.Lock()

    async def sometask():
        async with lock:
            print("Have the lock")
            ...


.. class:: Condition(lock=None)

   Condition variable. *lock* is the underlying lock to use. If ``None``, then
   a :class:`Lock` object is used.

An instance ``cv`` of :class:`Condition` supports the following methods:


.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await cv.acquire()``
     - Acquire the underlying lock.
   * - ``await cv.release()``
     - Release the underlying lock.
   * - ``cv.locked()``
     - Return ``True`` if the lock is currently held.
   * - ``await cv.wait()``
     - Wait on the condition variable. Releases the underlying lock.
   * - ``await cv.wait_for(pred)``
     - Wait on the condition variable until a supplied predicate function returns ``True``.
       ``pred`` is a callable that takes no arguments.
   * - ``await cv.notify(n=1)``
     - Notify one or more tasks, causing them to wake from ``cv.wait()``.
   * - ``await cv.notify_all()``
     - Notify all waiting tasks.

Proper use of a condition variable is tricky.
The following example shows how to implement
producer-consumer synchronization on top of a ``collections.deque`` object::

    import curio
    from collections import deque

    async def consumer(items, cond):
        while True:
            async with cond:
                while not items:         # (a)
                    await cond.wait()    # Wait for items
                item = items.popleft()
            print('Got', item)

    async def producer(items, cond):
        for n in range(10):
            async with cond:
                items.append(n)
                await cond.notify()
            await curio.sleep(1)

    async def main():
        items = deque()
        cond = curio.Condition()
        await curio.spawn(producer, items, cond)
        await curio.spawn(consumer, items, cond)

    curio.run(main())

In this code, it is critically important that the ``wait()`` and
``notify()`` operations take place in a block where the condition
variable has been properly acquired. Also, the ``while``-loop at (a)
is not a typo. Condition variables are often used to "signal" that
some condition has become true, but it is standard practice to re-test
the condition before proceeding (the condition might hold only
briefly, so by the time a notified task
awakes, it may no longer be true).

Queues
------

To communicate between tasks, use a :class:`Queue`.

.. class:: Queue(maxsize=0)

   Creates a queue with a maximum number of elements in *maxsize*. If not
   specified, the queue can hold an unlimited number of items.

An instance ``q`` of :class:`Queue` supports the following methods:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``q.empty()``
     - Return ``True`` if the queue is empty.
   * - ``q.full()``
     - Return ``True`` if the queue is full.
   * - ``q.size()``
     - Return number of items currently in the queue.
   * - ``await q.get()``
     - Return an item from the queue. Blocks if no items are available.
   * - ``await q.put(item)``
     - Put an item on the queue. Blocks if the queue is at capacity.
   * - ``await q.join()``
     - Wait for all elements to be processed. Consumers must call
       ``q.task_done()`` to indicate the completion of each element.
   * - ``await q.task_done()``
     - Indicate that the processing has finished for an item. If all
       items have been processed and there are tasks waiting on
       ``q.join()``, they will be awakened.

Here is an example of using queues in a producer-consumer problem::

    import curio

    async def producer(queue):
        for n in range(10):
            await queue.put(n)
        await queue.join()
        print('Producer done')

    async def consumer(queue):
        while True:
            item = await queue.get()
            print('Consumer got', item)
            await queue.task_done()

    async def main():
        q = curio.Queue()
        prod_task = await curio.spawn(producer(q))
        cons_task = await curio.spawn(consumer(q))
        await prod_task.join()
        await cons_task.cancel()

    curio.run(main())

The following variants of the basic :class:`Queue` class are also provided:

.. class:: PriorityQueue(maxsize=0)

   Creates a priority queue with a maximum number of elements in *maxsize*.
   The priority of items is determined by standard relational operators
   such as ``<`` and ``<=``. Lowest priority items are returned first.

.. class:: LifoQueue(maxsize=0)

   A queue with "Last In First Out" retrieval policy. In other words, a stack.


Universal Synchronization
-------------------------

Sometimes it is necessary to synchronize Curio with threads and foreign event loops.
For this, use the following queue and event classes.

.. class:: UniversalQueue(maxsize=0, withfd=False)

   A queue that can be simultaneously used from Curio tasks, threads,
   and ``asyncio``.
   The same programming API is used in all
   environments, but ``await`` is required for asynchronous
   operations. If used to coordinate Curio and ``asyncio``, they must
   be executing in separate threads. The ``withfd`` option specifies
   whether or not the queue should set up an I/O loopback
   that allows it to be polled by a foreign event loop. When
   ``withfd`` is ``True``, adding something to the queue writes a
   single byte of data to the I/O loopback. Removing an item with ``get()``
   reads this byte.

.. class:: UniversalEvent()

   An event object that can be used from Curio tasks, threads, and
   ``asyncio``. The same programming interface is used in all of them.
   Asynchronous operations must be prefaced by ``await``. If used
   to coordinate Curio and ``asyncio``, they must be executing in
   separate threads.

.. class:: UniversalResult()

   A result object that can be used from Curio tasks, threads, and
   ``asyncio``. A result is somewhat similar to an event, but it
   additionally carries an attached value or exception. To set the
   result, use ``set_value()`` or ``set_exception()``. To return the
   result, blocking if necessary, use ``unwrap()``. If used in an
   asynchronous environment, these operations must be prefaced by
   ``await``. If used to coordinate Curio and ``asyncio``, they must
   be executing in separate threads. A ``UniversalResult()`` is
   somewhat similar to a ``Future`` in usage, but it has a much more
   restrictive API.

Here is an example of a producer-consumer problem with a ``UniversalQueue``
involving Curio, threads, and ``asyncio`` all running at once::

    from curio import run, UniversalQueue, spawn, run_in_thread

    import time
    import threading
    import asyncio

    # An async task
    async def consumer(name, q):
        print(f'{name} consumer starting')
        while True:
            item = await q.get()
            if item is None:
                break
            print(f'{name} got: {item}')
            await q.task_done()
        print(f'{name} consumer done')
        await q.put(None)

    # A threaded producer
    def producer(q):
        for i in range(10):
            q.put(i)
            time.sleep(1)
        q.join()
        print('Producer done')

    async def main():
        q = UniversalQueue()
        # A Curio consumer
        t1 = await spawn(consumer('curio', q))

        # An asyncio consumer
        t2 = threading.Thread(target=asyncio.run, args=[consumer('asyncio', q)])
        t2.start()

        # A threaded producer
        t3 = threading.Thread(target=producer, args=[q])
        t3.start()
        await run_in_thread(t3.join)

        # Shutdown with a sentinel
        await q.put(None)
        await t1.join()
        await run_in_thread(t2.join)

    run(main())

In this code, the ``consumer()`` coroutine is used simultaneously in
Curio and ``asyncio``. ``producer()`` is an ordinary thread.

Here is an example that has Curio wait for the result produced
by a thread using a ``UniversalResult`` object::

    import threading
    import curio
    import time

    def worker(x, y, result):
        time.sleep(10)
        try:
            result.set_value(x+y)
        except Exception as err:
            result.set_exception(err)

    async def main():
        result = curio.UniversalResult()
        threading.Thread(target=worker, args=[2,3,result]).start()
        print("Result:", await result.unwrap())

    curio.run(main)

When in doubt, queues, events, and results are the preferred mechanism
of coordinating Curio with foreign environments.
Higher-level
abstractions can often be built from these.

Blocking Operations and External Work
-------------------------------------

Sometimes you need to perform work that takes a long time to complete
or otherwise blocks the progress of other tasks. This includes
CPU-intensive calculations and blocking operations carried out by
foreign libraries. Use the following functions to do that:

.. asyncfunction:: run_in_process(callable, *args)

   Run ``callable(*args)`` in a separate process and return the
   result. If cancelled, the underlying worker process is immediately
   terminated with a ``SIGTERM`` signal. The given callable executes in
   an entirely independent Python interpreter and there is no shared
   global state. The separate process is launched using the "spawn"
   method of the ``multiprocessing`` module.

.. asyncfunction:: run_in_thread(callable, *args)

   Run ``callable(*args)`` in a separate thread and return
   the result. If the calling task is cancelled, the underlying
   worker thread (if started) is set aside and sent a termination
   request. However, since there is no underlying mechanism to
   forcefully kill threads, the thread won't recognize the termination
   request until it runs the requested work to completion. It's
   important to note that a cancellation won't block other tasks
   from using threads. Instead, cancellation produces a kind of
   "zombie thread" that executes the requested work, discards the
   result, and then disappears. For reliability, work submitted to
   threads should have a timeout or some other mechanism that
   puts a bound on execution time.

.. asyncfunction:: block_in_thread(callable, *args)

   The same as ``run_in_thread()``, but guarantees that only
   one background thread is used for each unique callable
   regardless of how many tasks simultaneously try to
   carry out the same operation at once.
   Only use this function if there is
   an expectation that the provided callable is going to
   block for an undetermined amount of time and that there
   might be a large amount of contention from multiple tasks on the same
   resource. The primary use is on waiting operations involving
   foreign locks and queues. For example, if you launched a hundred
   Curio tasks and they all decided to block on a shared thread queue,
   using this would be much more efficient than ``run_in_thread()``.

.. asyncfunction:: run_in_executor(exc, callable, *args)

   Run ``callable(*args)`` in a user-supplied
   executor and return the result. *exc* is an executor from the
   :py:mod:`concurrent.futures` module in the standard library. This
   executor is expected to implement a
   :meth:`~concurrent.futures.Executor.submit` method that executes
   the given callable and returns a
   :class:`~concurrent.futures.Future` instance for collecting its
   result.


When performing external work, it's almost always better to use the
:func:`run_in_process` and :func:`run_in_thread` functions instead
of :func:`run_in_executor`. These functions have no external library
dependencies, have less communication overhead, and have more
predictable cancellation semantics.

The following values in :mod:`curio.workers` define how many
worker threads and processes are used. If you are going to
change these values, do it before any tasks are executed.

.. data:: MAX_WORKER_THREADS

   Specifies the maximum number of threads used by a single kernel
   using the :func:`run_in_thread` function. Default value is 64.

.. data:: MAX_WORKER_PROCESSES

   Specifies the maximum number of processes used by a single kernel
   using the :func:`run_in_process` function. Default value is the
   number of CPUs on the host system.

I/O Classes
-----------

I/O in Curio is managed by a collection of classes in :mod:`curio.io`.
These classes act as asynchronous proxies around sockets, streams, and
ordinary files. The programming interface is meant to be the same as
in normal synchronous Python code.

Socket
^^^^^^

The :class:`Socket` class wraps an existing socket-like object with
an async interface.

.. class:: Socket(sockobj)

   Creates a proxy around an existing socket *sockobj*. *sockobj* is
   put in non-blocking mode when wrapped. *sockobj* is not closed unless
   the created ``Socket`` instance is explicitly closed or used as a
   context manager.

The following methods are redefined on an instance ``s`` of :class:`Socket`.

.. list-table::
   :widths: 60 40
   :header-rows: 0

   * - ``await s.recv(maxbytes, flags=0)``
     - Receive up to *maxbytes* of data.
   * - ``await s.recv_into(buffer, nbytes=0, flags=0)``
     - Receive up to *nbytes* of data into a buffer.
   * - ``await s.recvfrom(maxsize, flags=0)``
     - Receive up to *maxsize* of data. Returns a tuple ``(data, client_address)``.
   * - ``await s.recvfrom_into(buffer, nbytes=0, flags=0)``
     - Receive up to *nbytes* of data into a buffer.
   * - ``await s.recvmsg(bufsize, ancbufsize=0, flags=0)``
     - Receive normal and ancillary data.
   * - ``await s.recvmsg_into(buffers, ancbufsize=0, flags=0)``
     - Receive normal and ancillary data into a buffer.
   * - ``await s.send(data, flags=0)``
     - Send data. Returns the number of bytes sent.
   * - ``await s.sendall(data, flags=0)``
     - Send all of the data in *data*. If cancelled, the ``bytes_sent`` attribute of the
       exception contains the number of bytes sent.
   * - ``await s.sendto(data, address)``
     - Send data to the specified address.
   * - ``await s.sendto(data, flags, address)``
     - Send data to the specified address (alternate).
   * - ``await s.sendmsg(buffers, ancdata=(), flags=0, address=None)``
     - Send normal and ancillary data to the socket.
   * - ``await s.accept()``
     - Wait for a new connection. Returns a tuple ``(sock, address)`` where ``sock``
       is an instance of ``Socket``.
   * - ``await s.connect(address)``
     - Make a connection.
   * - ``await s.connect_ex(address)``
     - Make a connection and return an error code instead of raising an exception.
   * - ``await s.close()``
     - Close the connection.
   * - ``await s.shutdown(how)``
     - Shut down the socket. *how* is one of
       ``SHUT_RD``, ``SHUT_WR``, or ``SHUT_RDWR``.
   * - ``await s.do_handshake()``
     - Perform an SSL client handshake (only on SSL sockets).
   * - ``s.makefile(mode, buffering=0)``
     - Make a :class:`curio.io.FileStream` instance wrapping the socket.
       Prefer to use :meth:`Socket.as_stream` instead. Not supported on Windows.
   * - ``s.as_stream()``
     - Wrap the socket as a stream using :class:`curio.io.SocketStream`.
   * - ``s.blocking()``
     - A context manager that returns the internal socket placed into blocking mode.

Any socket method not listed here (e.g., ``s.setsockopt()``) will be
delegated directly to the underlying socket as an ordinary method.
:class:`Socket` objects may be used as an asynchronous context manager,
which causes the underlying socket to be closed when done.

Streams
^^^^^^^

A stream is an asynchronous file-like object that wraps around an
object that natively implements non-blocking I/O. Curio implements
two basic classes:

.. class:: FileStream(fileobj)

   Create a file-like wrapper around an existing file as might be
   created by the built-in ``open()`` function or
   ``socket.makefile()``. *fileobj* must be in binary mode and
   must support non-blocking I/O. The file is placed into
   non-blocking mode using ``os.set_blocking(fileobj.fileno(), False)``.
   *fileobj* is not closed unless the resulting instance is explicitly
   closed or used as a context manager. Not supported on Windows.

.. class:: SocketStream(sockobj)

   Create a file-like wrapper around a socket. *sockobj* is an
   existing socket-like object. The socket is put into non-blocking mode.
   *sockobj* is not closed unless the resulting instance is explicitly
   closed or used as a context manager. Instantiated by ``Socket.as_stream()``.

An instance ``s`` of either stream class implements the following methods:


.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await s.read(maxbytes=-1)``
     - Read up to *maxbytes* of data on the file. If omitted, reads as
       much data as is currently available.
   * - ``await s.readall()``
     - Return all data up to EOF.
   * - ``await s.read_exactly(n)``
     - Read exactly *n* bytes of data.
   * - ``await s.readline()``
     - Read a single line of data.
   * - ``await s.readlines()``
     - Read all of the lines. If cancelled, the ``lines_read`` attribute of
       the exception contains all lines read.
   * - ``await s.write(bytes)``
     - Write all of the data in *bytes*.
   * - ``await s.writelines(lines)``
     - Write all of the lines in *lines*. If cancelled, the ``bytes_written``
       attribute of the exception contains the total bytes written so far.
   * - ``await s.flush()``
     - Flush any unwritten data from buffers.
   * - ``await s.close()``
     - Flush any unwritten data and close the file. Not called on garbage collection.
   * - ``s.blocking()``
     - A context manager that temporarily places the stream into blocking mode and
       returns the raw file object used internally. Note: for
       ``SocketStream`` this creates a file using
       ``open(sock.fileno(), 'rb+', closefd=False)`` which is not
       supported on Windows.

Other methods (e.g., ``tell()``, ``seek()``, ``setsockopt()``, etc.)
are available
if the underlying ``fileobj`` or ``sockobj`` provides them. A stream may be used as an asynchronous context manager.

Files
^^^^^

The :mod:`curio.file` module provides an async-compatible
replacement for the built-in ``open()`` function and associated file
objects. Use this to read and write traditional files on the
filesystem while avoiding blocking. How this is accomplished is an
implementation detail (although threads are used in the initial
version).

.. function:: aopen(*args, **kwargs)

   Creates a :class:`curio.file.AsyncFile` wrapper around a traditional file object as
   returned by Python's built-in ``open()`` function. The arguments are exactly the
   same as for ``open()``. The returned file object must be used as an asynchronous
   context manager.

.. class:: AsyncFile(fileobj)

   This class represents an asynchronous file as returned by the ``aopen()``
   function. Normally, instances are created by the ``aopen()`` function.
   However, it can be wrapped around an already-existing file object.

The following methods are redefined on :class:`AsyncFile` objects to be
compatible with coroutines. Any method not listed here will be
delegated directly to the underlying file. These methods take the same arguments
as the underlying file object. Be aware that not all of these methods are
available on all kinds of files (e.g., ``read1()``, ``readinto()`` and similar
methods are only available in binary-mode files).

.. list-table::
   :widths: 50 50
   :header-rows: 0

   * - ``await f.read(maxbytes=-1)``
     - Read up to *maxbytes* of data on the file. If omitted, reads as
       much data as is currently available.
   * - ``await f.read1(maxbytes=-1)``
     - Same as ``read()``, but uses a single system call.
   * - ``await f.readline(maxbytes=-1)``
     - Read a line of input.
   * - ``await f.readlines(maxbytes=-1)``
     - Read all lines of input data.
   * - ``await f.readinto(buffer)``
     - Read data into a buffer.
   * - ``await f.readinto1(buffer)``
     - Read data into a buffer using a single system call.
   * - ``await f.readall()``
     - Read all available data up to EOF.
   * - ``await f.write(data)``
     - Write data.
   * - ``await f.writelines(lines)``
     - Write all lines.
   * - ``await f.truncate(pos=None)``
     - Truncate the file to a given size/position. If ``None``, the file is truncated at the
       position of the current file pointer.
   * - ``await f.seek(offset, whence=os.SEEK_SET)``
     - Seek to a new file position.
   * - ``await f.tell()``
     - Report current file pointer.
   * - ``await f.flush()``
     - Flush data to a file.
   * - ``await f.close()``
     - Flush remaining data and close.

The preferred way to use an :class:`AsyncFile` object is as an asynchronous context manager.
For example::

    async with aopen(filename) as f:
        # Use the file
        data = await f.read()

:class:`AsyncFile` objects may also be used with asynchronous iteration.
For example::

    async with aopen(filename) as f:
        async for line in f:
            ...

:class:`AsyncFile` objects are intentionally incompatible with code
that uses files in a synchronous manner. Partly, this is to help
avoid unintentional errors in your program where blocking might
occur without you realizing it. If you know what you're doing and you
need to access the underlying file in synchronous code, use the
``blocking()`` context manager like this::

    async with aopen(filename) as f:
        ...
        # Pass to synchronous code (danger: might block)
        with f.blocking() as sync_f:
            # Use synchronous I/O operations
            data = sync_f.read()
            ...

At first glance, the API to streams and files might look identical.
The
difference concerns internal implementation. A stream works natively
with non-blocking I/O. An ``AsyncFile`` uses a combination of threads and
synchronous calls to provide an async-compatible API. Given a choice,
you should use streams. However, some systems don't provide non-blocking
implementations of certain system calls. In those cases, an ``AsyncFile`` is a
fallback.

Networking
----------

Curio provides a number of submodules for different kinds of network
programming.

High Level Networking
^^^^^^^^^^^^^^^^^^^^^

The following functions are used to make network connections and implement
socket-based servers.

.. asyncfunction:: open_connection(host, port, *, ssl=None, source_addr=None, server_hostname=None, alpn_protocols=None)

   Creates an outgoing connection to a server at *host* and
   *port*. This connection is made using the
   :py:func:`socket.create_connection` function and might be IPv4 or
   IPv6 depending on the network configuration (although you're not
   supposed to worry about it). *ssl* specifies whether or not SSL
   should be used. *ssl* can be ``True`` or an instance of
   :class:`curio.ssl.SSLContext`. *source_addr* specifies the source
   address to use on the socket. *server_hostname* specifies the
   hostname to check against when making SSL connections. It is
   highly advised that this be supplied to avoid man-in-the-middle
   attacks. *alpn_protocols* specifies a list of protocol names
   for use with the TLS ALPN extension (RFC 7301). A typical value
   might be ``['h2', 'http/1.1']`` for negotiating either an HTTP/2
   or HTTP/1.1 connection.

.. asyncfunction:: open_unix_connection(path, *, ssl=None, server_hostname=None, alpn_protocols=None)

   Creates a connection to a Unix domain socket with optional SSL applied.

.. asyncfunction:: tcp_server(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True, reuse_port=False)

   Runs a server for receiving TCP connections on
   a given host and port. *client_connected_task* is a coroutine that
   is to be called to handle each connection. *family* specifies the
   address family and is either :data:`socket.AF_INET` or
   :data:`socket.AF_INET6`. *backlog* is the argument to the
   :py:meth:`socket.socket.listen` method. *ssl* specifies a
   :class:`curio.ssl.SSLContext` instance to use. *reuse_address*
   specifies whether to use the ``SO_REUSEADDR`` socket option. *reuse_port*
   specifies whether to use the ``SO_REUSEPORT`` socket option.

.. asyncfunction:: unix_server(path, client_connected_task, *, backlog=100, ssl=None)

   Runs a Unix domain server on a given
   path. *client_connected_task* is a coroutine to execute on each
   connection. *backlog* is the argument given to the
   :py:meth:`socket.socket.listen` method. *ssl* is an optional
   :class:`curio.ssl.SSLContext` to use if setting up an SSL
   connection.

.. asyncfunction:: run_server(sock, client_connected_task, ssl=None)

   Runs a server on a given socket. *sock* is a socket already
   configured to receive incoming connections. *client_connected_task* and
   *ssl* have the same meaning as for the ``tcp_server()`` and ``unix_server()``
   functions. If you need to perform some kind of special socket
   setup that is not possible with the normal ``tcp_server()`` function, you can
   create the underlying socket yourself and then call this function
   to run a server on it.

.. function:: tcp_server_socket(host, port, family=AF_INET, backlog=100, reuse_address=True, reuse_port=False)

   Creates and returns a TCP socket. Arguments are the same as for the
   ``tcp_server()`` function.
   The socket is suitable for use with other
   async operations as well as the ``run_server()`` function.

.. function:: unix_server_socket(path, backlog=100)

   Creates and returns a Unix socket. Arguments are the same as for the
   ``unix_server()`` function. The socket is suitable for use with other
   async operations as well as the ``run_server()`` function.


Message Passing and Channels
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Curio provides a :class:`Channel` class that can be used to perform message
passing between interpreters running in separate processes. Message passing
uses the same protocol as the ``multiprocessing`` standard library.

.. class:: Channel(address, family=socket.AF_INET)

   Represents a communications endpoint for message passing.
   *address* is the address and *family* is the protocol
   family.

The following methods are used to establish a connection on a :class:`Channel` instance ``ch``.

.. list-table::
   :widths: 50 50
   :header-rows: 0

   * - ``await ch.accept(*, authkey=None)``
     - Wait for an incoming connection and return a
       :class:`Connection` instance. *authkey* is an optional
       authentication key.
   * - ``await ch.connect(*, authkey=None)``
     - Make an outgoing connection and return a :class:`Connection` instance.
       *authkey* is an optional authentication key.
   * - ``ch.bind()``
     - Performs the address binding step of the ``accept()`` method.
       Use this to have the host operating system assign a port
       number. For example, use an address of ``('localhost', socket.INADDR_ANY)`` and call ``bind()``.
       Afterwards, ``ch.address`` contains the assigned address.
   * - ``await ch.close()``
     - Close the channel.

The ``connect()`` and ``accept()`` methods of :class:`Channel` instances return an
instance of the :class:`Connection` class:

.. class:: Connection(reader, writer)

   Represents a connection on which message passing of Python objects is
   supported. *reader* and *writer* are I/O streams on which reading
   and writing are to take place (for example, instances of ``SocketStream``
   or ``FileStream``).

An instance ``c`` of :class:`Connection` supports the following methods:

.. list-table::
   :widths: 55 45
   :header-rows: 0

   * - ``await c.close()``
     - Close the connection.
   * - ``await c.recv()``
     - Receive a Python object.
   * - ``await c.recv_bytes()``
     - Receive a raw message of bytes.
   * - ``await c.send(obj)``
     - Send a Python object.
   * - ``await c.send_bytes(buf, offset=0, size=None)``
     - Send a buffer of bytes as a single message. *offset* and *size* specify
       an optional byte offset and size into the underlying memory buffer.
   * - ``await c.authenticate_server(authkey)``
     - Authenticate server endpoint.
   * - ``await c.authenticate_client(authkey)``
     - Authenticate client endpoint.

A :class:`Connection` instance may also be used as a context manager.
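
Because the message protocol is described as matching ``multiprocessing``,
the method names on :class:`Connection` mirror those of the connection
objects in the standard library. The following sketch uses only the
standard library (no Curio and no ``await``) to show the difference
between object messages and byte messages, including the *offset* and
*size* arguments of ``send_bytes()``. It illustrates the synchronous
counterpart and should not be read as Curio's implementation.

```python
from multiprocessing import Pipe

# A duplex pipe gives two connected connection objects.
left, right = Pipe()

left.send({'n': 1, 'tags': ['a', 'b']})   # sent as a pickled Python object
obj = right.recv()

payload = b'hello world'
left.send_bytes(payload, 6, 5)            # offset=6, size=5 selects b'world'
chunk = right.recv_bytes()

left.close()
right.close()
print(obj, chunk)
```

With Curio, the equivalent calls on a :class:`Connection` would be awaited,
for example ``await c.send(obj)`` and ``await c.recv_bytes()``.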

Here is an example of a producer program using channels::

    # producer.py
    from curio import Channel, run

    async def producer(ch):
        c = await ch.accept(authkey=b'peekaboo')
        for i in range(10):
            await c.send(i)
        await c.send(None)   # Sentinel

    if __name__ == '__main__':
        ch = Channel(('localhost', 30000))
        run(producer(ch))

Here is an example of a corresponding consumer program using a channel::

    # consumer.py
    from curio import Channel, run

    async def consumer(ch):
        c = await ch.connect(authkey=b'peekaboo')
        while True:
            msg = await c.recv()
            if msg is None:
                break
            print('Got:', msg)

    if __name__ == '__main__':
        ch = Channel(('localhost', 30000))
        run(consumer(ch))

socket module
^^^^^^^^^^^^^

The :mod:`curio.socket` module provides a wrapper around selected functions in the built-in
:mod:`socket` module--allowing it to be used as a stand-in in
Curio-related code. The module provides exactly the same
functionality except that certain operations have been replaced by
asynchronous equivalents.

.. function:: socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)

   Creates a :class:`curio.io.Socket` wrapper around the :class:`socket` objects created in the built-in :mod:`socket`
   module. The arguments for construction are identical and have the same meaning.
   The resulting :class:`socket` instance is set in non-blocking mode.

The following module-level functions have been modified so that the returned socket
objects are compatible with Curio:

.. function:: socketpair(family=AF_UNIX, type=SOCK_STREAM, proto=0)
.. function:: fromfd(fd, family, type, proto=0)
.. function:: create_connection(address, source_address)

The following module-level functions have been redefined as coroutines so that they
don't block the kernel when interacting with DNS. This is accomplished through
the use of threads.

.. asyncfunction:: getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)
.. asyncfunction:: getfqdn(name)
.. asyncfunction:: gethostbyname(hostname)
.. asyncfunction:: gethostbyname_ex(hostname)
.. asyncfunction:: gethostname()
.. asyncfunction:: gethostbyaddr(ip_address)
.. asyncfunction:: getnameinfo(sockaddr, flags)


ssl module
^^^^^^^^^^

The :mod:`curio.ssl` module provides Curio-compatible functions for creating an SSL
wrapped Curio socket. The following functions are redefined (and have the same
calling signature as their counterparts in the standard :mod:`ssl` module):

.. asyncfunction:: wrap_socket(*args, **kwargs)

.. asyncfunction:: get_server_certificate(*args, **kwargs)

.. function:: create_default_context(*args, **kwargs)

.. class:: SSLContext

   A redefined and modified variant of :class:`ssl.SSLContext` so that the
   :meth:`wrap_socket` method returns a socket compatible with Curio.

Don't attempt to use the :mod:`curio.ssl` module without a careful read of Python's official documentation
at https://docs.python.org/3/library/ssl.html.

It is usually easier to apply SSL to a
connection using the high level network functions previously described.
For example, here's how you make an outgoing SSL connection::

    sock = await curio.open_connection('www.python.org', 443,
                                       ssl=True,
                                       server_hostname='www.python.org')

Here's how you create a server that uses SSL::

    import curio
    from curio import ssl

    KEYFILE = "privkey_rsa"       # Private key
    CERTFILE = "certificate.crt"  # Server certificate

    async def handler(client, addr):
        ...

    if __name__ == '__main__':
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
        curio.run(curio.tcp_server('', 10000, handler, ssl=ssl_context))


Subprocesses
------------

The :mod:`curio.subprocess` module implements the same functionality as the built-in
:mod:`subprocess` module.

.. class:: Popen(*args, **kwargs)

   A wrapper around the :class:`subprocess.Popen` class. The same arguments are
   accepted. On the resulting :class:`~subprocess.Popen` instance, the
   :attr:`~subprocess.Popen.stdin`, :attr:`~subprocess.Popen.stdout`, and
   :attr:`~subprocess.Popen.stderr` file attributes have been wrapped by the
   :class:`curio.io.FileStream` class. You can use these in an asynchronous
   context.

The following methods of :class:`Popen` have been replaced by asynchronous equivalents:

.. asyncmethod:: Popen.wait()

   Wait for a subprocess to exit. Cancellation does not terminate the process.

.. asyncmethod:: Popen.communicate(input=b'')

   Communicate with the subprocess, sending the specified input on standard input.
   Returns a tuple ``(stdout, stderr)`` with the resulting output of standard output
   and standard error. If cancelled, the resulting exception has ``stdout`` and
   ``stderr`` attributes that contain the output read prior to cancellation.
   Cancellation does not terminate the underlying subprocess.
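
The shape of the value returned by ``communicate()`` matches the standard
library. As a point of comparison, the sketch below uses the blocking
:mod:`subprocess` module rather than Curio (so there is no ``await``), and
it launches the current Python interpreter as a stand-in child process.
The child program in ``child_code`` is invented for the example.

```python
import subprocess
import sys

# Child program: echo standard input in upper case and write a note to stderr.
child_code = (
    "import sys\n"
    "sys.stdout.write(sys.stdin.read().upper())\n"
    "sys.stderr.write('done')\n"
)

p = subprocess.Popen(
    [sys.executable, '-c', child_code],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
out, err = p.communicate(input=b'hello')   # returns a (stdout, stderr) tuple
print(out, err, p.returncode)
```

With Curio, the same step would be written as
``out, err = await p.communicate(input=b'hello')`` on a
``curio.subprocess.Popen`` instance.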

The following functions are also available. They accept the same arguments as their
equivalents in the :mod:`subprocess` module:

.. asyncfunction:: run(args, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False)

   Run a command in a subprocess. Returns a :class:`subprocess.CompletedProcess` instance.
   If cancelled, the underlying process is terminated using the process ``kill()`` method.
   The resulting exception will have ``stdout`` and ``stderr`` attributes containing
   output read prior to cancellation.

.. asyncfunction:: check_output(args, stdout=None, stderr=None, shell=False)

   Run a command in a subprocess and return the resulting output. Raises a
   :py:exc:`subprocess.CalledProcessError` exception if an error occurred.
   The behavior on cancellation is the same as for ``run()``.

Here is an example of using :class:`Popen` to read streaming output off of a
subprocess with Curio::

    import curio
    from curio import subprocess

    async def main():
        p = subprocess.Popen(['ping', 'www.python.org'], stdout=subprocess.PIPE)
        async for line in p.stdout:
            print('Got:', line.decode('ascii'), end='')

    if __name__ == '__main__':
        curio.run(main)

Asynchronous Threads
--------------------

If you need to perform a lot of synchronous operations, but still
interact with Curio, you can launch an async-thread.
An asynchronous thread flips the whole world around--instead
of executing selected synchronous operations using ``run_in_thread()``, you
run everything in a thread and perform selected async operations using the
``AWAIT()`` function.

To create an asynchronous thread, use ``spawn_thread()``:

.. asyncfunction:: spawn_thread(func, *args, daemon=False)

   Launch an asynchronous thread that runs the callable ``func(*args)``.
   *daemon* specifies if the thread runs in daemonic mode.
   Returns an ``AsyncThread`` instance.

An instance ``t`` of ``AsyncThread`` supports the following methods.

.. list-table::
   :widths: 55 45
   :header-rows: 0

   * - ``await t.join()``
     - Waits for the thread to terminate, returning the final result.
       The final result is returned in the same manner as ``Task.join()``.
   * - ``await t.wait()``
     - Waits for the thread to terminate, but does not return any result.
   * - ``await t.cancel(*, blocking=True, exc=TaskCancelled)``
     - Cancels the asynchronous thread. The behavior is the same as with ``Task``.
       Note: An asynchronous thread can only be cancelled when it performs
       operations using ``AWAIT()``.
   * - ``t.result``
     - The final result of the thread. If the thread crashed
       with an exception, that exception is reraised on access.
   * - ``t.exception``
     - The final exception (if any).
   * - ``t.id``
     - Thread ID. A monotonically increasing integer.
   * - ``t.terminated``
     - ``True`` if the thread is terminated.
   * - ``t.cancelled``
     - ``True`` if the thread was cancelled.

Within a thread, the following function is used to execute any coroutine.

.. function:: AWAIT(coro)

   Execute a coroutine on behalf of an asynchronous thread. The requested
   coroutine executes in Curio's main execution thread. The caller is
   blocked until it completes. If used outside of an asynchronous thread,
   an ``AsyncOnlyError`` exception is raised. If ``coro`` is not a
   coroutine, it is returned unmodified. The reason ``AWAIT`` is all-caps
   is to make it more easily heard when there are all of these coders yelling
   at you to just use pure async code instead of launching a thread.
   Also,
   ``await`` is a reserved keyword as of Python 3.7.

Here is an example of an asynchronous thread reading off a Curio queue::

    from curio import run, Queue, sleep, CancelledError
    from curio.thread import spawn_thread, AWAIT

    def consumer(queue):
        try:
            while True:
                item = AWAIT(queue.get())
                print('Got:', item)
                AWAIT(queue.task_done())

        except CancelledError:
            print('Consumer goodbye!')
            raise

    async def main():
        q = Queue()
        t = await spawn_thread(consumer, q)

        for i in range(10):
            await q.put(i)
            await sleep(1)

        await q.join()
        await t.cancel()

    run(main())

Asynchronous threads can perform any combination of blocking operations
including those that might involve normal thread-related primitives such
as locks and queues. These operations will block the thread itself, but
will not block the Curio kernel loop. In a sense, this is the whole
point--if you run things in an async thread, the rest of Curio is
protected. Asynchronous threads can be cancelled in the same manner
as normal Curio tasks. However, the same rules apply--an asynchronous
thread can only be cancelled on blocking operations involving ``AWAIT()``.

Scheduler Activations
---------------------

Every task in Curio goes through a life-cycle of creation, running,
suspension, and termination. These steps are managed by an internal
scheduler. A scheduler activation is a mechanism for monitoring these
steps. To do this, you define a class that inherits from
:class:`Activation` in the submodule ``curio.activation``.

.. class:: Activation

   Base class for defining scheduler activations.

An instance ``a`` of :class:`Activation` implements the following methods:

.. list-table::
   :widths: 55 45
   :header-rows: 0

   * - ``a.activate(kernel)``
     - Executed once upon initialization of the Curio kernel. *kernel* is
       a reference to the ``Kernel`` instance.
   * - ``a.created(task)``
     - Called when a new task is created. *task* is the newly created ``Task`` instance.
   * - ``a.running(task)``
     - Called immediately prior to the execution cycle of a task.
   * - ``a.suspended(task, trap)``
     - Called when a task has suspended execution. *trap* is the trap executed.
   * - ``a.terminated(task)``
     - Called when a task has terminated execution. Note: the
       ``suspended()`` method is always called immediately prior to a task being terminated.

Activations are used to implement debugging and diagnostic tools. As
an example, here is a scheduler activation that monitors for
long execution times and reports warnings::

    from curio.activation import Activation
    import time

    class LongBlock(Activation):
        def __init__(self, maxtime):
            self.maxtime = maxtime

        def running(self, task):
            self.start = time.time()

        def suspended(self, task, trap):
            end = time.time()
            if end - self.start > self.maxtime:
                print(f'Long blocking in {task.name}: {end - self.start}')

Scheduler activations are registered when a ``Kernel`` is created or with the
top-level ``run()`` function::

    kern = Kernel(activations=[LongBlock(0.05)])
    with kern:
        kern.run(coro)

    # Alternative
    run(coro, activations=[LongBlock(0.05)])

Asynchronous Metaprogramming
----------------------------

The :mod:`curio.meta` module provides some functions that might be useful if
implementing more complex programs and APIs involving coroutines.

.. function:: curio_running()

   Return ``True`` if Curio is running in the current thread.

.. function:: iscoroutinefunction(func)

   Returns ``True`` if the supplied *func* is a coroutine function or is known
   to resolve into a coroutine. Unlike a similar function in ``inspect``,
   this function knows about ``functools.partial()``, awaitable objects,
   and async generators.

.. function:: instantiate_coroutine(corofunc, *args, **kwargs)

   Instantiate a coroutine from *corofunc*. If *corofunc* is already
   a coroutine object, it is returned unmodified. If it's a coroutine
   function, it's executed within an async context using the given
   arguments. If it's not a coroutine, *corofunc* is called
   with the given arguments with the expectation that whatever is
   returned will be a coroutine instance.

.. function:: from_coroutine(level=2)

   Returns ``True`` if the calling function is being invoked
   from inside a coroutine. This is primarily of use when
   writing decorators and other advanced metaprogramming features.
   The implementation requires stack-frame inspection. The *level*
   argument controls the stack frame in which information is obtained
   and might need to be adjusted depending on the nature of code calling
   this function.

.. function:: awaitable(syncfunc)

   A decorator that allows an asynchronous implementation of a function to be
   attached to an existing synchronous function. If the resulting function is
   called from synchronous code, the synchronous function is used. If the
   function is called from asynchronous code, the asynchronous function is used.

Here is an example that illustrates::

    import curio
    from curio.meta import awaitable

    def spam(x, y):
        print('Synchronous ->', x, y)

    @awaitable(spam)
    async def spam(x, y):
        print('Asynchronous ->', x, y)

    async def main():
        await spam(2, 3)      # Calls asynchronous spam()

    if __name__ == '__main__':
        spam(2, 3)            # Calls synchronous spam()
        curio.run(main())

Exceptions
----------

The following exceptions are defined. All are subclasses of the
:class:`CurioError` base class.


.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``CurioError``
     - Base class for all Curio-specific exceptions.
   * - ``CancelledError``
     - Base class for all cancellation-related exceptions.
   * - ``TaskCancelled``
     - Exception raised in a coroutine if it has been cancelled using the :meth:`Task.cancel` method. If ignored, the
       coroutine is silently terminated. If caught, a coroutine can continue to
       run, but should work to terminate execution. Ignoring a cancellation
       request and continuing to execute will likely cause some other task to hang.
   * - ``TaskTimeout``
     - Exception raised in a coroutine if it has been cancelled by timeout.
       A subclass of ``CancelledError``.
   * - ``TimeoutCancellationError``
     - Exception raised in a coroutine if it has been cancelled due to a timeout,
       but not one related to the inner-most timeout operation. A subclass
       of ``CancelledError``.
   * - ``UncaughtTimeoutError``
     - Exception raised if a timeout from an inner timeout operation has
       propagated to an outer timeout, indicating the lack of a proper
       try-except block. A subclass of ``CurioError``.
   * - ``TaskError``
     - Exception raised by the :meth:`Task.join` method if an uncaught exception
       occurs in a task. It is a chained exception.
       The ``__cause__`` attribute
       contains the exception that caused the task to fail.
   * - ``SyncIOError``
     - Exception raised if a task attempts to perform a synchronous I/O operation
       on an object that only supports asynchronous I/O.
   * - ``AsyncOnlyError``
     - Exception raised by the ``AWAIT()`` function if it's applied to code not
       properly running in an async-thread.
   * - ``ResourceBusy``
     - Exception raised if an I/O operation is requested on a resource, but the
       resource is already busy performing the same operation on behalf of another task.
       The exceptions ``ReadResourceBusy`` and ``WriteResourceBusy`` are subclasses
       that provide a more specific cause.

Low-level Traps and Scheduling
------------------------------

The following system calls are available in ``curio.traps``, but not typically used
directly in user code. They are used to implement higher level
objects such as locks, socket wrappers, and so forth. If you find
yourself using these, you're probably doing something wrong--or
implementing a new Curio primitive.

Unless otherwise indicated, all traps are potentially blocking and
may raise a cancellation exception.

.. list-table::
   :widths: 50 50
   :header-rows: 0

   * - ``await _read_wait(fileobj)``
     - Sleep until data is available for reading on
       *fileobj*. *fileobj* is any file-like object with a ``fileno()``
       method.
   * - ``await _write_wait(fileobj)``
     - Sleep until data can be written on *fileobj*.
       *fileobj* is any file-like object with a ``fileno()`` method.
   * - ``await _io_release(fileobj)``
     - Release any kernel resources associated with *fileobj*. Should
       be called prior to closing any file.
   * - ``await _io_waiting(fileobj)``
     - Returns a tuple ``(rtask, wtask)`` of tasks currently sleeping on
       *fileobj* (if any). Returns immediately.
   * - ``await _future_wait(fut)``
     - Sleep until a result is set on *fut*. *fut*
       is an instance of :py:class:`concurrent.futures.Future`.
   * - ``await _cancel_task(task, exc=TaskCancelled, val=None)``
     - Cancel *task*. Returns immediately. *exc* and *val*
       specify the exception type and value.
   * - ``await _scheduler_wait(sched, state_name)``
     - Go to sleep on a kernel scheduler primitive. *sched* is an
       instance of ``curio.sched.SchedBase``. *state_name* is the name of
       the wait state (used in debugging).
   * - ``await _scheduler_wake(sched, n=1, value=None, exc=None)``
     - Reschedule one or more tasks from a kernel scheduler primitive. *n* is the
       number of tasks to release. *value* and *exc* specify the return
       value or exception to raise in the task when it resumes execution.
       Returns immediately.
   * - ``await _get_kernel()``
     - Get a reference to the running ``Kernel`` object. Returns immediately.
   * - ``await _get_current()``
     - Get a reference to the currently running ``Task`` instance. Returns immediately.
   * - ``await _sleep(seconds)``
     - Sleep for a given number of seconds.
   * - ``await _set_timeout(seconds)``
     - Set a timeout in the currently running task. Returns immediately
       with the previous timeout (if any).
   * - ``await _unset_timeout(previous)``
     - Unset a timeout in the currently running task. *previous* is the
       value returned by the ``_set_timeout()`` call used to set the timeout.
       Returns immediately.
   * - ``await _clock()``
     - Immediately returns the current monotonic clock value.

Again, you're unlikely to use any of these functions directly.
However, here's a small taste of how they get used. For example, the
:meth:`curio.io.Socket.recv` method looks roughly like this::

    class Socket:
        ...
        async def recv(self, maxbytes):
            while True:
                try:
                    return self._socket.recv(maxbytes)
                except BlockingIOError:
                    # No data yet: sleep until the socket is readable
                    await _read_wait(self._socket)
        ...

This method first tries to receive data. If none is available, the
:func:`_read_wait` call is used to put the task to sleep until reading
can be performed. When it awakes, the receive operation is
retried. To emphasize, :func:`_read_wait` doesn't perform any I/O
itself. It only suspends the task until reading is possible.

The ``_scheduler_wait()`` and ``_scheduler_wake()`` traps are used to
implement high-level synchronization and queuing primitives. The
``sched`` argument to these calls is an instance of a class that
inherits from ``SchedBase`` defined in the ``curio.sched`` submodule.
The following specific classes are defined:

.. class:: SchedFIFO

   A scheduling FIFO queue. Used to implement locks and queues.

.. class:: SchedBarrier

   A scheduling barrier. Used to implement events.

The following public methods are defined on an instance ``s`` of these classes:

.. list-table::
   :widths: 40 60
   :header-rows: 0

   * - ``await s.suspend(reason)``
     - Suspend the calling task. ``reason`` is a string describing why.
   * - ``await s.wake(n=1)``
     - Wake one or more suspended tasks.
   * - ``len(s)``
     - Number of tasks suspended.

Here is an example of how a scheduler primitive is used to implement an ``Event``::

    from curio.sched import SchedBarrier

    class Event:
        def __init__(self):
            self._value = 0
            self._sched = SchedBarrier()

        async def wait(self):
            if self._value == 0:
                await self._sched.suspend('EVENT_WAIT')

        async def set(self):
            self._value = 1
            await self._sched.wake(len(self._sched))


Debugging and Diagnostics
-------------------------

Curio provides a few facilities for basic debugging and diagnostics. If you
print a ``Task`` instance, it shows the name of the associated
coroutine along with the file and line number where the task is currently
executing. The output might look similar to this::

    Task(id=3, name='child', state='TIME_SLEEP') at filename.py:9

You can additionally use the ``Task.traceback()`` method to create a current
stack traceback of any given task. For example::

    t = await spawn(coro)
    ...
    print(t.traceback())

Instead of a full traceback, you can also get the current filename and line number::

    filename, lineno = await t.where()

To find out more detailed information about what the kernel is doing, you can
supply one or more debugging modules to the ``run()`` function.
To trace
all task scheduling events, use the ``schedtrace`` debugger as follows::

    from curio.debug import schedtrace
    run(coro, debug=schedtrace)

To additionally include information on low-level kernel traps, use the ``traptrace`` debugger
instead::

    from curio.debug import traptrace
    run(coro, debug=traptrace)

To report all exceptions from crashed tasks, use the ``logcrash`` debugger::

    from curio.debug import logcrash
    run(coro, debug=logcrash)

To report warnings about long-running tasks that appear to be stalling the
event loop, use the ``longblock`` debugger::

    from curio.debug import longblock
    run(coro, debug=longblock(max_time=0.1))

The different debuggers may be combined by providing a list. For example::

    run(coro, debug=[schedtrace, traptrace, logcrash])

The amount of output produced by the different debugging modules might be considerable. You
can filter it to a specific set of coroutine names using the ``filter`` keyword argument.
For example::

    async def spam():
        ...

    async def coro():
        t = await spawn(spam)
        ...

    run(coro, debug=schedtrace(filter={'spam'}))

The logging level used by the different debuggers can be changed using the
``level`` keyword argument::

    import logging
    run(coro, debug=schedtrace(level=logging.DEBUG))

A different ``Logger`` instance can be supplied using the ``log`` keyword argument::

    import logging
    run(coro, debug=schedtrace(log=logging.getLogger('spam')))

Be aware that all diagnostic logging is synchronous. As such, all
logging operations might temporarily block the event loop--especially
if logging output involves file I/O or network operations. If this is
a concern, you should take steps to mitigate it in the configuration
of logging.
For example, you might use the ``QueueHandler`` and
``QueueListener`` objects from the ``logging.handlers`` module to offload log
handling to a separate thread.
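
As a rough sketch of that last suggestion, the following helper routes a
logger's records through a queue so that the real handler runs in a
background thread. It relies only on the standard-library
``QueueHandler`` and ``QueueListener`` classes. The helper name
``make_offloaded_logger`` and the logger name ``'curio.debug.offload'`` are
made up for illustration and are not part of Curio.

```python
import logging
import logging.handlers
import queue


def make_offloaded_logger(name, target_handler):
    """Return (logger, listener).

    Hypothetical helper, not part of Curio. Records logged on the returned
    logger are only placed on a queue by the caller. The QueueListener
    thread takes them off the queue and passes them to target_handler.
    """
    log_queue = queue.Queue(-1)            # unbounded, so put() does not block
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False               # avoid synchronous root handlers
    logger.addHandler(logging.handlers.QueueHandler(log_queue))

    listener = logging.handlers.QueueListener(log_queue, target_handler)
    listener.start()                       # starts the background thread
    return logger, listener


# Example setup: the slow handler (stderr here) runs in the listener thread
logger, listener = make_offloaded_logger('curio.debug.offload',
                                         logging.StreamHandler())
logger.info('logging is now offloaded')
listener.stop()                            # flush queued records at shutdown
```

The returned logger could then be handed to a debugger through the ``log``
keyword argument described above, for example ``schedtrace(log=logger)``.
Calling ``listener.stop()`` at shutdown processes any records still in the
queue before the thread exits.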