1Reference Manual
2================
3
4Coroutines
5----------
6
7Curio executes coroutines.  A coroutine is a function defined using
8``async def``::
9
    async def hello(name):
        return 'Hello ' + name
12
13Coroutines call other coroutines using ``await``::
14
    async def main(name):
        s = await hello(name)
        print(s)
18
19Coroutines never run on their own.
20They always execute under the supervision of a manager (e.g., an
21event-loop, a kernel, etc.).  In Curio, the initial coroutine is
22executed using ``run()``::
23
24    import curio
25    curio.run(main, 'Guido')
26
27When executing, a coroutine is encapsulated by a "Task."
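
To make the point about supervision concrete, the sketch below drives a
coroutine by hand using only the Python standard library.  It illustrates
the minimum a manager has to do and is not a description of how Curio's
kernel is implemented; the variable names are chosen for this example.

```python
async def hello(name):
    return 'Hello ' + name

# Calling an async function does not run it.  It only creates a
# coroutine object that waits for something to drive it.
coro = hello('Guido')

try:
    # send(None) advances the coroutine until it awaits or returns.
    coro.send(None)
except StopIteration as stop:
    # A finished coroutine reports its return value on StopIteration.
    result = stop.value

print(result)
```

A real manager such as ``run()`` does the same stepping, but also handles
the requests that a coroutine makes when it awaits on I/O, sleeps, or
spawns tasks.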
28
29Basic Execution
30---------------
31
32The following function runs an initial coroutine:
33
34.. function:: run(corofunc, *args, debug=None, selector=None, with_monitor=False, taskcls=Task)
35
36   Run *corofunc* and return its result.  *args* are the arguments
37   provided to *corofunc*.  *with_monitor* enables the task monitor.
38   *selector* is an optional selector from the :mod:`selectors
39   <python:selectors>` standard library. *debug* is a list of
40   debugging features (see the section on debugging).  *taskcls* is
41   the class used to encapsulate coroutines.  If ``run()`` is called
42   when a task is already running, a ``RuntimeError`` is raised.
43
44If you are going to repeatedly execute coroutines one after the other, it
45is more efficient to create a ``Kernel`` instance and submit
46them using the ``run()`` method.
47
.. class:: Kernel(selector=None, debug=None, taskcls=Task)
49
50   Create a runtime kernel. The arguments are the same
51   as described above for :func:`run()`.
52
53There is only one method that may be used on a :class:`Kernel` instance.
54
55.. method:: Kernel.run(corofunc=None, *args, shutdown=False)
56
57   Run *corofunc* and return its result.
58   *args* are the arguments given to *corofunc*.  If
59   *shutdown* is ``True``, the kernel cancels all remaining tasks
60   and performs a clean shutdown upon return. Calling this method with *corofunc*
61   set to ``None`` executes a single scheduling cycle of background tasks
62   before returning immediately. Raises a
63   ``RuntimeError`` if called on an already running kernel or if an attempt is
64   made to run more than one kernel in the same thread.
65
66A kernel is commonly used as a context manager. For example::
67
68    with Kernel() as kernel:
69        kernel.run(corofunc1)
70        kernel.run(corofunc2)
71        ...
72    # Kernel shuts down here
73
74When submitting work, you can either provide an async
75function and arguments or you can provide an already instantiated
76coroutine.  Both of these ``run()`` invocations work::
77
78    async def hello(name):
79        print('hello', name)
80
81    run(hello, 'Guido')    # Preferred
82    run(hello('Guido'))    # Ok
83
84This convention is observed by nearly all other functions that accept
85coroutines (e.g., spawning tasks, waiting for timeouts, etc.).
86
87Tasks
88-----
89
90The following functions manage the execution of concurrent tasks.
91
92.. asyncfunction:: spawn(corofunc, *args, daemon=False)
93
94   Create a new task that concurrently executes the async function *corofunc*.  *args*
95   are the arguments provided to *corofunc*. Returns a :class:`Task`
96   instance as a result.  The *daemon* option specifies
97   that the task is never joined and that its result may be
98   disregarded.
99
100.. asyncfunction:: current_task()
101
102   Returns the :class:`Task` instance corresponding to the caller.
103
104:func:`spawn` and :func:`current_task` return a :class:`Task` instance ``t``
105with the following methods and attributes:
106
107.. list-table::
108   :widths: 40 60
109   :header-rows: 0
110
111   * - ``await t.join()``
112     - Wait for the task to terminate and return its result.
113       Raises :exc:`curio.TaskError` if the task failed with an
114       exception. The ``__cause__`` attribute
115       contains the actual exception raised by the task when it crashed.
116   * - ``await t.wait()``
     - Wait for the task to terminate, but return no value.
118
119   * - ``await t.cancel(*, blocking=True, exc=TaskCancelled)``
120     - Cancels the task by raising a :exc:`curio.TaskCancelled` exception
121       (or the exception specified by *exc*). If ``blocking=True`` (the
122       default), waits for the task to actually terminate.  A task may
123       only be cancelled once.  If invoked more than once, the second
124       request waits until the task is cancelled from the first request.
125       If the task has already terminated, this method does nothing and
126       returns immediately.  Note: uncaught exceptions that occur as a
127       result of cancellation are logged, but not propagated out of the
128       ``Task.cancel()`` method.
129
130   * - ``t.traceback()``
     - Creates a stack traceback string.  Useful for debugging.
132   * - ``t.where()``
133     - Return (filename, lineno) where the task is executing.
134   * - ``t.id``
135     - The task's integer id. Monotonically increases.
136   * - ``t.coro``
137     - The coroutine associated with the task.
138   * - ``t.daemon``
139     - Boolean flag that indicates whether or not a task is daemonic.
140   * - ``t.state``
141     - The name of the task's current state.  Useful for debugging.
142   * - ``t.cycles``
143     - The number of scheduling cycles the task has completed.
144   * - ``t.result``
145     - A property holding the task result. If accessed before the task terminates,
146       a ``RuntimeError`` exception is raised. If a task crashed with an exception,
147       that exception is reraised on access.
148   * - ``t.exception``
149     - Exception raised by a task, if any.  ``None`` otherwise.
150   * - ``t.cancelled``
151     - A boolean flag that indicates whether or not the task was cancelled.
152   * - ``t.terminated``
153     - A boolean flag that indicates whether or not the task has terminated.
154
155Task Groups
156-----------
157
158Tasks may be grouped together to better manage their execution and
159collect results.  To do this, create a ``TaskGroup`` instance.
160
161.. class:: TaskGroup(tasks=(), *, wait=all)
162
163   A class representing a group of executing tasks.  *tasks* is an
164   optional set of existing tasks to put into the group.
165   *wait* specifies the policy used by ``join()`` to wait for tasks. If *wait* is
166   ``all``, then wait for all tasks to complete.  If *wait* is
167   ``any`` then wait for any task to terminate and cancel any
168   remaining tasks.  If *wait* is ``object``, then wait for any task
169   to return a non-None object, cancelling all remaining
170   tasks afterwards. If *wait* is ``None``, then immediately cancel all running tasks.
171   Task groups do not form a hierarchy or have any kind of relationship to
   other previously created task groups or tasks.  Moreover, tasks created by
173   the top level ``spawn()`` function are not placed into any task group.
174   To create a task in a group, it should be created using ``TaskGroup.spawn()``
175   or explicitly added using ``TaskGroup.add_task()``.
176
177The following methods and attributes are supported on a ``TaskGroup`` instance ``g``:
178
179.. list-table::
180   :widths: 40 60
181   :header-rows: 0
182
183   * - ``await g.spawn(corofunc, *args, daemon=False)``
184     - Create a new task in the group. Returns a ``Task`` instance.
185       *daemon* specifies whether or not the result of the task is
186       disregarded.  Daemonic tasks are both ignored and cancelled by the ``join()``
187       method.
188   * - ``await g.add_task(task)``
189     - Add an already existing task to the group.
190   * - ``await g.next_done()``
191     - Wait for and return the next completed task. Return ``None`` if no more tasks remain.
192   * - ``await g.next_result()``
193     - Wait for and return the result of the next completed task.
194       If the task failed with an exception, the exception is raised.
195   * - ``await g.join()``
196     - Wait for all tasks in the group to terminate according to the wait policy
197       set for the group.  If any of the monitored tasks exits with an exception or
198       if the ``join()`` operation itself is cancelled, all remaining tasks in the
199       group are cancelled. If a ``TaskGroup`` is used as a
200       context manager, the ``join()`` method is called on block exit.
201   * - ``await g.cancel_remaining()``
202     - Cancel and remove all remaining non-daemonic tasks from the group.
203   * - ``g.completed``
204     - The first task that completed with a valid result after calling ``join()``.
205   * - ``g.result``
206     - The result of the first task that completed after calling ``join()``.
207       May raise an exception if the task exited with an exception.
208   * - ``g.exception``
209     - Exception raised by the first task that completed (if any).
210   * - ``g.results``
211     - A list of all results collected by ``join()``,
212       ordered by task id. May raise an exception if any task
213       exited with an exception.
214   * - ``g.exceptions``
215     - A list of all exceptions collected by ``join()``.
216   * - ``g.tasks``
217     - A list of all non-daemonic tasks managed by the group, ordered by task id.
218       Does not include tasks where ``Task.join()`` or ``Task.cancel()``
219       has been directly called already.
220
221The preferred way to use a ``TaskGroup`` is as a context manager.
222Here are a few common usage patterns::
223
224    # Spawn multiple tasks and collect all of their results
225    async with TaskGroup(wait=all) as g:
226        await g.spawn(coro1)
227        await g.spawn(coro2)
228        await g.spawn(coro3)
229    print('Results:', g.results)
230
231    # Spawn multiple tasks and collect the result of the first one
232    # that completes--cancelling other tasks
233    async with TaskGroup(wait=any) as g:
234        await g.spawn(coro1)
235        await g.spawn(coro2)
236        await g.spawn(coro3)
237    print('Result:', g.result)
238
239    # Spawn multiple tasks and collect their results as they complete
240    async with TaskGroup() as g:
241        await g.spawn(coro1)
242        await g.spawn(coro2)
243        await g.spawn(coro3)
244        async for task in g:
245            print(task, 'completed.', task.result)
246
247In these examples, access to the ``result`` or ``results`` attribute
248may raise an exception if a task failed for some reason.
249
250If an exception is raised inside the task group context, all managed
251tasks are cancelled and the exception is propagated.  For example::
252
253    try:
254        async with TaskGroup() as g:
255            t1 = await g.spawn(func1)
256            t2 = await g.spawn(func2)
257            t3 = await g.spawn(func3)
258            raise RuntimeError()
259    except RuntimeError:
260        # All launched tasks will have terminated or been cancelled here
261        assert t1.terminated
262        assert t2.terminated
263        assert t3.terminated
264
265It is important to emphasize that no tasks placed in a task group survive past
266the ``join()`` operation or exit from a context manager.  This includes
267any daemonic tasks running in the background.
268
269Time
270----
271
272Curio manages time with an internal monotonic clock.  The following functions
273are provided:
274
275.. asyncfunction:: sleep(seconds)
276
277   Sleep for a specified number of seconds.  If the number of seconds is 0,
278   execution switches to the next ready task (if any). Returns the current clock value.
279
280.. asyncfunction:: clock()
281
282   Returns the current value of the monotonic clock.  Use this to get a
283   base clock value for the ``wake_at()`` function.
284
285Timeouts
286--------
287Any blocking operation can be cancelled by a timeout.
288
289.. asyncfunction:: timeout_after(seconds, corofunc=None, *args)
290
291   Execute ``corofunc(*args)`` and return its result. If no result is
292   returned before *seconds* have elapsed, a
293   :py:exc:`curio.TaskTimeout` exception is raised on the current
294   blocking operation.  If *corofunc* is ``None``, the function
295   returns an asynchronous context manager that applies a timeout to a
296   block of statements.
297
298Every call to ``timeout_after()`` must have a matching exception handler
299to catch the resulting timeout. For example::
300
301    try:
302        result = await timeout_after(10, coro, arg1, arg2)
303    except TaskTimeout:
304        ...
305
306    # Alternative (context-manager)
307    try:
308        async with timeout_after(10):
            result = await coro(arg1, arg2)
310            ...
311    except TaskTimeout:
312        ...
313
314When timeout operations are nested, the resulting ``TaskTimeout``
315exception is paired to the matching ``timeout_after()`` operation that
316produced it.  Consider this subtle example::
317
318    async def main():
319        try:
320            async with timeout_after(1):        # Expires first
321                try:
322                    async with timeout_after(5):
323                        await sleep(1000)
324                except TaskTimeout:             # (a) Does NOT match
325                    print("Inner timeout")
326        except TaskTimeout:                     # (b) Matches!
327            print("Outer timeout")
328
329    run(main)
330
331If you run this, you will see output of "Outer timeout" from the
332exception handler at (b). This is because the outer timeout is the one
333that expired.  The exception handler at (a) does not match (at that
334point, the exception being reported is
335:py:exc:`curio.TimeoutCancellationError` which indicates
336that a timeout/cancellation has occurred somewhere, but that it is NOT
337due to the inner-most timeout).
338
339If a nested ``timeout_after()`` is used without a matching except
340clause, a timeout is reported as a
341:py:exc:`curio.UncaughtTimeoutError` exception.  Remember that all
342timeouts should have a matching exception handler.
343
If you don't care about exception handling, you can also use the following
function:
346
347.. asyncfunction:: ignore_after(seconds, corofunc=None, *args, timeout_result=None)
348
349   Execute ``corofunc(*args)`` and return its result. If *seconds* elapse, the
350   operation is cancelled with a :py:exc:`curio.TaskTimeout` exception, but
351   the exception is discarded and the value of *timeout_result* is returned.
352   If *corofunc* is ``None``, returns an asynchronous context manager that
353   applies a timeout to a block of statements.  For this case, the resulting
354   context manager object has an ``expired`` attribute set to ``True`` if
355   time expired.
356
357Here are some examples::
358
359    result = await ignore_after(5, coro, args)
360    if result is None:
361        # Timeout occurred (if you care)
362        ...
363
364    # Execute multiple statements with a 5 second timeout
365    async with ignore_after(5) as s:
366        await coro1(args)
367        await coro2(args)
368
369    if s.expired:
370        # Timeout occurred
371        ...
372
373The ``ignore_after()`` function is just a convenience layer to simplify exception
374handling. All of the timeout-related functions can be composed and layered
375together in any configuration and it should still work.
376
377Cancellation Control
378--------------------
379
380Sometimes it is necessary to disable or control cancellation on critical operations. The
381following functions can control this:
382
383.. asyncfunction:: disable_cancellation(corofunc=None, *args)
384
385   Disables the delivery of cancellation-related exceptions while
386   executing *corofunc*.  *args* are the arguments to *corofunc*.
387   The result of *corofunc* is returned.  Any pending cancellation
   is delivered to the first blocking operation after
389   cancellation is reenabled.  If *corofunc* is ``None``, a
390   context manager is returned that shields a block of
391   statements from cancellation.
392
393.. asyncfunction:: check_cancellation(exc=None)
394
395   Explicitly check if a cancellation is pending for the calling task.  If
396   cancellation is enabled, any pending exception is raised
397   immediately.  If cancellation is disabled, it returns the pending
398   cancellation exception instance (if any) or ``None``.  If ``exc``
399   is supplied and it matches the type of the pending exception, the
400   exception is returned and any pending cancellation exception is
401   cleared.
402
403.. asyncfunction:: set_cancellation(exc)
404
405   Set the pending cancellation exception for the calling task to ``exc``.
406   If cancellation is enabled, it will be raised immediately on the next
407   blocking operation.  Returns any previously set, but pending cancellation
408   exception.
409
410A common use of these functions is to more precisely control cancellation
411points. Here is an example that shows how to check for cancellation at
412a specific code location (a)::
413
414    async def coro():
415        async with disable_cancellation():
416            while True:
417                await coro1()
418                await coro2()
419                if await check_cancellation():    # (a)
420                    break   # Bail out!
421
422        await check_cancellation()  # Cancellation (if any) delivered here
423
424If you only need to shield a single operation, you can write statements like this::
425
426    async def coro():
427        ...
428        await disable_cancellation(some_operation, x, y, z)
429        ...
430
431Note: It is not possible for cancellation to be reenabled inside code
432where it has been disabled.
433
434Synchronization Primitives
435--------------------------
436.. currentmodule:: None
437
438The following synchronization primitives are available. Their behavior
439is identical to their equivalents in the :mod:`threading` module.  However, none
440of these primitives are safe to use with threads.
441
442.. class:: Event()
443
444   An event object.
445
446An :class:`Event` instance ``e`` supports the following methods:
447
448.. list-table::
449   :widths: 40 60
450   :header-rows: 0
451
452   * - ``e.is_set()``
453     - Return ``True`` if set
454   * - ``e.clear()``
455     - Clear the event value
456   * - ``await e.wait()``
457     - Wait for the event to be set
458   * - ``await e.set()``
459     - Set the event. Wake all waiting tasks (if any)
460
461
462.. class:: Result()
463
   A synchronized result object.  This is like an :class:`Event` except
465   that it additionally carries a value or exception.
466
A :class:`Result` instance ``r`` supports the following methods:
468
469.. list-table::
470   :widths: 40 60
471   :header-rows: 0
472
473   * - ``r.is_set()``
474     - Return ``True`` if a result value or exception has been set
475   * - ``await r.set_value(value)``
476     - Set the result value, waking any waiters.
477   * - ``await r.set_exception(exc)``
478     - Set the result exception, waking any waiters.
479   * - ``await r.unwrap()``
480     - Wait and return the set value. If an exception was set, it is reraised.
481
The ``Lock``, ``RLock``, and ``Semaphore`` classes allow for mutual exclusion and
inter-task coordination.
484
485.. class:: Lock()
486
487   A mutual exclusion lock.
488
489.. class:: RLock()
490
491   A recursive mutual-exclusion lock that can be acquired multiple times within the
492   same task.
493
494.. class:: Semaphore(value=1)
495
496   Semaphores are based on a counter.  ``acquire()`` and ``release()``
497   decrement and increment the counter respectively.  If the counter is 0,
498   ``acquire()`` blocks until the value is incremented by another task.  The ``value``
499   attribute of a semaphore is a read-only property holding the current value of the internal
500   counter.
501
502An instance ``lock`` of any of the above classes supports the following methods:
503
504.. list-table::
505   :widths: 40 60
506   :header-rows: 0
507
508   * - ``await lock.acquire()``
509     - Acquire the lock
510   * - ``await lock.release()``
511     - Release the lock.
512   * - ``lock.locked()``
513     - Return ``True`` if the lock is currently held.
514
515The preferred way to use a Lock is as an asynchronous context manager. For example::
516
517    import curio
518    lock = curio.Lock()
519
520    async def sometask():
521        async with lock:
522            print("Have the lock")
523            ...
524
525
526.. class:: Condition(lock=None)
527
528   Condition variable.  *lock* is the underlying lock to use. If ``None``, then
529   a :class:`Lock` object is used.
530
An instance ``cv`` of :class:`Condition` supports the following methods:
532
533
534.. list-table::
535   :widths: 40 60
536   :header-rows: 0
537
538   * - ``await cv.acquire()``
539     - Acquire the underlying lock
540   * - ``await cv.release()``
541     - Release the underlying lock.
542   * - ``cv.locked()``
543     - Return ``True`` if the lock is currently held.
544   * - ``await cv.wait()``
545     - Wait on the condition variable. Releases the underlying lock.
546   * - ``await cv.wait_for(pred)``
547     - Wait on the condition variable until a supplied predicate function returns ``True``.
548       ``pred`` is a callable that takes no arguments.
549   * - ``await cv.notify(n=1)``
     - Notify one or more tasks, causing them to wake from ``cv.wait()``.
551   * - ``await cv.notify_all()``
552     - Notify all waiting tasks.
553
554Proper use of a condition variable is tricky. The following example shows how to implement
555producer-consumer synchronization on top of a ``collections.deque`` object::
556
557    import curio
558    from collections import deque
559
560    async def consumer(items, cond):
561        while True:
562            async with cond:
563                while not items:         # (a)
564                    await cond.wait()    # Wait for items
565                item = items.popleft()
566            print('Got', item)
567
    async def producer(items, cond):
        for n in range(10):
            async with cond:
                items.append(n)
                await cond.notify()
            await curio.sleep(1)

    async def main():
        items = deque()
        cond = curio.Condition()
        prod_task = await curio.spawn(producer, items, cond)
        cons_task = await curio.spawn(consumer, items, cond)
        await prod_task.join()      # Wait for the producer to finish
        await cons_task.cancel()    # The consumer loops forever, so stop it

    curio.run(main())
582
583In this code, it is critically important that the ``wait()`` and
584``notify()`` operations take place in a block where the condition
585variable has been properly acquired.  Also, the ``while``-loop at (a)
586is not a typo.  Condition variables are often used to "signal" that
587some condition has become true, but it is standard practice to re-test
the condition before proceeding (it might be the case that a
589condition was only briefly transient and by the time a notified task
590awakes, the condition no longer holds).
591
592Queues
593------
594
595To communicate between tasks, use a :class:`Queue`.
596
597.. class:: Queue(maxsize=0)
598
599   Creates a queue with a maximum number of elements in *maxsize*.  If not
600   specified, the queue can hold an unlimited number of items.
601
602An instance ``q`` of :class:`Queue` supports the following methods:
603
604.. list-table::
605   :widths: 40 60
606   :header-rows: 0
607
608   * - ``q.empty()``
609     - Return ``True`` if the queue is empty.
610   * - ``q.full()``
611     - Return ``True`` if the queue is full.
612   * - ``q.size()``
613     - Return number of items currently in the queue.
614   * - ``await q.get()``
615     - Return an item from the queue. Block if no items are available.
616   * - ``await q.put(item)``
617     - Put an item on the queue. Blocks if the queue is at capacity.
618   * - ``await q.join()``
619     - Wait for all elements to be processed.  Consumers must call
620       ``q.task_done()`` to indicate the completion of each element.
621   * - ``await q.task_done()``
622     - Indicate that the processing has finished for an item.  If all
623       items have been processed and there are tasks waiting on
624       ``q.join()``, they will be awakened.
625
626Here is an example of using queues in a producer-consumer problem::
627
628    import curio
629
630    async def producer(queue):
631        for n in range(10):
632            await queue.put(n)
633        await queue.join()
634        print('Producer done')
635
636    async def consumer(queue):
637        while True:
638            item = await queue.get()
639            print('Consumer got', item)
640            await queue.task_done()
641
642    async def main():
643        q = curio.Queue()
644        prod_task = await curio.spawn(producer(q))
645        cons_task = await curio.spawn(consumer(q))
646        await prod_task.join()
647        await cons_task.cancel()
648
649    curio.run(main())
650
651The following variants of the basic :class:`Queue` class are also provided:
652
653.. class:: PriorityQueue(maxsize=0)
654
655   Creates a priority queue with a maximum number of elements in *maxsize*.
656   The priority of items is determined by standard relational operators
   such as ``<`` and ``<=``.  Items that compare lowest are returned first.
658
659.. class:: LifoQueue(maxsize=0)
660
661   A queue with "Last In First Out" retrieval policy. In other words, a stack.
662
663
Universal Synchronization
-------------------------
666
667Sometimes it is necessary to synchronize Curio with threads and foreign event loops.
668For this, use the following queue and event classes.
669
670.. class:: UniversalQueue(maxsize=0, withfd=False)
671
672   A queue that can be simultaneously used from Curio tasks, threads,
673   and ``asyncio``. The same programming API is used in all
674   environments, but ``await`` is required for asynchronous
675   operations.  If used to coordinate Curio and ``asyncio``, they must
676   be executing in separate threads.  The ``withfd`` option specifies
677   whether or not the queue should optionally set up an I/O loopback
678   that allows it to be polled by a foreign event loop.  When
679   ``withfd`` is ``True``, adding something to the queue writes a
680   single byte of data to the I/O loopback.  Removing an item with ``get()``
681   reads this byte.
682
683.. class:: UniversalEvent()
684
685   An event object that can be used from Curio tasks, threads, and
   ``asyncio``. The same programming interface is used in all three.
687   Asynchronous operations must be prefaced by ``await``.  If used
688   to coordinate Curio and ``asyncio``, they must be executing in
689   separate threads.
690
691.. class:: UniversalResult()
692
693   A result object that can be used from Curio tasks, threads, and
694   ``asyncio``.  A result is somewhat similar to an event, but it
695   additionally carries an attached value or exception.  To set the
696   result, use ``set_value()`` or ``set_exception()``.  To return the
697   result, blocking if necessary, use ``unwrap()``.  If used in an
698   asynchronous environment, these operations must be prefaced by
699   ``await``.  If used to coordinate Curio and ``asyncio``, they must
700   be executing in separate threads.  A ``UniversalResult()`` is
701   somewhat similar to a ``Future`` in usage, but it has a much more
702   restrictive API.
703
704Here is an example of a producer-consumer problem with a ``UniversalQueue``
705involving Curio, threads, and ``asyncio`` all running at once::
706
707    from curio import run, UniversalQueue, spawn, run_in_thread
708
709    import time
710    import threading
711    import asyncio
712
713    # An async task
714    async def consumer(name, q):
715        print(f'{name} consumer starting')
716        while True:
717            item = await q.get()
718            if item is None:
719                break
720            print(f'{name} got: {item}')
721            await q.task_done()
722        print(f'{name} consumer done')
723        await q.put(None)
724
725    # A threaded producer
726    def producer(q):
727        for i in range(10):
728            q.put(i)
729            time.sleep(1)
730        q.join()
731        print('Producer done')
732
733    async def main():
734        q = UniversalQueue()
735        # A Curio consumer
736        t1 = await spawn(consumer('curio', q))
737
738        # An asyncio consumer
739        t2 = threading.Thread(target=asyncio.run, args=[consumer('asyncio', q)])
740        t2.start()
741
742        # A threaded producer
743        t3 = threading.Thread(target=producer, args=[q])
744        t3.start()
745        await run_in_thread(t3.join)
746
747        # Shutdown with a sentinel
748        await q.put(None)
749        await t1.join()
750        await run_in_thread(t2.join)
751
752    run(main())
753
754In this code, the ``consumer()`` coroutine is used simultaneously in
755Curio and ``asyncio``. ``producer()`` is an ordinary thread.
756
Here is an example that has Curio wait for the result produced
758by a thread using a ``UniversalResult`` object::
759
760    import threading
761    import curio
762    import time
763
    def worker(x, y, result):
        time.sleep(10)
        try:
            result.set_value(x+y)
        except Exception as err:
            result.set_exception(err)

    async def main():
        result = curio.UniversalResult()
        threading.Thread(target=worker, args=[2,3,result]).start()
        print("Result:", await result.unwrap())
775
776    curio.run(main)
777
778When in doubt, queues, events, and results are the preferred mechanism
779of coordinating Curio with foreign environments.  Higher-level
780abstractions can often be built from these.
781
782Blocking Operations and External Work
783-------------------------------------
784
785Sometimes you need to perform work that takes a long time to complete
786or otherwise blocks the progress of other tasks. This includes
787CPU-intensive calculations and blocking operations carried out by
788foreign libraries.  Use the following functions to do that:
789
790.. asyncfunction:: run_in_process(callable, *args)
791
   Run ``callable(*args)`` in a separate process and return the
   result.  If cancelled, the underlying worker process is immediately
   terminated by a ``SIGTERM`` signal.  The given callable executes in
795   an entirely independent Python interpreter and there is no shared
796   global state. The separate process is launched using the "spawn"
797   method of the ``multiprocessing`` module.
798
799.. asyncfunction:: run_in_thread(callable, *args)
800
801   Run ``callable(*args)`` in a separate thread and return
802   the result.  If the calling task is cancelled, the underlying
803   worker thread (if started) is set aside and sent a termination
804   request.  However, since there is no underlying mechanism to
805   forcefully kill threads, the thread won't recognize the termination
806   request until it runs the requested work to completion.  It's
807   important to note that a cancellation won't block other tasks
808   from using threads. Instead, cancellation produces a kind of
809   "zombie thread" that executes the requested work, discards the
810   result, and then disappears.  For reliability, work submitted to
811   threads should have a timeout or some other mechanism that
812   puts a bound on execution time.
813
814.. asyncfunction:: block_in_thread(callable, *args)
815
816   The same as ``run_in_thread()``, but guarantees that only
817   one background thread is used for each unique callable
818   regardless of how many tasks simultaneously try to
819   carry out the same operation at once.  Only use this function if there is
820   an expectation that the provided callable is going to
821   block for an undetermined amount of time and that there
822   might be a large amount of contention from multiple tasks on the same
823   resource.  The primary use is on waiting operations involving
824   foreign locks and queues.  For example, if you launched a hundred
825   Curio tasks and they all decided to block on a shared thread queue,
826   using this would be much more efficient than ``run_in_thread()``.
827
828.. asyncfunction:: run_in_executor(exc, callable, *args)
829
   Run ``callable(*args)`` in a user-supplied
   executor and return the result. *exc* is an executor from the
832   :py:mod:`concurrent.futures` module in the standard library.  This
833   executor is expected to implement a
834   :meth:`~concurrent.futures.Executor.submit` method that executes
835   the given callable and returns a
836   :class:`~concurrent.futures.Future` instance for collecting its
837   result.
838
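For reference, this is what the executor protocol expected of *exc* looks
like when used directly from synchronous code.  It is a standard-library
illustration only and does not involve Curio; ``word_count`` is a made-up
function.

```python
from concurrent.futures import Future, ThreadPoolExecutor

def word_count(text):
    return len(text.split())

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() schedules the callable and immediately returns a Future
    fut = pool.submit(word_count, 'curio runs coroutines')
    is_future = isinstance(fut, Future)
    count = fut.result()     # blocks until the callable has finished
```

In a Curio task, the corresponding call would be
``await run_in_executor(pool, word_count, text)``.
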
839
840When performing external work, it's almost always better to use the
841:func:`run_in_process` and :func:`run_in_thread` functions instead
of :func:`run_in_executor`.  These functions have no external library
dependencies, have less communication overhead, and have more
predictable cancellation semantics.
845
846The following values in :mod:`curio.workers` define how many
847worker threads and processes are used.  If you are going to
848change these values, do it before any tasks are executed.
849
850.. data:: MAX_WORKER_THREADS
851
852   Specifies the maximum number of threads used by a single kernel
853   using the :func:`run_in_thread` function.  Default value is 64.
854
855.. data:: MAX_WORKER_PROCESSES
856
857   Specifies the maximum number of processes used by a single kernel
858   using the :func:`run_in_process` function. Default value is the
859   number of CPUs on the host system.
860
861I/O Classes
862-----------
863
864I/O in Curio is managed by a collection of classes in :mod:`curio.io`.
865These classes act as asynchronous proxies around sockets, streams, and
866ordinary files.  The programming interface is meant to be the same as
867in normal synchronous Python code.
868
869Socket
870^^^^^^
871
872The :class:`Socket` class wraps an existing socket-like object with
873an async interface.
874
875.. class:: Socket(sockobj)
876
877   Creates a proxy around an existing socket *sockobj*.  *sockobj* is
878   put in non-blocking mode when wrapped. *sockobj* is not closed unless
879   the created ``Socket`` instance is explicitly closed or used as a
880   context manager.
881
882The following methods are redefined on an instance ``s`` of :class:`Socket`.
883
884.. list-table::
885   :widths: 60 40
886   :header-rows: 0
887
888   * - ``await s.recv(maxbytes, flags=0)``
889     - Receive up to *maxbytes* of data.
890   * - ``await s.recv_into(buffer, nbytes=0, flags=0)``
891     - Receive up to *nbytes* of data into a buffer.
892   * - ``await s.recvfrom(maxsize, flags=0)``
     - Receive up to *maxsize* bytes of data. Returns a tuple ``(data, client_address)``.
894   * - ``await s.recvfrom_into(buffer, nbytes=0, flags=0)``
895     - Receive up to *nbytes* of data into a buffer.
896   * - ``await s.recvmsg(bufsize, ancbufsize=0, flags=0)``
897     - Receive normal and ancillary data.
898   * - ``await s.recvmsg_into(buffers, ancbufsize=0, flags=0)``
899     - Receive normal and ancillary data into a buffer.
900   * - ``await s.send(data, flags=0)``
901     - Send data.  Returns the number of bytes sent.
902   * - ``await s.sendall(data, flags=0)``
903     - Send all of the data in *data*. If cancelled, the ``bytes_sent`` attribute of the
904       exception contains the number of bytes sent.
905   * - ``await s.sendto(data, address)``
906     - Send data to the specified address.
907   * - ``await s.sendto(data, flags, address)``
908     - Send data to the specified address (alternate).
909   * - ``await s.sendmsg(buffers, ancdata=(), flags=0, address=None)``
910     - Send normal and ancillary data to the socket.
911   * - ``await s.accept()``
912     - Wait for a new connection.  Returns a tuple ``(sock, address)`` where ``sock``
913       is an instance of ``Socket``.
914   * - ``await s.connect(address)``
915     - Make a connection.
916   * - ``await s.connect_ex(address)``
917     - Make a connection and return an error code instead of raising an exception.
918   * - ``await s.close()``
919     - Close the connection.
920   * - ``await s.shutdown(how)``
921     - Shutdown the socket.  *how* is one of
922       ``SHUT_RD``, ``SHUT_WR``, or ``SHUT_RDWR``.
923   * - ``await s.do_handshake()``
924     - Perform an SSL client handshake (only on SSL sockets).
925   * - ``s.makefile(mode, buffering=0)``
926     - Make a :class:`curio.io.FileStream` instance wrapping the socket.
927       Prefer to use :meth:`Socket.as_stream` instead. Not supported on Windows.
928   * - ``s.as_stream()``
929     - Wrap the socket as a stream using :class:`curio.io.SocketStream`.
930   * - ``s.blocking()``
931     -  A context manager that returns the internal socket placed into blocking mode.
932
933Any socket method not listed here (e.g., ``s.setsockopt()``) will be
934delegated directly to the underlying socket as an ordinary method.
:class:`Socket` objects may be used as an asynchronous context manager,
which causes the underlying socket to be closed when done.
937
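To see what the proxy takes care of, the following standard-library sketch
shows how a raw non-blocking socket behaves: an operation that can't
complete raises ``BlockingIOError``, and the caller has to wait for readiness
and retry.  It illustrates the general pattern only and should not be read
as Curio's actual implementation.

```python
import selectors
import socket

a, b = socket.socketpair()
a.setblocking(False)          # the mode a wrapped socket is placed in

try:
    a.recv(1024)              # nothing to read yet
    would_block = False
except BlockingIOError:
    would_block = True        # non-blocking sockets raise instead of waiting

b.sendall(b'ping')

# Wait until the socket is readable, then retry the operation
with selectors.DefaultSelector() as sel:
    sel.register(a, selectors.EVENT_READ)
    ready = sel.select(timeout=1.0)
data = a.recv(1024)

a.close()
b.close()
```

With a ``Socket`` wrapper, all of this collapses to ``data = await s.recv(1024)``.
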
938Streams
939^^^^^^^
940
941A stream is an asynchronous file-like object that wraps around an
942object that natively implements non-blocking I/O.  Curio implements
943two basic classes:
944
945.. class:: FileStream(fileobj)
946
947   Create a file-like wrapper around an existing file as might be
948   created by the built-in ``open()`` function or
   ``socket.makefile()``.  *fileobj* must be in binary mode and
   must support non-blocking I/O.  The file is placed into
   non-blocking mode using ``os.set_blocking(fileobj.fileno(), False)``.
952   *fileobj* is not closed unless the resulting instance is explicitly
953   closed or used as a context manager.  Not supported on Windows.
954
955.. class:: SocketStream(sockobj)
956
957   Create a file-like wrapper around a socket.  *sockobj* is an
958   existing socket-like object.  The socket is put into non-blocking mode.
959   *sockobj* is not closed unless the resulting instance is explicitly
960   closed or used as a context manager.  Instantiated by ``Socket.as_stream()``.
961
An instance ``s`` of either stream class implements the following methods:
963
964
965.. list-table::
966   :widths: 40 60
967   :header-rows: 0
968
969   * - ``await s.read(maxbytes=-1)``
970     - Read up to *maxbytes* of data on the file. If omitted, reads as
971       much data as is currently available.
972   * - ``await s.readall()``
973     - Return all data up to EOF.
974   * - ``await s.read_exactly(n)``
     - Read exactly *n* bytes of data.
976   * - ``await s.readline()``
977     - Read a single line of data.
978   * - ``await s.readlines()``
979     - Read all of the lines.  If cancelled, the ``lines_read`` attribute of
980       the exception contains all lines read.
981   * - ``await s.write(bytes)``
982     - Write all of the data in *bytes*.
983   * - ``await s.writelines(lines)``
     - Write all of the lines in *lines*. If cancelled, the ``bytes_written``
985       attribute of the exception contains the total bytes written so far.
986   * - ``await s.flush()``
987     - Flush any unwritten data from buffers.
988   * - ``await s.close()``
989     - Flush any unwritten data and close the file.  Not called on garbage collection.
990   * - ``s.blocking()``
991     -  A context manager that temporarily places the stream into blocking mode and
992        returns the raw file object used internally.  Note: for
993        ``SocketStream`` this creates a file using
994        ``open(sock.fileno(), 'rb+', closefd=False)`` which is not
995        supported on Windows.
996
997Other methods (e.g., ``tell()``, ``seek()``, ``setsockopt()``, etc.) are available
998if the underlying ``fileobj`` or ``sockobj`` provides them. A ``Stream`` may be used as an asynchronous context manager.
999
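The difference between ``read(maxbytes)`` ("up to") and ``read_exactly(n)``
is easiest to see as a loop.  The blocking, standard-library sketch below
keeps reading until *n* bytes have arrived.  ``recv_exactly`` is a
hypothetical helper, and raising ``EOFError`` on a short read is an
assumption made for the sketch, not a statement about what Curio raises.

```python
import socket

def recv_exactly(sock, n):
    """Keep calling recv() until exactly n bytes have been collected."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)      # may return fewer bytes than asked
        if not chunk:
            raise EOFError(f'connection closed with {remaining} bytes missing')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)

a, b = socket.socketpair()
b.sendall(b'HEAD')
b.sendall(b'payload')
header = recv_exactly(a, 4)
body = recv_exactly(a, 7)
a.close()
b.close()
```

On a stream, the same intent is written ``header = await s.read_exactly(4)``.
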
1000Files
1001^^^^^
1002
The :mod:`curio.file` module provides an async-compatible
1004replacement for the built-in ``open()`` function and associated file
1005objects.  Use this to read and write traditional files on the
1006filesystem while avoiding blocking. How this is accomplished is an
1007implementation detail (although threads are used in the initial
1008version).
1009
1010.. function:: aopen(*args, **kwargs)
1011
1012   Creates a :class:`curio.file.AsyncFile` wrapper around a traditional file object as
1013   returned by Python's builtin ``open()`` function.   The arguments are exactly the
1014   same as for ``open()``.  The returned file object must be used as an asynchronous
1015   context manager.
1016
1017.. class:: AsyncFile(fileobj)
1018
   This class represents an asynchronous file as returned by the ``aopen()``
   function.  Instances are normally created by ``aopen()``, but the
   class can also be wrapped around an already-existing file object.
1022
1023The following methods are redefined on :class:`AsyncFile` objects to be
1024compatible with coroutines.  Any method not listed here will be
1025delegated directly to the underlying file.  These methods take the same arguments
1026as the underlying file object.  Be aware that not all of these methods are
1027available on all kinds of files (e.g., ``read1()``, ``readinto()`` and similar
1028methods are only available in binary-mode files).
1029
1030.. list-table::
1031   :widths: 50 50
1032   :header-rows: 0
1033
1034   * - ``await f.read(maxbytes=-1)``
1035     - Read up to *maxbytes* of data on the file. If omitted, reads as
1036       much data as is currently available.
1037   * - ``await f.read1(maxbytes=-1)``
1038     - Same as ``read()``, but uses a single system call.
1039   * - ``await f.readline(maxbytes=-1)``
1040     - Read a line of input.
1041   * - ``await f.readlines(maxbytes=-1)``
     - Read all lines of input data.
1043   * - ``await f.readinto(buffer)``
1044     - Read data into a buffer.
1045   * - ``await f.readinto1(buffer)``
1046     - Read data into a buffer using a single system call.
1047   * - ``await f.readall()``
1048     - Read all available data up to EOF.
1049   * - ``await f.write(data)``
     - Write data.
1051   * - ``await f.writelines(lines)``
1052     - Write all lines.
1053   * - ``await f.truncate(pos=None)``
1054     - Truncate the file to a given size/position. If ``None``, file is truncated at position
1055       of current file pointer.
1056   * - ``await f.seek(offset, whence=os.SEEK_SET)``
1057     - Seek to a new file position.
1058   * - ``await f.tell()``
1059     - Report current file pointer.
1060   * - ``await f.flush()``
     - Flush data to a file.
1062   * - ``await f.close()``
1063     - Flush remaining data and close.
1064
1065The preferred way to use an :class:`AsyncFile` object is as an asynchronous context manager.
1066For example::
1067
1068    async with aopen(filename) as f:
1069        # Use the file
1070        data = await f.read()
1071
1072:class:`AsyncFile` objects may also be used with asynchronous iteration.
1073For example::
1074
1075    async with aopen(filename) as f:
1076        async for line in f:
1077            ...
1078
1079:class:`AsyncFile` objects are intentionally incompatible with code
1080that uses files in a synchronous manner.  Partly, this is to help
1081avoid unintentional errors in your program where blocking might
1082occur without you realizing it.  If you know what you're doing and you
1083need to access the underlying file in synchronous code, use the
``blocking()`` context manager like this::
1085
1086    async with aopen(filename) as f:
1087        ...
1088        # Pass to synchronous code (danger: might block)
1089        with f.blocking() as sync_f:
1090             # Use synchronous I/O operations
1091             data = sync_f.read()
1092             ...
1093
1094At first glance, the API to streams and files might look identical.  The
1095difference concerns internal implementation.  A stream works natively
1096with non-blocking I/O.  An ``AsyncFile`` uses a combination of threads and
1097synchronous calls to provide an async-compatible API.   Given a choice,
1098you should use streams.  However, some systems don't provide non-blocking
1099implementations of certain system calls. In those cases, an ``AsyncFile`` is a
1100fallback.
1101
1102Networking
1103----------
1104
1105Curio provides a number of submodules for different kinds of network
1106programming.
1107
1108High Level Networking
1109^^^^^^^^^^^^^^^^^^^^^
1110
The following functions are used to make network connections and implement
1112socket-based servers.
1113
1114.. asyncfunction:: open_connection(host, port, *, ssl=None, source_addr=None, server_hostname=None, alpn_protocols=None)
1115
1116   Creates an outgoing connection to a server at *host* and
1117   *port*. This connection is made using the
1118   :py:func:`socket.create_connection` function and might be IPv4 or
1119   IPv6 depending on the network configuration (although you're not
1120   supposed to worry about it).  *ssl* specifies whether or not SSL
1121   should be used.  *ssl* can be ``True`` or an instance of
1122   :class:`curio.ssl.SSLContext`.  *source_addr* specifies the source
1123   address to use on the socket.  *server_hostname* specifies the
1124   hostname to check against when making SSL connections.  It is
1125   highly advised that this be supplied to avoid man-in-the-middle
1126   attacks.  *alpn_protocols* specifies a list of protocol names
   for use with the TLS ALPN extension (RFC 7301).  A typical value
   might be ``['h2', 'http/1.1']`` for negotiating either an HTTP/2
1129   or HTTP/1.1 connection.
1130
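The SSL-related arguments correspond to concepts in the standard ``ssl``
module.  The sketch below shows the equivalent settings on a plain
``ssl.SSLContext``.  How ``open_connection()`` applies them internally is not
described here, so treat the correspondence as an assumption.

```python
import ssl

# Default client-side context: verifies certificates and hostnames
ctx = ssl.create_default_context()

# Offer HTTP/2 first, then HTTP/1.1, via the ALPN extension
if ssl.HAS_ALPN:
    ctx.set_alpn_protocols(['h2', 'http/1.1'])

hostname_checked = ctx.check_hostname
certs_required = ctx.verify_mode == ssl.CERT_REQUIRED
```

With ``check_hostname`` enabled, the standard library compares the server's
certificate against the hostname supplied when the socket is wrapped, which
is why passing *server_hostname* matters.
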
1131.. asyncfunction:: open_unix_connection(path, *, ssl=None, server_hostname=None, alpn_protocols=None)
1132
1133   Creates a connection to a Unix domain socket with optional SSL applied.
1134
1135.. asyncfunction:: tcp_server(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True, reuse_port=False)
1136
1137   Runs a server for receiving TCP connections on
1138   a given host and port.  *client_connected_task* is a coroutine that
   is to be called to handle each connection.  *family* specifies the
1140   address family and is either :data:`socket.AF_INET` or
1141   :data:`socket.AF_INET6`.  *backlog* is the argument to the
1142   :py:meth:`socket.socket.listen` method.  *ssl* specifies an
1143   :class:`curio.ssl.SSLContext` instance to use. *reuse_address*
1144   specifies whether to use the ``SO_REUSEADDR`` socket option.  *reuse_port*
1145   specifies whether to use the ``SO_REUSEPORT`` socket option.
1146
1147.. asyncfunction:: unix_server(path, client_connected_task, *, backlog=100, ssl=None)
1148
1149   Runs a Unix domain server on a given
1150   path. *client_connected_task* is a coroutine to execute on each
1151   connection. *backlog* is the argument given to the
1152   :py:meth:`socket.socket.listen` method.  *ssl* is an optional
1153   :class:`curio.ssl.SSLContext` to use if setting up an SSL
1154   connection.
1155
1156.. asyncfunction:: run_server(sock, client_connected_task, ssl=None)
1157
1158   Runs a server on a given socket.  *sock* is a socket already
1159   configured to receive incoming connections.  *client_connected_task* and
1160   *ssl* have the same meaning as for the ``tcp_server()`` and ``unix_server()``
1161   functions.  If you need to perform some kind of special socket
1162   setup, not possible with the normal ``tcp_server()`` function, you can
1163   create the underlying socket yourself and then call this function
1164   to run a server on it.
1165
1166.. function:: tcp_server_socket(host, port, family=AF_INET, backlog=100, reuse_address=True, reuse_port=False)
1167
1168   Creates and returns a TCP socket. Arguments are the same as for the
1169   ``tcp_server()`` function.  The socket is suitable for use with other
1170   async operations as well as the ``run_server()`` function.
1171
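As a point of reference, these are roughly the synchronous steps involved in
preparing a listening TCP socket with the options described above.  The
sketch uses the standard ``socket`` module; ``make_listener`` is a
hypothetical name and the code is not taken from Curio.

```python
import socket

def make_listener(host, port, backlog=100, reuse_address=True):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if reuse_address:
            # Allow quick restarts on the same address
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(backlog)
    except OSError:
        sock.close()
        raise
    return sock

listener = make_listener('127.0.0.1', 0)     # port 0: let the OS pick a port
bound_host, bound_port = listener.getsockname()
reuse_flag = listener.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
listener.close()
```

This covers only the plain socket setup.  Use ``tcp_server_socket()`` when you
want a socket that is already suitable for ``run_server()``.
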
1172.. function:: unix_server_socket(path, backlog=100)
1173
1174   Creates and returns a Unix socket. Arguments are the same as for the
1175   ``unix_server()`` function.  The socket is suitable for use with other
1176   async operations as well as the ``run_server()`` function.
1177
1178
1179Message Passing and Channels
1180^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1181
1182Curio provides a :class:`Channel` class that can be used to perform message
1183passing between interpreters running in separate processes.  Message passing
1184uses the same protocol as the ``multiprocessing`` standard library.
1185
1186.. class:: Channel(address, family=socket.AF_INET)
1187
1188   Represents a communications endpoint for message passing.
1189   *address* is the address and *family* is the protocol
1190   family.
1191
1192The following methods are used to establish a connection on a :class:`Channel` instance ``ch``.
1193
1194.. list-table::
1195   :widths: 50 50
1196   :header-rows: 0
1197
1198   * - ``await ch.accept(*, authkey=None)``
1199     - Wait for an incoming connection and return a
1200       :class:`Connection` instance.  *authkey* is an optional
1201       authentication key.
1202   * - ``await ch.connect(*, authkey=None)``
1203     - Make an outgoing connection and return a :class:`Connection` instance.
1204       *authkey* is an optional authentication key.
1205   * - ``ch.bind()``
1206     - Performs the address binding step of the ``accept()`` method.
       Use this to have the host operating system assign a port
1208       number.  For example, use an address of ``('localhost', socket.INADDR_ANY)`` and call ``bind()``.
1209       Afterwards,  ``ch.address`` contains the assigned address.
1210   * - ``await ch.close()``
1211     - Close the channel.
1212
1213The ``connect()`` and ``accept()`` methods of :class:`Channel` instances return an
1214instance of the :class:`Connection` class:
1215
1216.. class:: Connection(reader, writer)
1217
1218   Represents a connection on which message passing of Python objects is
1219   supported.  *reader* and *writer* are I/O streams on which reading
1220   and writing are to take place (for example, instances of ``SocketStream``
1221   or ``FileStream``).
1222
1223An instance ``c`` of :class:`Connection` supports the following methods:
1224
1225.. list-table::
1226   :widths: 55 45
1227   :header-rows: 0
1228
1229   * - ``await c.close()``
1230     - Close the connection.
1231   * - ``await c.recv()``
1232     - Receive a Python object.
1233   * - ``await c.recv_bytes()``
1234     - Receive a raw message of bytes.
1235   * - ``await c.send(obj)``
1236     - Send a Python object.
1237   * - ``await c.send_bytes(buf, offset=0, size=None)``
1238     - Send a buffer of bytes as a single message.  *offset* and *size* specify
1239       an optional byte offset and size into the underlying memory buffer.
1240   * - ``await c.authenticate_server(authkey)``
1241     - Authenticate server endpoint.
1242   * - ``await c.authenticate_client(authkey)``
1243     - Authenticate client endpoint.
1244
1245A :class:`Connection` instance may also be used as a context manager.
1246
1247Here is an example of a producer program using channels::
1248
1249    # producer.py
1250    from curio import Channel, run
1251
1252    async def producer(ch):
1253        c = await ch.accept(authkey=b'peekaboo')
1254        for i in range(10):
1255            await c.send(i)
1256        await c.send(None)   # Sentinel
1257
1258    if __name__ == '__main__':
1259        ch = Channel(('localhost', 30000))
1260        run(producer(ch))
1261
1262Here is an example of a corresponding consumer program using a channel::
1263
1264    # consumer.py
1265    from curio import Channel, run
1266
1267    async def consumer(ch):
1268        c = await ch.connect(authkey=b'peekaboo')
1269        while True:
1270            msg = await c.recv()
1271            if msg is None:
1272                break
1273            print('Got:', msg)
1274
1275    if __name__ == '__main__':
1276        ch = Channel(('localhost', 30000))
1277        run(consumer(ch))
1278
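Since message passing is described as using the same protocol as
``multiprocessing``, the exchange can be pictured with the standard library's
own ``Listener`` and ``Client`` classes.  The sketch below runs both ends in
one process using a thread.  It does not involve Curio, and interoperability
with a :class:`Channel` endpoint is suggested by the text above rather than
verified here.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b'peekaboo'

def producer(listener):
    # accept() completes the authkey handshake before returning
    with listener.accept() as conn:
        for i in range(3):
            conn.send(i)
        conn.send(None)          # sentinel

def consume(address):
    received = []
    with Client(address, authkey=AUTHKEY) as conn:
        while True:
            msg = conn.recv()
            if msg is None:
                break
            received.append(msg)
    return received

# Port 0 lets the operating system choose a free port
with Listener(('localhost', 0), authkey=AUTHKEY) as listener:
    t = threading.Thread(target=producer, args=(listener,), daemon=True)
    t.start()
    received = consume(listener.address)
    t.join()
```
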
1279socket module
1280^^^^^^^^^^^^^
1281
1282The :mod:`curio.socket` module provides a wrapper around selected functions in the built-in
1283:mod:`socket` module--allowing it to be used as a stand-in in
1284Curio-related code.  The module provides exactly the same
1285functionality except that certain operations have been replaced by
1286asynchronous equivalents.
1287
1288.. function:: socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
1289
   Creates a :class:`curio.io.Socket` wrapper around :class:`socket` objects created in the built-in :mod:`socket`
1291   module.  The arguments for construction are identical and have the same meaning.
1292   The resulting :class:`socket` instance is set in non-blocking mode.
1293
1294The following module-level functions have been modified so that the returned socket
1295objects are compatible with Curio:
1296
1297.. function:: socketpair(family=AF_UNIX, type=SOCK_STREAM, proto=0)
1298.. function:: fromfd(fd, family, type, proto=0)
1299.. function:: create_connection(address, source_address)
1300
1301The following module-level functions have been redefined as coroutines so that they
1302don't block the kernel when interacting with DNS.  This is accomplished through
1303the use of threads.
1304
1305.. asyncfunction:: getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)
1306.. asyncfunction:: getfqdn(name)
1307.. asyncfunction:: gethostbyname(hostname)
1308.. asyncfunction:: gethostbyname_ex(hostname)
1309.. asyncfunction:: gethostname()
1310.. asyncfunction:: gethostbyaddr(ip_address)
1311.. asyncfunction:: getnameinfo(sockaddr, flags)
1312
1313
1314ssl module
1315^^^^^^^^^^
1316
1317The :mod:`curio.ssl` module provides Curio-compatible functions for creating an SSL
1318wrapped Curio socket.  The following functions are redefined (and have the same
calling signature as their counterparts in the standard :mod:`ssl` module):
1320
1321.. asyncfunction:: wrap_socket(*args, **kwargs)
1322
1323.. asyncfunction:: get_server_certificate(*args, **kwargs)
1324
1325.. function:: create_default_context(*args, **kwargs)
1326
1327.. class:: SSLContext
1328
1329   A redefined and modified variant of :class:`ssl.SSLContext` so that the
1330   :meth:`wrap_socket` method returns a socket compatible with Curio.
1331
1332Don't attempt to use the :mod:`curio.ssl` module without a careful read of Python's official documentation
1333at https://docs.python.org/3/library/ssl.html.
1334
1335It is usually easier to apply SSL to a
1336connection using the high level network functions previously described.
1337For example, here's how you make an outgoing SSL connection::
1338
1339    sock = await curio.open_connection('www.python.org', 443,
1340                                       ssl=True,
1341                                       server_hostname='www.python.org')
1342
1343Here's how you create a server that uses SSL::
1344
1345    import curio
1346    from curio import ssl
1347
1348    KEYFILE = "privkey_rsa"       # Private key
    CERTFILE = "certificate.crt"  # Server certificate
1350
1351    async def handler(client, addr):
1352        ...
1353
1354    if __name__ == '__main__':
1355        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1356        ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
1357        curio.run(curio.tcp_server('', 10000, handler, ssl=ssl_context))
1358
1359
1360Subprocesses
1361------------
1362
1363The :mod:`curio.subprocess` module implements the same functionality as the built-in
1364:mod:`subprocess` module.
1365
1366.. class:: Popen(*args, **kwargs)
1367
1368   A wrapper around the :class:`subprocess.Popen` class.  The same arguments are
1369   accepted. On the resulting :class:`~subprocess.Popen` instance, the
1370   :attr:`~subprocess.Popen.stdin`, :attr:`~subprocess.Popen.stdout`, and
1371   :attr:`~subprocess.Popen.stderr` file attributes have been wrapped by the
1372   :class:`curio.io.FileStream` class. You can use these in an asynchronous
1373   context.
1374
1375The following methods of :class:`Popen` have been replaced by asynchronous equivalents:
1376
1377.. asyncmethod:: Popen.wait()
1378
1379   Wait for a subprocess to exit.  Cancellation does not terminate the process.
1380
1381.. asyncmethod:: Popen.communicate(input=b'')
1382
1383   Communicate with the subprocess, sending the specified input on standard input.
1384   Returns a tuple ``(stdout, stderr)`` with the resulting output of standard output
1385   and standard error.  If cancelled, the resulting exception has ``stdout`` and
1386   ``stderr`` attributes that contain the output read prior to cancellation.
1387   Cancellation does not terminate the underlying subprocess.
1388
1389The following functions are also available.  They accept the same arguments as their
1390equivalents in the :mod:`subprocess` module:
1391
1392.. asyncfunction:: run(args, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False)
1393
1394   Run a command in a subprocess.  Returns a :class:`subprocess.CompletedProcess` instance.
1395   If cancelled, the underlying process is terminated using the process ``kill()`` method.
1396   The resulting exception will have ``stdout`` and ``stderr`` attributes containing
1397   output read prior to cancellation.
1398
1399.. asyncfunction:: check_output(args, stdout=None, stderr=None, shell=False)
1400
1401   Run a command in a subprocess and return the resulting output. Raises a
1402   :py:exc:`subprocess.CalledProcessError` exception if an error occurred.
1403   The behavior on cancellation is the same as for ``run()``.
1404
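Because the arguments mirror the standard library, the synchronous form
below doubles as a template.  This sketch uses the plain ``subprocess``
module so that it runs without Curio; in a coroutine, the same call would be
written ``await curio.subprocess.run(...)``.

```python
import subprocess
import sys

# Run a child Python interpreter that prints one line
result = subprocess.run(
    [sys.executable, '-c', 'print("hello from child")'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    check=True,               # raise CalledProcessError on a non-zero exit
)
output = result.stdout.decode().strip()
```

The returned object is a ``subprocess.CompletedProcess`` with ``returncode``,
``stdout``, and ``stderr`` attributes.
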
1405Here is an example of using :class:`Popen` to read streaming output off of a
1406subprocess with Curio::
1407
1408    import curio
1409    from curio import subprocess
1410
1411    async def main():
1412        p = subprocess.Popen(['ping', 'www.python.org'], stdout=subprocess.PIPE)
1413        async for line in p.stdout:
1414            print('Got:', line.decode('ascii'), end='')
1415
    if __name__ == '__main__':
        curio.run(main)
1420
1421Asynchronous Threads
1422--------------------
1423
1424If you need to perform a lot of synchronous operations, but still
1425interact with Curio, you can launch an async-thread.
1426An asynchronous thread flips the whole world around--instead
1427of executing selected synchronous operations using ``run_in_thread()``, you
1428run everything in a thread and perform selected async operations using the
1429``AWAIT()`` function.
1430
1431To create an asynchronous thread, use ``spawn_thread()``:
1432
1433.. asyncfunction:: spawn_thread(func, *args, daemon=False)
1434
1435   Launch an asynchronous thread that runs the callable ``func(*args)``.
1436   ``daemon`` specifies if the thread runs in daemonic mode.
1437   Returns an ``AsyncThread`` instance.
1438
1439An instance ``t`` of ``AsyncThread`` supports the following methods.
1440
1441.. list-table::
1442   :widths: 55 45
1443   :header-rows: 0
1444
1445   * - ``await t.join()``
1446     - Waits for the thread to terminate, returning the final result.
1447       The final result is returned in the same manner as ``Task.join()``.
1448   * - ``await t.wait()``
1449     - Waits for the thread to terminate, but does not return any result.
1450   * - ``await t.cancel(*, blocking=True, exc=TaskCancelled)``
1451     - Cancels the asynchronous thread.  The behavior is the same as with ``Task``.
1452       Note: An asynchronous thread can only be cancelled when it performs
1453       operations using ``AWAIT()``.
1454   * - ``t.result``
1455     - The final result of the thread. If the thread crashed
1456       with an exception, that exception is reraised on access.
1457   * - ``t.exception``
     - The final exception (if any).
1459   * - ``t.id``
1460     - Thread ID. A monotonically increasing integer.
1461   * - ``t.terminated``
1462     - ``True`` if the thread is terminated.
1463   * - ``t.cancelled``
1464     - ``True`` if the thread was cancelled.
1465
1466Within a thread, the following function is used to execute any coroutine.
1467
1468.. function:: AWAIT(coro)
1469
1470   Execute a coroutine on behalf of an asynchronous thread.  The requested
1471   coroutine executes in Curio's main execution thread.  The caller is
1472   blocked until it completes.  If used outside of an asynchronous thread,
1473   an ``AsyncOnlyError`` exception is raised.  If ``coro`` is not a
1474   coroutine, it is returned unmodified.   The reason ``AWAIT`` is all-caps
1475   is to make it more easily heard when there are all of these coders yelling
1476   at you to just use pure async code instead of launching a thread. Also,
   ``await`` is a reserved keyword as of Python 3.7.
1478
1479Here is an example of an asynchronous thread reading off a Curio queue::
1480
1481    from curio import run, Queue, sleep, CancelledError
1482    from curio.thread import spawn_thread, AWAIT
1483
1484    def consumer(queue):
1485        try:
1486            while True:
1487                item = AWAIT(queue.get())
1488                print('Got:', item)
1489                AWAIT(queue.task_done())
1490
1491        except CancelledError:
1492            print('Consumer goodbye!')
1493            raise
1494
1495    async def main():
1496        q = Queue()
1497        t = await spawn_thread(consumer, q)
1498
1499        for i in range(10):
1500            await q.put(i)
1501            await sleep(1)
1502
1503        await q.join()
1504        await t.cancel()
1505
1506    run(main())
1507
1508Asynchronous threads can perform any combination of blocking operations
1509including those that might involve normal thread-related primitives such
1510as locks and queues.  These operations will block the thread itself, but
1511will not block the Curio kernel loop.  In a sense, this is the whole
point--if you run things in an async thread, the rest of Curio is
1513protected.   Asynchronous threads can be cancelled in the same manner
1514as normal Curio tasks.  However, the same rules apply--an asynchronous
1515thread can only be cancelled on blocking operations involving ``AWAIT()``.
1516
1517Scheduler Activations
1518---------------------
1519
1520Every task in Curio goes through a life-cycle of creation, running,
1521suspension, and termination.  These steps are managed by an internal
1522scheduler.  A scheduler activation is a mechanism for monitoring these
1523steps.  To do this, you define a class that inherits from
1524:class:`Activation` in the submodule ``curio.activation``.
1525
1526.. class:: Activation
1527
1528   Base class for defining scheduler activations.
1529
1530An instance ``a`` of :class:`Activation` implements the following methods:
1531
1532.. list-table::
1533   :widths: 55 45
1534   :header-rows: 0
1535
1536   * - ``a.activate(kernel)``
1537     - Executed once upon initialization of the Curio kernel. *kernel* is
1538       a reference to the ``Kernel`` instance.
1539   * - ``a.created(task)``
1540     - Called when a new task is created.  *task* is the newly created ``Task`` instance.
1541   * - ``a.running(task)``
1542     - Called immediately prior to the execution cycle of a task.
1543   * - ``a.suspended(task, trap)``
1544     - Called when a task has suspended execution. *trap* is the trap executed.
1545   * - ``a.terminated(task)``
1546     - Called when a task has terminated execution. Note: the
1547       ``suspended()`` method is always called immediately prior to a task being terminated.
1548
1549Activations are used to implement debugging and diagnostic tools. As
an example, here is a scheduler activation that monitors for
long execution times and reports warnings::
1552
1553    from curio.activation import Activation
1554    import time
1555
1556    class LongBlock(Activation):
1557        def __init__(self, maxtime):
1558            self.maxtime = maxtime
1559
1560        def running(self, task):
1561            self.start = time.time()
1562
1563        def suspended(self, task, trap):
1564            end = time.time()
1565            if end - self.start > self.maxtime:
1566                print(f'Long blocking in {task.name}: {end - self.start}')
1567
1568Scheduler activations are registered when a ``Kernel`` is created or with the
1569top-level ``run()`` function::
1570
    from curio import Kernel, run

    kern = Kernel(activations=[LongBlock(0.05)])
1572    with kern:
1573        kern.run(coro)
1574
1575    # Alternative
1576    run(coro, activations=[LongBlock(0.05)])
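
Each method can be overridden independently of the others.  As a further
sketch, the activation below only records the names of tasks as they are
created and terminated.  The names ``TaskCounter``, ``worker``, and ``main``
are made up for this illustration, and the example assumes that
``Activation`` supplies do-nothing defaults for the methods that are not
overridden::

    from curio import run, spawn
    from curio.activation import Activation

    class TaskCounter(Activation):
        def __init__(self):
            self.created_names = []
            self.terminated_names = []

        def created(self, task):
            # Called once for every newly created task
            self.created_names.append(task.name)

        def terminated(self, task):
            # Called once a task has finished for good
            self.terminated_names.append(task.name)

    async def worker():
        pass

    async def main():
        tasks = []
        for _ in range(2):
            tasks.append(await spawn(worker))
        for t in tasks:
            await t.join()

    counter = TaskCounter()
    run(main, activations=[counter])
    print(counter.created_names)
    print(counter.terminated_names)

The lists may also contain the name of the initial task, so a tool built
this way should not assume that only user-spawned tasks are reported.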
1577
1578Asynchronous Metaprogramming
1579----------------------------
1580
1581The :mod:`curio.meta` module provides some functions that might be useful if
1582implementing more complex programs and APIs involving coroutines.
1583
.. function:: curio_running()
1585
1586   Return ``True`` if Curio is running in the current thread.
1587
1588.. function:: iscoroutinefunction(func)
1589
   Return ``True`` if the supplied *func* is a coroutine function or is known
1591   to resolve into a coroutine.   Unlike a similar function in ``inspect``,
1592   this function knows about ``functools.partial()``, awaitable objects,
1593   and async generators.
1594
1595.. function:: instantiate_coroutine(corofunc, *args, **kwargs)
1596
1597   Instantiate a coroutine from *corofunc*. If *corofunc* is already
1598   a coroutine object, it is returned unmodified.  If it's a coroutine
1599   function, it's executed within an async context using the given
1600   arguments.  If it's not a coroutine, *corofunc* is called
1601   with the given arguments with the expectation that whatever is
1602   returned will be a coroutine instance.
1603
1604.. function:: from_coroutine(level=2)
1605
   Return ``True`` if the calling function is being invoked
   from inside a coroutine.  This is primarily of use when
1608   writing decorators and other advanced metaprogramming features.
1609   The implementation requires stack-frame inspection.  The *level*
1610   argument controls the stack frame in which information is obtained
1611   and might need to be adjusted depending on the nature of code calling
1612   this function.
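
To give a feel for the kind of stack-frame inspection involved, here is a
simplified sketch that uses only the standard library.  It is not Curio's
actual implementation.  The names ``called_from_coroutine``, ``helper``, and
``drive`` are hypothetical, and the flag test shown is an assumption about
what a minimal version needs to check::

    import inspect
    import sys

    # Code-object flags that mark coroutine-like functions
    _CORO_FLAGS = (inspect.CO_COROUTINE
                   | inspect.CO_ITERABLE_COROUTINE
                   | inspect.CO_ASYNC_GENERATOR)

    def called_from_coroutine(level=2):
        # Frame 0 is this function, frame 1 is the function asking the
        # question, and frame 2 is whoever called that function.
        code = sys._getframe(level).f_code
        return bool(code.co_flags & _CORO_FLAGS)

    def helper():
        # Reports whether helper() itself was called from a coroutine
        return called_from_coroutine()

    def sync_caller():
        return helper()

    async def async_caller():
        return helper()

    def drive(coro):
        # Run a coroutine that never suspends, without any event loop
        try:
            coro.send(None)
        except StopIteration as e:
            return e.value

    sync_result = sync_caller()              # False
    async_result = drive(async_caller())     # True

If ``helper()`` were wrapped by another layer, such as a decorator, the
frame of interest would sit one level further up, which is why a *level*
argument is useful.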
1613
1614.. function:: awaitable(syncfunc)
1615
1616   A decorator that allows an asynchronous implementation of a function to be
1617   attached to an existing synchronous function. If the resulting function is
1618   called from synchronous code, the synchronous function is used. If the
1619   function is called from asynchronous code, the asynchronous function is used.
1620
The following example illustrates the behavior::
1622
1623   import curio
1624   from curio.meta import awaitable
1625
1626   def spam(x, y):
1627       print('Synchronous ->', x, y)
1628
1629   @awaitable(spam)
1630   async def spam(x, y):
1631       print('Asynchronous ->', x, y)
1632
1633   async def main():
1634       await spam(2, 3)        # Calls asynchronous spam()
1635
1636   if __name__ == '__main__':
       spam(2, 3)              # Calls synchronous spam()
       curio.run(main)
1639
1640Exceptions
1641----------
1642
1643The following exceptions are defined. All are subclasses of the
1644:class:`CurioError` base class.
1645
1646
1647.. list-table::
1648   :widths: 40 60
1649   :header-rows: 0
1650
1651   * - ``CurioError``
1652     - Base class for all Curio-specific exceptions.
1653   * - ``CancelledError``
1654     - Base class for all cancellation-related exceptions.
1655   * - ``TaskCancelled``
1656     - Exception raised in a coroutine if it has been cancelled using the :meth:`Task.cancel` method.  If ignored, the
1657       coroutine is silently terminated.  If caught, a coroutine can continue to
1658       run, but should work to terminate execution.  Ignoring a cancellation
1659       request and continuing to execute will likely cause some other task to hang.
1660   * - ``TaskTimeout``
1661     - Exception raised in a coroutine if it has been cancelled by timeout.
1662       A subclass of ``CancelledError``.
1663   * - ``TimeoutCancellationError``
1664     - Exception raised in a coroutine if it has been cancelled due to a timeout,
1665       but not one related to the inner-most timeout operation.  A subclass
1666       of ``CancelledError``.
1667   * - ``UncaughtTimeoutError``
1668     - Exception raised if a timeout from an inner timeout operation has
1669       propagated to an outer timeout, indicating the lack of a proper
1670       try-except block.  A subclass of ``CurioError``.
1671   * - ``TaskError``
1672     - Exception raised by the :meth:`Task.join` method if an uncaught exception
1673       occurs in a task.  It is a chained exception. The ``__cause__`` attribute
1674       contains the exception that causes the task to fail.
1675   * - ``SyncIOError``
1676     - Exception raised if a task attempts to perform a synchronous I/O operation
1677       on an object that only supports asynchronous I/O.
1678   * - ``AsyncOnlyError``
     - Exception raised by the ``AWAIT()`` function if it is applied to code not
1680       properly running in an async-thread.
1681   * - ``ResourceBusy``
     - Exception raised if an I/O operation is requested on a resource, but the
1683       resource is already busy performing the same operation on behalf of another task.
1684       The exceptions ``ReadResourceBusy`` and ``WriteResourceBusy`` are subclasses
1685       that provide a more specific cause.
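
As a brief sketch of how two of these exceptions surface in practice, the
following example joins a task that fails and then lets a timeout expire.
The coroutine names ``failing`` and ``main`` are made up for illustration.
The example assumes that the ``spawn()``, ``sleep()``, and ``timeout_after()``
functions are available from the top-level ``curio`` package::

    import curio

    async def failing():
        raise ValueError('bad value')

    async def main():
        outcomes = []

        # An uncaught exception in a task is reported by join() as a
        # TaskError.  The original exception is chained as __cause__.
        task = await curio.spawn(failing)
        try:
            await task.join()
        except curio.TaskError as e:
            outcomes.append('TaskError caused by ' + type(e.__cause__).__name__)

        # An expired timeout is delivered as TaskTimeout
        try:
            await curio.timeout_after(0.05, curio.sleep, 5)
        except curio.TaskTimeout:
            outcomes.append('TaskTimeout')

        return outcomes

    outcomes = curio.run(main)
    print(outcomes)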
1686
1687Low-level Traps and Scheduling
1688------------------------------
1689
1690The following system calls are available in ``curio.traps``, but not typically used
1691directly in user code.  They are used to implement higher level
1692objects such as locks, socket wrappers, and so forth. If you find
1693yourself using these, you're probably doing something wrong--or
1694implementing a new Curio primitive.
1695
1696Unless otherwise indicated, all traps are potentially blocking and
1697may raise a cancellation exception.
1698
1699.. list-table::
1700   :widths: 50 50
1701   :header-rows: 0
1702
1703   * - ``await _read_wait(fileobj)``
1704     - Sleep until data is available for reading on
       *fileobj*.  *fileobj* is any file-like object with a ``fileno()``
1706       method.
1707   * - ``await _write_wait(fileobj)``
1708     - Sleep until data can be written on *fileobj*.
       *fileobj* is any file-like object with a ``fileno()`` method.
1710   * - ``await _io_release(fileobj)``
1711     - Release any kernel resources associated with *fileobj*.  Should
1712       be called prior to closing any file.
1713   * - ``await _io_waiting(fileobj)``
     - Returns a tuple ``(rtask, wtask)`` of tasks currently sleeping on
1715       *fileobj* (if any).  Returns immediately.
1716   * - ``await _future_wait(fut)``
1717     - Sleep until a result is set on *fut*.  *fut*
1718       is an instance of :py:class:`concurrent.futures.Future`.
1719   * - ``await _cancel_task(task, exc=TaskCancelled, val=None)``
1720     - Cancel *task*.  Returns immediately. *exc* and *val*
1721       specify the exception type and value.
1722   * - ``await _scheduler_wait(sched, state_name)``
1723     - Go to sleep on a kernel scheduler primitive. *sched* is an
1724       instance of ``curio.sched.SchedBase``. *state_name* is the name of
1725       the wait state (used in debugging).
1726   * - ``await _scheduler_wake(sched, n=1, value=None, exc=None)``
1727     -  Reschedule one or more tasks from a kernel scheduler primitive. *n* is the
1728        number of tasks to release. *value* and *exc* specify the return
1729        value or exception to raise in the task when it resumes execution.
1730        Returns immediately.
1731   * - ``await _get_kernel()``
1732     - Get a reference to the running ``Kernel`` object. Returns immediately.
1733   * - ``await _get_current()``
1734     - Get a reference to the currently running ``Task`` instance. Returns immediately.
1735   * - ``await _sleep(seconds)``
1736     - Sleep for a given number of seconds.
1737   * - ``await _set_timeout(seconds)``
1738     - Set a timeout in the currently running task. Returns immediately
       with the previous timeout (if any).
1740   * - ``await _unset_timeout(previous)``
1741     - Unset a timeout in the currently running task. *previous* is the
       value returned by the ``_set_timeout()`` call used to set the timeout.
1743       Returns immediately.
1744   * - ``await _clock()``
1745     - Immediately returns the current monotonic clock value.
1746
1747Again, you're unlikely to use any of these functions directly.
1748However, here's a small taste of how they get used.  For example, the
1749:meth:`curio.io.Socket.recv` method looks roughly like this::
1750
1751    class Socket:
1752        ...
        async def recv(self, maxbytes):
1754            while True:
1755                try:
1756                    return self._socket.recv(maxbytes)
1757                except BlockingIOError:
1758                    await _read_wait(self._socket)
1759        ...
1760
1761This method first tries to receive data.  If none is available, the
1762:func:`_read_wait` call is used to put the task to sleep until reading
1763can be performed. When it awakes, the receive operation is
retried. Just to emphasize, the :func:`_read_wait` call doesn't actually
perform any I/O. It only suspends the task until reading is possible.
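
The same try/wait/retry pattern can be sketched with the standard library
alone.  In the example below, a blocking ``select()`` call stands in for
:func:`_read_wait`.  It is an analogy only: Curio suspends a single task,
whereas this stand-in blocks the whole thread.  The function name
``recv_when_ready`` is hypothetical::

    import selectors
    import socket
    import threading

    def recv_when_ready(sock, maxbytes):
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while True:
                try:
                    return sock.recv(maxbytes)
                except BlockingIOError:
                    # Nothing to read yet.  Wait for readiness only;
                    # select() itself does not consume any data.
                    sel.select()

    reader, writer = socket.socketpair()
    reader.setblocking(False)

    # Send data a little later so that the first recv() attempt fails
    timer = threading.Timer(0.1, writer.sendall, args=(b'hello',))
    timer.start()

    data = recv_when_ready(reader, 1024)
    timer.join()
    reader.close()
    writer.close()
    print(data)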
1766
1767The ``_scheduler_wait()`` and ``_scheduler_wake()`` traps are used to
1768implement high-level synchronization and queuing primitives.  The
1769``sched`` argument to these calls is an instance of a class that
1770inherits from ``SchedBase`` defined in the ``curio.sched`` submodule.
1771The following specific classes are defined:
1772
1773.. class:: SchedFIFO
1774
1775   A scheduling FIFO queue.  Used to implement locks and queues.
1776
1777.. class:: SchedBarrier
1778
1779   A scheduling barrier.  Used to implement events.
1780
1781The following public methods are defined on an instance ``s`` of these classes:
1782
1783.. list-table::
1784   :widths: 40 60
1785   :header-rows: 0
1786
1787   * - ``await s.suspend(reason)``
1788     - Suspend the calling task. ``reason`` is a string describing why.
1789   * - ``await s.wake(n=1)``
1790     - Wake one or more suspended tasks.
1791   * - ``len(s)``
1792     - Number of tasks suspended.
1793
1794Here is an example of how a scheduler primitive is used to implement an ``Event``::
1795
1796    from curio.sched import SchedBarrier
1797
1798    class Event:
1799        def __init__(self):
1800            self._value = 0
1801            self._sched = SchedBarrier()
1802
1803        async def wait(self):
1804            if self._value == 0:
1805                await self._sched.suspend('EVENT_WAIT')
1806
1807        async def set(self):
1808            self._value = 1
1809            await self._sched.wake(len(self._sched))
1810
1811
1812Debugging and Diagnostics
1813-------------------------
1814
1815Curio provides a few facilities for basic debugging and diagnostics.  If you
1816print a ``Task`` instance, it will tell you the name of the associated
coroutine along with the filename and line number where the task is currently
1818executing.   The output might look similar to this::
1819
1820    Task(id=3, name='child', state='TIME_SLEEP') at filename.py:9
1821
1822You can additionally use the ``Task.traceback()`` method to create a current
1823stack traceback of any given task.  For example::
1824
1825    t = await spawn(coro)
1826    ...
1827    print(t.traceback())
1828
1829Instead of a full traceback, you can also get the current filename and line number::
1830
    filename, lineno = t.where()
1832
1833To find out more detailed information about what the kernel is doing, you can
1834supply one or more debugging modules to the ``run()`` function.  To trace
1835all task scheduling events, use the ``schedtrace`` debugger as follows::
1836
1837    from curio.debug import schedtrace
1838    run(coro, debug=schedtrace)
1839
1840To additionally include information on low-level kernel traps, use the ``traptrace`` debugger
1841instead::
1842
1843    from curio.debug import traptrace
1844    run(coro, debug=traptrace)
1845
1846To report all exceptions from crashed tasks, use the ``logcrash`` debugger::
1847
1848    from curio.debug import logcrash
1849    run(coro, debug=logcrash)
1850
1851To report warnings about long-running tasks that appear to be stalling the
1852event loop, use the ``longblock`` debugger::
1853
1854    from curio.debug import longblock
1855    run(coro, debug=longblock(max_time=0.1))
1856
The different debuggers can be combined by providing a list. For example::
1858
1859    run(coro, debug=[schedtrace, traptrace, logcrash])
1860
1861The amount of output produced by the different debugging modules might be considerable. You
1862can filter it to a specific set of coroutine names using the ``filter`` keyword argument.
1863For example::
1864
1865    async def spam():
1866        ...
1867
1868    async def coro():
1869        t = await spawn(spam)
1870        ...
1871
1872    run(coro, debug=schedtrace(filter={'spam'}))
1873
1874The logging level used by the different debuggers can be changed using the
1875``level`` keyword argument::
1876
    import logging
    run(coro, debug=schedtrace(level=logging.DEBUG))
1878
A different ``Logger`` instance can be supplied using the ``log`` keyword argument::
1880
1881    import logging
1882    run(coro, debug=schedtrace(log=logging.getLogger('spam')))
1883
1884Be aware that all diagnostic logging is synchronous.  As such, all
1885logging operations might temporarily block the event loop--especially
1886if logging output involves file I/O or network operations.  If this is
1887a concern, you should take steps to mitigate it in the configuration
1888of logging.  For example, you might use the ``QueueHandler`` and
1889``QueueListener`` objects from the ``logging`` module to offload log
1890handling to a separate thread.
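
A minimal sketch of that arrangement, using only the standard library, is
shown below.  The ``ListHandler`` class is a made-up stand-in for whatever
slow handler (file, network, etc.) would be used in practice, and the
logger name ``'spam'`` is arbitrary::

    import logging
    import logging.handlers
    import queue

    class ListHandler(logging.Handler):
        # Stand-in for a slow handler.  It just collects messages.
        def __init__(self):
            super().__init__()
            self.messages = []

        def emit(self, record):
            self.messages.append(record.getMessage())

    log_queue = queue.Queue()
    slow_handler = ListHandler()

    # The listener runs the slow handler in its own thread
    listener = logging.handlers.QueueListener(log_queue, slow_handler)
    listener.start()

    # The logger itself only places records on the queue, which is quick
    log = logging.getLogger('spam')
    log.setLevel(logging.DEBUG)
    log.propagate = False
    log.addHandler(logging.handlers.QueueHandler(log_queue))

    log.debug('task scheduled')

    # stop() processes any records still queued before returning
    listener.stop()
    print(slow_handler.messages)

A logger configured this way could then be handed to a debugger through
the ``log`` keyword argument described above.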
1891
1892
1893
1894
1895