1"""
2The main interface for management of the aio loop
3"""
4import asyncio
5import functools
6import signal
7from typing import Any
8from typing import AsyncGenerator
9from typing import Callable
10from typing import Coroutine
11from typing import Iterable
12
13import proxy_tools
14
15import pop.hub
16
17__virtualname__ = "loop"
18
19
def __init__(hub):
    """
    Initialize the loop state kept on ``hub.pop.loop`` and expose the loop
    getter as ``hub.pop.Loop``.

    :param hub:
    """
    # All of the loop state starts out unset
    for state_name in ("FUT_QUE", "CURRENT_LOOP", "EXECUTOR", "POLICY"):
        setattr(hub.pop.loop, state_name, None)

    # Add the sub for the "loop" dyne, then publish its getter as a property
    hub.pop.sub.add(dyne_name="loop")
    loop_getter = hub.loop.init.get
    hub.pop.Loop = proxy_tools.module_property(loop_getter)
27
28
def create(hub, loop_plugin: str = None):
    """
    Make sure an event loop exists, creating it through the pop-loop dyne.
    When ``hub.pop.loop.CURRENT_LOOP`` is already set nothing is done, so
    calling this more than once is safe.

    :param hub:
    :param loop_plugin: The plugin from pop-loop to use to initialize the loop
    """
    if hub.pop.loop.CURRENT_LOOP:
        # A loop is already in place, leave it alone
        return
    hub.loop.init.create(loop_plugin)
39
40
def call_soon(hub: "pop.hub.Hub", ref: str, *args, **kwargs):
    """
    Schedule the function found at ``hub[ref]`` to be run when the loop has
    time. This needs to be called after the creation of the loop.

    The function is run through ``hub.pop.loop.wrap`` and its return is
    resolved with ``hub.pop.loop.unwrap``.

    :param hub:
    :param ref: The reference on the hub of the function to call
    :param args: Positional arguments passed to the function
    :param kwargs: Keyword arguments passed to the function
    :return: The handle returned by the loop's ``call_soon``
    """
    loop = hub.pop.loop.CURRENT_LOOP
    coro = hub.pop.loop.wrap(hub[ref], *args, **kwargs)
    # unwrap is a coroutine function: handing it to call_soon as the callback
    # would only build a coroutine object that nothing ever awaits. Let the
    # loop turn the unwrap coroutine into a task so the call actually runs.
    return loop.call_soon(loop.create_task, hub.pop.loop.unwrap(coro))
48
49
def ensure_future(hub: "pop.hub.Hub", ref: str, *args, **kwargs) -> asyncio.Future:
    """
    Schedule a coroutine to be called when the loop has time. This needs
    to be called after the creation of the loop. This function also uses
    the hold system to await the future when it is done making it easy
    to create a future that will be cleanly awaited in the background.

    :param hub:
    :param ref: The reference on the hub of the coroutine function to call
    :param args: Positional arguments passed to the coroutine function
    :param kwargs: Keyword arguments passed to the coroutine function
    :return: The future wrapping the scheduled coroutine
    :raises RuntimeError: If the futures queue has not been created yet
    """
    # Validate before anything is created: scheduling first would leave an
    # orphaned task running behind the error with nothing tracking it.
    if hub.pop.loop.FUT_QUE is None:
        raise RuntimeError(
            "Run 'await hub.pop.loop.init_futures()' to create the futures queue in the proper loop"
        )

    future = asyncio.ensure_future(hub[ref](*args, **kwargs))

    def callback(fut):
        # Hand the finished future to the queue so that it gets awaited
        hub.pop.loop.FUT_QUE.put_nowait(fut)

    future.add_done_callback(callback)
    return future
69
70
async def init_futures(hub):
    """
    Create the queue used to collect finished futures, unless one already
    exists on ``hub.pop.loop.FUT_QUE``.

    :param hub:
    """
    if hub.pop.loop.FUT_QUE is not None:
        return
    hub.pop.loop.FUT_QUE = asyncio.Queue()
74
75
def start(
    hub: "pop.hub.Hub",
    *coros,
    hold: bool = False,
    sigint: Callable = None,
    sigterm: Callable = None,
    loop_plugin: asyncio.AbstractEventLoop = None,
) -> Any:
    """
    Create the loop if needed and run the given coroutines on it until they
    are all complete. The loop is always closed before this returns.

    :param hub:
    :param coros: The awaitables to gather and run on the loop
    :param hold: When True, also run the holder that awaits queued futures
    :param sigint: Called with the signal, its return is made into a task on SIGINT
    :param sigterm: Called with the signal, its return is made into a task on SIGTERM
    :param loop_plugin: Passed through to ``hub.pop.loop.create``
    :return: The gathered results, or None when a keyboard interrupt is caught
    """
    hub.pop.loop.create(loop_plugin=loop_plugin)
    loop: asyncio.AbstractEventLoop = hub.pop.loop.CURRENT_LOOP

    def _register(signum, handler):
        # When the signal arrives, run the handler as a task on the loop
        loop.add_signal_handler(signum, lambda: loop.create_task(handler(signum)))

    if sigint:
        _register(signal.SIGINT, sigint)
    if sigterm:
        _register(signal.SIGTERM, sigterm)

    to_run = [*coros, _holder(hub)] if hold else coros

    try:
        # DO NOT CHANGE THIS CALL TO run_forever! If we do that then the tracebacks
        # do not get resolved.
        return loop.run_until_complete(asyncio.gather(*to_run))
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling...")
    finally:
        loop.close()
108
109
async def _holder(hub):
    """
    Keep the loop busy forever by waiting on the futures queue and awaiting
    every future that is pulled from it. The queue is created first if it
    does not exist yet.

    :param hub:
    """
    if hub.pop.loop.FUT_QUE is None:
        hub.pop.loop.FUT_QUE = asyncio.Queue()

    while True:
        # Block until a finished future is queued, then await it
        await (await hub.pop.loop.FUT_QUE.get())
121
122
async def await_futures(hub: "pop.hub.Hub"):
    """
    Drain the futures queue, awaiting each future found in it, and return
    once the queue is empty. This is how ensured futures get cleaned up on
    demand when the loop was not started with hold=True.

    :param hub:
    """
    if hub.pop.loop.FUT_QUE is None:
        hub.pop.loop.FUT_QUE = asyncio.Queue()

    while True:
        if hub.pop.loop.FUT_QUE.empty():
            return
        await (await hub.pop.loop.FUT_QUE.get())
135
136
async def kill(hub: "pop.hub.Hub", wait: float = 0):
    """
    Stop the current loop, then close it and clear
    ``hub.pop.loop.CURRENT_LOOP`` once it is no longer running.

    :param hub:
    :param wait: Seconds to sleep before the loop is told to stop
    """
    await asyncio.sleep(wait)
    if hub.pop.loop.CURRENT_LOOP is None:
        # There is no loop to stop or close
        return
    hub.pop.loop.CURRENT_LOOP.stop()
    # Finish as soon as the loop reference has been cleared; looping
    # unconditionally here would spin forever without yielding control.
    while hub.pop.loop.CURRENT_LOOP is not None:
        if hub.pop.loop.CURRENT_LOOP.is_running():
            # A loop cannot be closed while it is running, check again later
            await asyncio.sleep(1)
            continue
        hub.pop.loop.CURRENT_LOOP.close()
        hub.pop.loop.CURRENT_LOOP = None
150
151
152# Helpers for async operations
153
154
async def as_yielded(
    hub: "pop.hub.Hub", gens: Iterable[AsyncGenerator]
) -> AsyncGenerator:
    """
    Concurrently run multiple async generators and yield the next yielded
    value from the soonest yielded generator.

    If one of the generators raises, the values that were already queued are
    yielded first and then the error is raised to the consumer. Generators
    that are still running when iteration ends (because of an error or
    because the consumer stopped early) are cancelled.

    async def many():
        for n in range(10):
            yield os.urandom(6).hex()

    async def run():
        gens = []
        for n in range(10):
            gens.append(many())
        async for y in hub.pop.loop.as_yielded(gens):
            print(y)

    :param hub:
    :param gens: The async generators to run concurrently
    """
    fin = object()
    que = asyncio.Queue()

    async def _pump(agen: AsyncGenerator):
        # Move everything one generator yields onto the shared queue
        async for comp in agen:
            await que.put(comp)

    pumps = [asyncio.ensure_future(_pump(gen)) for gen in gens]

    async def _supervise():
        try:
            await asyncio.gather(*pumps)
        finally:
            # Always wake the consumer, whether the pumps finished or failed
            que.put_nowait(fin)

    supervisor = asyncio.ensure_future(_supervise())
    try:
        while True:
            ret = await que.get()
            if ret is fin:
                break
            yield ret
        # Surface a generator failure instead of silently ending iteration
        await supervisor
    finally:
        # Cancelling a finished task is a no-op, so only leftovers are affected
        for task in (*pumps, supervisor):
            task.cancel()
205
206
async def sleep(hub, delay: float, *args, **kwargs):
    """
    Pause for ``delay`` seconds using ``asyncio.sleep``. Nothing is returned.

    :param hub:
    :param delay: The number of seconds to sleep
    :param args: Extra positional arguments handed to ``asyncio.sleep``
    :param kwargs: Extra keyword arguments handed to ``asyncio.sleep``
    """
    pause = asyncio.sleep(delay, *args, **kwargs)
    await pause
209
210
async def wrap(
    hub: "pop.hub.Hub",
    synchronous_function: Callable,
    *args,
    **kwargs,
):
    """
    Call a synchronous function in the executor of the current loop and
    return its result once it is available.

    :param hub:
    :param synchronous_function: The function to wrap
    :param args: Positional arguments bound to the function
    :param kwargs: Keyword arguments bound to the function
    :return: Whatever the function returns
    """
    # run_in_executor takes no keyword arguments for the callable, bind them here
    bound_call = functools.partial(synchronous_function, *args, **kwargs)
    pending = hub.pop.loop.CURRENT_LOOP.run_in_executor(
        executor=hub.pop.loop.EXECUTOR,
        func=bound_call,
    )
    return await pending
227
228
async def unwrap(hub, function_ret: Coroutine or Any):
    """
    Resolve the return of a function: for as long as the value is a
    coroutine it is awaited, and the first non-coroutine value is returned.

    :param hub:
    :param function_ret: The return from a possibly asynchronous function
    :return: The fully resolved value
    """
    resolved = function_ret
    while True:
        if not asyncio.iscoroutine(resolved):
            return resolved
        resolved = await resolved
240