"""
The main interface for management of the aio loop
"""
import asyncio
import functools
import signal
from typing import Any
from typing import AsyncGenerator
from typing import Callable
from typing import Coroutine
from typing import Iterable

import proxy_tools

import pop.hub

__virtualname__ = "loop"


def __init__(hub):
    """
    Initialise the loop state stored on the hub and load the "loop" dyne.

    :param hub: The pop hub
    """
    # Queue of finished futures; created lazily by init_futures, _holder
    # and await_futures so that it is built inside the running loop.
    hub.pop.loop.FUT_QUE = None
    hub.pop.loop.CURRENT_LOOP = None
    # Handed to run_in_executor by wrap(); None selects the loop's default.
    hub.pop.loop.EXECUTOR = None
    hub.pop.loop.POLICY = None
    hub.pop.sub.add(dyne_name="loop")
    hub.pop.Loop = proxy_tools.module_property(hub.loop.init.get)


def create(hub, loop_plugin: str = None):
    """
    Call the pop-loop dyne for creating the event loop.
    This is an idempotent operation.

    :param hub: The pop hub
    :param loop_plugin: The plugin from pop-loop to use to initialize the loop
    """
    if not hub.pop.loop.CURRENT_LOOP:
        hub.loop.init.create(loop_plugin)


def call_soon(hub: "pop.hub.Hub", ref: str, *args, **kwargs):
    """
    Schedule a coroutine to be called when the loop has time. This needs
    to be called after the creation of the loop

    :param hub: The pop hub
    :param ref: The reference on the hub of the function to call
    :param args: Positional arguments passed to the referenced function
    :param kwargs: Keyword arguments passed to the referenced function
    :return: The handle returned by the loop's ``call_soon``
    """
    awaitable = hub.pop.loop.unwrap(
        hub.pop.loop.wrap(hub[ref], *args, **kwargs)
    )
    # call_soon only invokes plain callbacks. ``unwrap`` is a coroutine
    # function, so handing it to call_soon directly would merely create a
    # coroutine object that is never awaited. Schedule it as a task instead;
    # the callback runs inside the loop, so ensure_future binds to that loop.
    return hub.pop.loop.CURRENT_LOOP.call_soon(asyncio.ensure_future, awaitable)


def ensure_future(hub: "pop.hub.Hub", ref: str, *args, **kwargs) -> asyncio.Future:
    """
    Schedule a coroutine to be called when the loop has time. This needs
    to be called after the creation of the loop. This function also uses
    the hold system to await the future when it is done making it easy
    to create a future that will be cleanly awaited in the background.

    :param hub: The pop hub
    :param ref: The reference on the hub of the coroutine function to call
    :param args: Positional arguments passed to the referenced function
    :param kwargs: Keyword arguments passed to the referenced function
    :return: The scheduled future
    :raises RuntimeError: If the futures queue has not been created yet
    """
    # Validate before scheduling anything: otherwise the error path would
    # leave a running task behind that nothing tracks or awaits.
    if hub.pop.loop.FUT_QUE is None:
        raise RuntimeError(
            "Run 'await hub.pop.loop.init_futures()' to create the futures queue in the proper loop"
        )

    future = asyncio.ensure_future(hub[ref](*args, **kwargs))

    def callback(fut):
        # Hand the finished future to _holder / await_futures for awaiting.
        hub.pop.loop.FUT_QUE.put_nowait(fut)

    future.add_done_callback(callback)
    return future


async def init_futures(hub):
    """
    Create the queue of finished futures if it does not exist yet.

    :param hub: The pop hub
    """
    if hub.pop.loop.FUT_QUE is None:
        hub.pop.loop.FUT_QUE = asyncio.Queue()


def start(
    hub: "pop.hub.Hub",
    *coros,
    hold: bool = False,
    sigint: Callable = None,
    sigterm: Callable = None,
    loop_plugin: str = None,
) -> Any:
    """
    Start a loop that will run until complete

    :param hub: The pop hub
    :param coros: The awaitables to gather and run on the loop
    :param hold: Also run the holder coroutine, which keeps the loop open
        and awaits futures created through ``ensure_future``
    :param sigint: Coroutine function scheduled with the signal number
        when SIGINT is received
    :param sigterm: Coroutine function scheduled with the signal number
        when SIGTERM is received
    :param loop_plugin: The plugin from pop-loop to use to initialize the loop
    :return: The gathered results, or None after a keyboard interrupt
    """
    hub.pop.loop.create(loop_plugin=loop_plugin)
    loop: asyncio.AbstractEventLoop = hub.pop.loop.CURRENT_LOOP

    if sigint:
        s = signal.SIGINT
        # Bind the signal number as a default so the handler takes no arguments
        loop.add_signal_handler(s, lambda s=s: loop.create_task(sigint(s)))
    if sigterm:
        s = signal.SIGTERM
        loop.add_signal_handler(s, lambda s=s: loop.create_task(sigterm(s)))
    if hold:
        coros = list(coros)
        coros.append(_holder(hub))

    try:
        # DO NOT CHANGE THIS CALL TO run_forever! If we do that then the tracebacks
        # do not get resolved.
        return loop.run_until_complete(asyncio.gather(*coros))
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling...")
    finally:
        # NOTE(review): CURRENT_LOOP still references this closed loop after
        # start() returns, so a later create() will not build a new one --
        # confirm whether it should be reset here as kill() does.
        loop.close()


async def _holder(hub):
    """
    Just a sleeping while loop to hold the loop open while it runs until
    complete

    :param hub: The pop hub
    """
    if hub.pop.loop.FUT_QUE is None:
        hub.pop.loop.FUT_QUE = asyncio.Queue()

    while True:
        future = await hub.pop.loop.FUT_QUE.get()
        # Awaiting a finished future re-raises any exception it holds
        await future


async def await_futures(hub: "pop.hub.Hub"):
    """
    Scan over the futures that have completed and manually await them.
    This function is used to clean up futures when the loop is not opened
    up with hold=True so that ensured futures can be cleaned up on demand

    :param hub: The pop hub
    """
    if hub.pop.loop.FUT_QUE is None:
        hub.pop.loop.FUT_QUE = asyncio.Queue()

    while not hub.pop.loop.FUT_QUE.empty():
        future = await hub.pop.loop.FUT_QUE.get()
        await future


async def kill(hub: "pop.hub.Hub", wait: float = 0):
    """
    Close out the loop

    :param hub: The pop hub
    :param wait: Seconds to sleep before stopping the loop
    """
    await asyncio.sleep(wait)
    if hub.pop.loop.CURRENT_LOOP is None:
        # Nothing to stop
        return
    hub.pop.loop.CURRENT_LOOP.stop()
    # stop() only takes effect once the loop finishes its current iteration,
    # so poll until it has actually stopped before closing it. Return as
    # soon as the loop is closed (or already gone) instead of looping forever.
    while hub.pop.loop.CURRENT_LOOP is not None:
        if not hub.pop.loop.CURRENT_LOOP.is_running():
            hub.pop.loop.CURRENT_LOOP.close()
            hub.pop.loop.CURRENT_LOOP = None
            return
        await asyncio.sleep(1)


# Helpers for async operations


async def as_yielded(
    hub: "pop.hub.Hub", gens: Iterable[AsyncGenerator]
) -> AsyncGenerator:
    """
    Concurrently run multiple async generators and yield the next yielded
    value from the soonest yielded generator.

    async def many():
        for n in range(10):
            yield os.urandom(6).hex()

    async def run():
        gens = []
        for n in range(10):
            gens.append(many())
        async for y in as_yielded(gens):
            print(y)

    :param hub: The pop hub
    :param gens: The async generators to drain concurrently
    """
    # Unique sentinel marking the end of the stream; compared by identity
    fin = object()
    que = asyncio.Queue()
    to_clean = []

    async def _yield(agen: AsyncGenerator):
        # Funnel everything one generator produces into the shared queue
        async for comp in agen:
            await que.put(comp)

    async def _ensure(coroutines: Iterable[Coroutine]):
        # Drive all of the generator consumers to completion
        for f in asyncio.as_completed(coroutines):
            await f

    async def _set_done():
        await que.put(fin)

    def _done(future: asyncio.Future):
        # Done callbacks cannot await, so the sentinel is queued from a task
        to_clean.append(asyncio.ensure_future(_set_done()))

    coros = []
    for gen in gens:
        coros.append(_yield(gen))

    # NOTE(review): the result of ``fut`` is never awaited, so an exception
    # raised by a generator ends the stream without being re-raised here.
    fut = asyncio.ensure_future(_ensure(coros))
    fut.add_done_callback(_done)

    while True:
        ret = await que.get()
        if ret is fin:
            break
        yield ret
    for c in to_clean:
        await c


async def sleep(hub, delay: float, *args, **kwargs):
    """
    Sleep on the loop by delegating to ``asyncio.sleep``

    :param hub: The pop hub
    :param delay: Seconds to sleep
    :param args: Extra positional arguments forwarded to ``asyncio.sleep``
    :param kwargs: Extra keyword arguments forwarded to ``asyncio.sleep``
    """
    await asyncio.sleep(delay, *args, **kwargs)


async def wrap(
    hub: "pop.hub.Hub",
    synchronous_function: Callable,
    *args,
    **kwargs,
):
    """
    Run a synchronous function asynchronously

    :param hub: The pop hub
    :param synchronous_function: The function to wrap
    :param args: Positional arguments passed to the function
    :param kwargs: Keyword arguments passed to the function
    :return: The return of the function once the executor has run it
    """
    # run_in_executor does not forward keyword arguments, hence the partial
    return await hub.pop.loop.CURRENT_LOOP.run_in_executor(
        executor=hub.pop.loop.EXECUTOR,
        func=functools.partial(synchronous_function, *args, **kwargs),
    )


async def unwrap(hub, function_ret: Any):
    """
    Take the return of a function, if it is a coroutine, await it.
    Return the result

    :param hub: The pop hub
    :param function_ret: The return from a possibly asynchronous function
    :return: The first non-coroutine result
    """
    # A coroutine may itself return a coroutine; keep awaiting until it doesn't
    while asyncio.iscoroutine(function_ret):
        function_ret = await function_ret
    return function_ret