import math
import sys
import threading
from contextlib import contextmanager
from importlib import import_module
from typing import Any, Callable, Coroutine, Dict, Generator, Optional, Tuple, Type, TypeVar

import sniffio

from ._compat import DeprecatedAwaitableFloat

# This must be updated when new backends are introduced
BACKENDS = ('asyncio', 'trio')

T_Retval = TypeVar('T_Retval')
# Per-thread storage; claim_worker_thread() keeps the claimed backend module here
threadlocals = threading.local()


def run(func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object,
        backend: str = 'asyncio', backend_options: Optional[Dict[str, Any]] = None) -> T_Retval:
    """
    Run the given coroutine function in an asynchronous event loop.

    The current thread must not be already running an event loop.

    :param func: a coroutine function
    :param args: positional arguments to ``func``
    :param backend: name of the asynchronous event loop implementation – currently either
        ``asyncio`` or ``trio``
    :param backend_options: keyword arguments to call the backend ``run()`` implementation with
        (documented :ref:`here <backend options>`)
    :return: the return value of the coroutine function
    :raises RuntimeError: if an asynchronous event loop is already running in this thread
    :raises LookupError: if the named backend is not found

    """
    # sniffio only hands back a name when an async library is already active in this thread,
    # in which case starting another event loop here is refused
    try:
        running_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        pass
    else:
        raise RuntimeError(f'Already running {running_library} in this thread')

    # Resolve the backend module relative to this package; any import failure is reported
    # to the caller as an unknown backend
    try:
        asynclib = import_module(f'..._backends._{backend}', package=__name__)
    except ImportError as exc:
        raise LookupError(f'No such backend: {backend}') from exc

    # Since we're in control of the event loop, we can cache the name of the async library.
    # The context variable is only touched when it is unset, and the token remembers that we
    # were the ones who set it.
    library_cvar = sniffio.current_async_library_cvar
    if library_cvar.get(None) is None:
        reset_token = library_cvar.set(backend)
    else:
        reset_token = None

    try:
        return asynclib.run(func, *args, **(backend_options or {}))  # type: ignore
    finally:
        # Undo the caching above, but only if this call performed it
        if reset_token is not None:
            library_cvar.reset(reset_token)


async def sleep(delay: float) -> None:
    """
    Pause the current task for the specified duration.

    :param delay: the duration, in seconds

    """
    backend = get_asynclib()
    return await backend.sleep(delay)


async def sleep_forever() -> None:
    """
    Pause the current task until it's cancelled.

    This is a shortcut for ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    await sleep(math.inf)


async def sleep_until(deadline: float) -> None:
    """
    Pause the current task until the given time.

    :param deadline: the absolute time to wake up at (according to the internal monotonic clock of
        the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed is clamped to a zero-length sleep
    await sleep(max(remaining, 0))


def current_time() -> DeprecatedAwaitableFloat:
    """
    Return the current value of the event loop's internal clock.

    :return: the clock value (seconds)

    """
    clock_value = get_asynclib().current_time()
    return DeprecatedAwaitableFloat(clock_value, current_time)


def get_all_backends() -> Tuple[str, ...]:
    """Return a tuple of the names of all built-in backends."""
    return BACKENDS


def get_cancelled_exc_class() -> Type[BaseException]:
    """Return the current async library's cancellation exception class."""
    backend = get_asynclib()
    return backend.CancelledError


#
# Private API
#

@contextmanager
def claim_worker_thread(backend: str) -> Generator[Any, None, None]:
    """
    Record the named backend's module in ``threadlocals`` for the duration of the context.

    The module is looked up in ``sys.modules`` (it is not imported here) and stored as
    ``threadlocals.current_async_module``; the attribute is deleted again on exit.

    :param backend: name of the backend whose module should be recorded
    :raises KeyError: if the backend module is not present in ``sys.modules``

    """
    threadlocals.current_async_module = sys.modules['anyio._backends._' + backend]
    try:
        yield
    finally:
        del threadlocals.current_async_module


def get_asynclib(asynclib_name: Optional[str] = None) -> Any:
    """
    Return the backend module matching the given async library name.

    :param asynclib_name: name of the async library; if omitted, the name reported by
        ``sniffio.current_async_library()`` is used
    :return: the backend module, imported first if it's not already in ``sys.modules``

    """
    if asynclib_name is None:
        asynclib_name = sniffio.current_async_library()

    qualified_name = 'anyio._backends._' + asynclib_name
    try:
        backend_module = sys.modules[qualified_name]
    except KeyError:
        # Not loaded yet, so import it on demand
        backend_module = import_module(qualified_name)

    return backend_module