"""Public API: the event loop class, its factory and the asyncio policy."""

import asyncio as __asyncio
# NOTE: keep the single leading underscore on this alias -- it is referenced
# inside the ``EventLoopPolicy`` class body, where a double-underscore name
# would be subject to Python's private-name mangling.
import typing as _typing

from asyncio.events import BaseDefaultEventLoopPolicy as __BasePolicy

from . import includes as __includes  # NOQA
from .loop import Loop as __BaseLoop  # NOQA
from ._version import __version__  # NOQA


__all__ = ('new_event_loop', 'install', 'EventLoopPolicy')


class Loop(__BaseLoop, __asyncio.AbstractEventLoop):  # type: ignore[misc]
    """Event loop class returned by :func:`new_event_loop`.

    Combines the implementation imported from ``.loop`` with
    ``asyncio.AbstractEventLoop`` as an additional base class, so instances
    are also instances of ``asyncio.AbstractEventLoop``.
    """


def new_event_loop() -> Loop:
    """Return a new event loop.

    Each call constructs a fresh :class:`Loop` instance.
    """
    return Loop()


def install() -> None:
    """Install the uvloop event loop policy.

    Helper that is equivalent to calling
    ``asyncio.set_event_loop_policy(EventLoopPolicy())``.
    """
    __asyncio.set_event_loop_policy(EventLoopPolicy())


class EventLoopPolicy(__BasePolicy):
    """Event loop policy.

    The preferred way to make your application use uvloop:

    >>> import asyncio
    >>> import uvloop
    >>> asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    >>> asyncio.get_event_loop()
    <uvloop.Loop running=False closed=False debug=False>
    """

    def _loop_factory(self) -> Loop:
        """Create the loop instance for this policy.

        NOTE(review): presumably this is the hook the asyncio base policy
        calls when it needs a new loop -- confirm against ``asyncio.events``.
        """
        return new_event_loop()

    if _typing.TYPE_CHECKING:
        # EventLoopPolicy doesn't implement these, but since they are marked
        # as abstract in typeshed, we have to put them in so mypy thinks
        # the base methods are overridden. This is the same approach taken
        # for the Windows event loop policy classes in typeshed.
        def get_child_watcher(self) -> _typing.NoReturn:
            ...

        def set_child_watcher(
            self, watcher: _typing.Any
        ) -> _typing.NoReturn:
            ...