1""" Multicast DNS Service Discovery for Python, v0.14-wmcbrine
2    Copyright 2003 Paul Scott-Murphy, 2014 William McBrine
3
4    This module provides a framework for the use of DNS Service Discovery
5    using IP multicast.
6
7    This library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 2.1 of the License, or (at your option) any later version.
11
12    This library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with this library; if not, write to the Free Software
19    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20    USA
21"""
22
23import asyncio
24import concurrent.futures
25import contextlib
26import queue
27from typing import Any, Awaitable, Coroutine, List, Optional, Set, cast
28
29from .time import millis_to_seconds
30from .._exceptions import EventLoopBlocked
31from ..const import _LOADED_SYSTEM_TIMEOUT
32
# The combined timeouts should be lower than _CLOSE_TIMEOUT + _WAIT_FOR_LOOP_TASKS_TIMEOUT
# All values below are in seconds.
# Upper bound handed to asyncio.wait() in _wait_for_loop_tasks().
_TASK_AWAIT_TIMEOUT = 1
# How long shutdown_loop() blocks while collecting the loop's tasks.
_GET_ALL_TASKS_TIMEOUT = 3
# How long shutdown_loop() blocks while _wait_for_loop_tasks() runs.
_WAIT_FOR_LOOP_TASKS_TIMEOUT = 3  # Must be larger than _TASK_AWAIT_TIMEOUT
37
38
def get_best_available_queue() -> queue.Queue:
    """Return a new, empty queue of the best type the runtime offers.

    ``queue.SimpleQueue`` is used when the ``queue`` module provides it;
    otherwise a regular ``queue.Queue`` is returned.
    """
    simple_queue_factory = getattr(queue, "SimpleQueue", None)
    if simple_queue_factory is None:
        return queue.Queue()
    return simple_queue_factory()  # type: ignore  # pylint: disable=all
44
45
46# Switch to asyncio.wait_for once https://bugs.python.org/issue39032 is fixed
async def wait_event_or_timeout(event: asyncio.Event, timeout: float) -> None:
    """Block until ``event`` is set or ``timeout`` seconds elapse.

    Whichever happens first ends the wait. ``None`` is returned in both
    cases; inspect the event afterwards to tell them apart.
    """
    loop = asyncio.get_event_loop()
    waiter = loop.create_future()

    def _release_waiter(*_: Any) -> None:
        # Called by both the timer and the finished event task, so only
        # the first call is allowed to resolve the future.
        if waiter.done():
            return
        waiter.set_result(None)

    timeout_handle = loop.call_later(timeout, _release_waiter)
    event_task = loop.create_task(event.wait())
    event_task.add_done_callback(_release_waiter)

    try:
        await waiter
    finally:
        # Tear down both wake-up sources no matter how the wait ended,
        # including cancellation of this coroutine.
        timeout_handle.cancel()
        if not event_task.done():
            event_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await event_task
68
69
70async def _async_get_all_tasks(loop: asyncio.AbstractEventLoop) -> List[asyncio.Task]:
71    """Return all tasks running."""
72    await asyncio.sleep(0)  # flush out any call_soon_threadsafe
73    # If there are multiple event loops running, all_tasks is not
74    # safe EVEN WHEN CALLED FROM THE EVENTLOOP
75    # under PyPy so we have to try a few times.
76    for _ in range(3):
77        with contextlib.suppress(RuntimeError):
78            if hasattr(asyncio, 'all_tasks'):
79                return asyncio.all_tasks(loop)  # type: ignore  # pylint: disable=no-member
80            return asyncio.Task.all_tasks(loop)  # type: ignore  # pylint: disable=no-member
81    return []
82
83
84async def _wait_for_loop_tasks(wait_tasks: Set[asyncio.Task]) -> None:
85    """Wait for the event loop thread we started to shutdown."""
86    await asyncio.wait(wait_tasks, timeout=_TASK_AWAIT_TIMEOUT)
87
88
async def await_awaitable(aw: Awaitable) -> None:
    """Await ``aw``, then await the task that it produced.

    The result of the inner task is discarded.
    """
    await (await aw)
93
94
def run_coro_with_timeout(aw: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float) -> Any:
    """Schedule a coroutine on ``loop`` and block for its result.

    ``timeout`` is converted with ``millis_to_seconds`` and padded with
    ``_LOADED_SYSTEM_TIMEOUT`` before it is used as the wait limit.

    The limit is a safety net against blocking forever, not part of
    normal control flow; it should never be reached during normal
    operation.

    Raises `EventLoopBlocked` if the coroutine has not completed within
    the wait limit.
    """
    try:
        scheduled = asyncio.run_coroutine_threadsafe(aw, loop)
        max_wait = millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT
        return scheduled.result(max_wait)
    except concurrent.futures.TimeoutError as ex:
        raise EventLoopBlocked from ex
112
113
def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Give unfinished tasks a bounded chance to complete, then stop ``loop``.

    The work is submitted with ``run_coroutine_threadsafe`` and the
    calling thread blocks on each result, bounded by
    ``_GET_ALL_TASKS_TIMEOUT`` and ``_WAIT_FOR_LOOP_TASKS_TIMEOUT``.
    """
    all_tasks = asyncio.run_coroutine_threadsafe(_async_get_all_tasks(loop), loop).result(
        _GET_ALL_TASKS_TIMEOUT
    )
    # Only tasks that have not finished yet are worth waiting on.
    unfinished = {task for task in all_tasks if not task.done()}
    if unfinished:
        waiter = asyncio.run_coroutine_threadsafe(_wait_for_loop_tasks(unfinished), loop)
        waiter.result(_WAIT_FOR_LOOP_TASKS_TIMEOUT)
    loop.call_soon_threadsafe(loop.stop)
125
126
127# Remove the call to _get_running_loop once we drop python 3.6 support
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Return the currently running event loop, or ``None`` if there is none."""
    try:
        if not hasattr(asyncio, "get_running_loop"):
            # Older interpreters only expose the private accessor.
            return asyncio._get_running_loop()  # pylint: disable=no-member,protected-access
        running = asyncio.get_running_loop()  # type: ignore  # pylint: disable=no-member  # noqa
        return cast(asyncio.AbstractEventLoop, running)
    except RuntimeError:
        return None
138