1import math
2import sys
3import threading
4from contextlib import contextmanager
5from importlib import import_module
6from typing import Any, Callable, Coroutine, Dict, Generator, Optional, Tuple, Type, TypeVar
7
8import sniffio
9
10# This must be updated when new backends are introduced
11from ._compat import DeprecatedAwaitableFloat
12
13BACKENDS = 'asyncio', 'trio'
14
15T_Retval = TypeVar('T_Retval')
16threadlocals = threading.local()
17
18
19def run(func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object,
20        backend: str = 'asyncio', backend_options: Optional[Dict[str, Any]] = None) -> T_Retval:
21    """
22    Run the given coroutine function in an asynchronous event loop.
23
24    The current thread must not be already running an event loop.
25
26    :param func: a coroutine function
27    :param args: positional arguments to ``func``
28    :param backend: name of the asynchronous event loop implementation – currently either
29        ``asyncio`` or ``trio``
30    :param backend_options: keyword arguments to call the backend ``run()`` implementation with
31        (documented :ref:`here <backend options>`)
32    :return: the return value of the coroutine function
33    :raises RuntimeError: if an asynchronous event loop is already running in this thread
34    :raises LookupError: if the named backend is not found
35
36    """
37    try:
38        asynclib_name = sniffio.current_async_library()
39    except sniffio.AsyncLibraryNotFoundError:
40        pass
41    else:
42        raise RuntimeError(f'Already running {asynclib_name} in this thread')
43
44    try:
45        asynclib = import_module(f'..._backends._{backend}', package=__name__)
46    except ImportError as exc:
47        raise LookupError(f'No such backend: {backend}') from exc
48
49    token = None
50    if sniffio.current_async_library_cvar.get(None) is None:
51        # Since we're in control of the event loop, we can cache the name of the async library
52        token = sniffio.current_async_library_cvar.set(backend)
53
54    try:
55        backend_options = backend_options or {}
56        return asynclib.run(func, *args, **backend_options)  # type: ignore
57    finally:
58        if token:
59            sniffio.current_async_library_cvar.reset(token)
60
61
62async def sleep(delay: float) -> None:
63    """
64    Pause the current task for the specified duration.
65
66    :param delay: the duration, in seconds
67
68    """
69    return await get_asynclib().sleep(delay)
70
71
72async def sleep_forever() -> None:
73    """
74    Pause the current task until it's cancelled.
75
76    This is a shortcut for ``sleep(math.inf)``.
77
78    .. versionadded:: 3.1
79
80    """
81    await sleep(math.inf)
82
83
84async def sleep_until(deadline: float) -> None:
85    """
86    Pause the current task until the given time.
87
88    :param deadline: the absolute time to wake up at (according to the internal monotonic clock of
89        the event loop)
90
91    .. versionadded:: 3.1
92
93    """
94    now = current_time()
95    await sleep(max(deadline - now, 0))
96
97
98def current_time() -> DeprecatedAwaitableFloat:
99    """
100    Return the current value of the event loop's internal clock.
101
102    :return: the clock value (seconds)
103
104    """
105    return DeprecatedAwaitableFloat(get_asynclib().current_time(), current_time)
106
107
108def get_all_backends() -> Tuple[str, ...]:
109    """Return a tuple of the names of all built-in backends."""
110    return BACKENDS
111
112
113def get_cancelled_exc_class() -> Type[BaseException]:
114    """Return the current async library's cancellation exception class."""
115    return get_asynclib().CancelledError
116
117
118#
119# Private API
120#
121
122@contextmanager
123def claim_worker_thread(backend: str) -> Generator[Any, None, None]:
124    module = sys.modules['anyio._backends._' + backend]
125    threadlocals.current_async_module = module
126    try:
127        yield
128    finally:
129        del threadlocals.current_async_module
130
131
132def get_asynclib(asynclib_name: Optional[str] = None) -> Any:
133    if asynclib_name is None:
134        asynclib_name = sniffio.current_async_library()
135
136    modulename = 'anyio._backends._' + asynclib_name
137    try:
138        return sys.modules[modulename]
139    except KeyError:
140        return import_module(modulename)
141