1""" Multicast DNS Service Discovery for Python, v0.14-wmcbrine 2 Copyright 2003 Paul Scott-Murphy, 2014 William McBrine 3 4 This module provides a framework for the use of DNS Service Discovery 5 using IP multicast. 6 7 This library is free software; you can redistribute it and/or 8 modify it under the terms of the GNU Lesser General Public 9 License as published by the Free Software Foundation; either 10 version 2.1 of the License, or (at your option) any later version. 11 12 This library is distributed in the hope that it will be useful, 13 but WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 Lesser General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public 18 License along with this library; if not, write to the Free Software 19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 20 USA 21""" 22 23import asyncio 24import concurrent.futures 25import contextlib 26import queue 27from typing import Any, Awaitable, Coroutine, List, Optional, Set, cast 28 29from .time import millis_to_seconds 30from .._exceptions import EventLoopBlocked 31from ..const import _LOADED_SYSTEM_TIMEOUT 32 33# The combined timeouts should be lower than _CLOSE_TIMEOUT + _WAIT_FOR_LOOP_TASKS_TIMEOUT 34_TASK_AWAIT_TIMEOUT = 1 35_GET_ALL_TASKS_TIMEOUT = 3 36_WAIT_FOR_LOOP_TASKS_TIMEOUT = 3 # Must be larger than _TASK_AWAIT_TIMEOUT 37 38 39def get_best_available_queue() -> queue.Queue: 40 """Create the best available queue type.""" 41 if hasattr(queue, "SimpleQueue"): 42 return queue.SimpleQueue() # type: ignore # pylint: disable=all 43 return queue.Queue() 44 45 46# Switch to asyncio.wait_for once https://bugs.python.org/issue39032 is fixed 47async def wait_event_or_timeout(event: asyncio.Event, timeout: float) -> None: 48 """Wait for an event or timeout.""" 49 loop = asyncio.get_event_loop() 50 future = loop.create_future() 51 52 def _handle_timeout_or_wait_complete(*_: Any) -> None: 53 if not future.done(): 54 future.set_result(None) 55 56 timer_handle = loop.call_later(timeout, _handle_timeout_or_wait_complete) 57 event_wait = loop.create_task(event.wait()) 58 event_wait.add_done_callback(_handle_timeout_or_wait_complete) 59 60 try: 61 await future 62 finally: 63 timer_handle.cancel() 64 if not event_wait.done(): 65 event_wait.cancel() 66 with contextlib.suppress(asyncio.CancelledError): 67 await event_wait 68 69 70async def _async_get_all_tasks(loop: asyncio.AbstractEventLoop) -> List[asyncio.Task]: 71 """Return all tasks running.""" 72 await asyncio.sleep(0) # flush out any call_soon_threadsafe 73 # If there are multiple event loops running, all_tasks is not 74 # safe EVEN WHEN CALLED FROM THE EVENTLOOP 75 # under PyPy so we have to try a few times. 76 for _ in range(3): 77 with contextlib.suppress(RuntimeError): 78 if hasattr(asyncio, 'all_tasks'): 79 return asyncio.all_tasks(loop) # type: ignore # pylint: disable=no-member 80 return asyncio.Task.all_tasks(loop) # type: ignore # pylint: disable=no-member 81 return [] 82 83 84async def _wait_for_loop_tasks(wait_tasks: Set[asyncio.Task]) -> None: 85 """Wait for the event loop thread we started to shutdown.""" 86 await asyncio.wait(wait_tasks, timeout=_TASK_AWAIT_TIMEOUT) 87 88 89async def await_awaitable(aw: Awaitable) -> None: 90 """Wait on an awaitable and the task it returns.""" 91 task = await aw 92 await task 93 94 95def run_coro_with_timeout(aw: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float) -> Any: 96 """Run a coroutine with a timeout. 97 98 The timeout should only be used as a safeguard to prevent 99 the program from blocking forever. The timeout should 100 never be expected to be reached during normal operation. 101 102 While not expected during normal operations, the 103 function raises `EventLoopBlocked` if the coroutine takes 104 longer to complete than the timeout. 105 """ 106 try: 107 return asyncio.run_coroutine_threadsafe(aw, loop).result( 108 millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT 109 ) 110 except concurrent.futures.TimeoutError as ex: 111 raise EventLoopBlocked from ex 112 113 114def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None: 115 """Wait for pending tasks and stop an event loop.""" 116 pending_tasks = set( 117 asyncio.run_coroutine_threadsafe(_async_get_all_tasks(loop), loop).result(_GET_ALL_TASKS_TIMEOUT) 118 ) 119 pending_tasks -= set(task for task in pending_tasks if task.done()) 120 if pending_tasks: 121 asyncio.run_coroutine_threadsafe(_wait_for_loop_tasks(pending_tasks), loop).result( 122 _WAIT_FOR_LOOP_TASKS_TIMEOUT 123 ) 124 loop.call_soon_threadsafe(loop.stop) 125 126 127# Remove the call to _get_running_loop once we drop python 3.6 support 128def get_running_loop() -> Optional[asyncio.AbstractEventLoop]: 129 """Check if an event loop is already running.""" 130 with contextlib.suppress(RuntimeError): 131 if hasattr(asyncio, "get_running_loop"): 132 return cast( 133 asyncio.AbstractEventLoop, 134 asyncio.get_running_loop(), # type: ignore # pylint: disable=no-member # noqa 135 ) 136 return asyncio._get_running_loop() # pylint: disable=no-member,protected-access 137 return None 138