1"""Event loop and event loop policy."""
2
# Names exported by ``from <this module> import *``.
__all__ = (
    # Abstract interfaces.
    'AbstractEventLoopPolicy',
    'AbstractEventLoop',
    'AbstractServer',
    # Callback handles.
    'Handle',
    'TimerHandle',
    # Policy access.
    'get_event_loop_policy',
    'set_event_loop_policy',
    # Event loop access.
    'get_event_loop',
    'set_event_loop',
    'new_event_loop',
    # Child watcher access.
    'get_child_watcher',
    'set_child_watcher',
    # Running-loop helpers.
    '_set_running_loop',
    'get_running_loop',
    '_get_running_loop',
)
13
14import contextvars
15import os
16import socket
17import subprocess
18import sys
19import threading
20
21from . import format_helpers
22
23
class Handle:
    """A cancellable wrapper around a callback registered with a loop.

    Callback registration methods hand one of these back to the caller.
    """

    __slots__ = (
        '_callback', '_args', '_cancelled', '_loop',
        '_source_traceback', '_repr', '__weakref__',
        '_context',
    )

    def __init__(self, callback, args, loop, context=None):
        # Without an explicit context, snapshot the current one so the
        # callback later runs in it.
        self._context = (
            contextvars.copy_context() if context is None else context)
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        # In debug mode record the stack of whoever called this
        # constructor; it is reported by _repr_info() and _run().
        debug = self._loop.get_debug()
        self._source_traceback = (
            format_helpers.extract_stack(sys._getframe(1)) if debug else None)

    def _repr_info(self):
        """Return the list of words that repr() joins together."""
        parts = [self.__class__.__name__]
        if self._cancelled:
            parts.append('cancelled')
        callback = self._callback
        if callback is not None:
            parts.append(
                format_helpers._format_callback_source(callback, self._args))
        traceback = self._source_traceback
        if traceback:
            origin = traceback[-1]
            parts.append(f'created at {origin[0]}:{origin[1]}')
        return parts

    def __repr__(self):
        cached = self._repr
        if cached is None:
            return '<{}>'.format(' '.join(self._repr_info()))
        # A representation frozen by cancel() in debug mode.
        return cached

    def cancel(self):
        """Mark the handle as cancelled and drop its callback and args."""
        if self._cancelled:
            return
        self._cancelled = True
        if self._loop.get_debug():
            # The callback and arguments are about to be discarded, so in
            # debug mode freeze the repr first; it stays available for
            # later log messages that mention this handle.
            self._repr = repr(self)
        self._callback = None
        self._args = None

    def cancelled(self):
        """Return True if cancel() has been called."""
        return self._cancelled

    def _run(self):
        """Invoke the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate; any other exception is
        passed to the loop's call_exception_handler().
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            description = format_helpers._format_callback_source(
                self._callback, self._args)
            report = {
                'message': f'Exception in callback {description}',
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                report['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(report)
        self = None  # Needed to break cycles when an exception occurs.
96
97
class TimerHandle(Handle):
    """Handle for a callback scheduled to run at a specific time.

    Timed callback registration methods hand one of these back.
    """

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        assert when is not None
        super().__init__(callback, args, loop, context)
        traceback = self._source_traceback
        if traceback:
            # Remove the last recorded entry from the traceback captured
            # by the base constructor.
            del traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        parts = super()._repr_info()
        # Place "when=..." right after the class name, or after the
        # 'cancelled' marker when that is present.
        if self._cancelled:
            index = 2
        else:
            index = 1
        parts.insert(index, f'when={self._when}')
        return parts

    def __hash__(self):
        return hash(self._when)

    # Ordering compares the scheduled time only; equality additionally
    # compares callback, arguments and cancellation state.

    def __lt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when

    def __le__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when or self.__eq__(other)

    def __gt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when

    def __ge__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when or self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def cancel(self):
        """Cancel the handle, notifying the loop on the first call."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return the time the callback is scheduled for.

        It is an absolute timestamp on the same clock as loop.time().
        """
        return self._when
160
161
class AbstractServer:
    """Abstract server returned by create_server().

    Usable as an asynchronous context manager: leaving the ``async with``
    block calls close() and then awaits wait_closed().
    """

    def close(self):
        """Stop serving.  Connections that are already open stay open."""
        raise NotImplementedError

    def get_loop(self):
        """Return the event loop this server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Return True while the server is accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Start accepting connections.

        The method is idempotent, so calling it while the server is
        already serving is fine.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Accept connections until this coroutine is cancelled.

        Cancelling the coroutine closes the server.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine that waits until the server is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Close first, then wait for the shutdown to finish.
        self.close()
        await self.wait_closed()
202
203
class AbstractEventLoop:
    """Abstract event loop.

    Declares the interface concrete event loops implement.  Apart from
    call_soon(), which delegates to call_later(), every method here
    raises NotImplementedError.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Return True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.
    #
    # The keyword-only *context* argument is an optional
    # contextvars.Context for the callback to run in; it mirrors the
    # *context* parameter accepted by Handle and TimerHandle.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args, context=None):
        """Arrange for callback(*args) to be called as soon as possible.

        The default implementation delegates to call_later() with a
        delay of 0 and returns whatever call_later() returns.  *context*
        is forwarded only when one was supplied, so a subclass whose
        call_later() does not accept the keyword keeps working.
        """
        if context is None:
            return self.call_later(0, callback, *args)
        return self.call_later(0, callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        """Arrange for callback(*args) to be called after *delay* seconds."""
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        """Arrange for callback(*args) to be called at the time *when*.

        *when* is an absolute timestamp on the same clock as time().
        """
        raise NotImplementedError

    def time(self):
        """Return the current time according to the loop's clock."""
        raise NotImplementedError

    def create_future(self):
        """Create and return a Future attached to this loop."""
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro, *, name=None):
        """Schedule the coroutine *coro* and return a task for it.

        *name*, when given, is the name for the new task.
        """
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args, context=None):
        """Variant of call_soon() meant to be called from another thread."""
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        """Arrange for func(*args) to be called in *executor*."""
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Set the default executor used by run_in_executor()."""
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        """Asynchronous counterpart of socket.getaddrinfo()."""
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        """Asynchronous counterpart of socket.getnameinfo()."""
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        """Open a streaming connection to host and port, or use *sock*.

        protocol_factory must be a callable returning a protocol
        instance.  When completed, the coroutine returns a
        (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be
        a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL handshake before aborting the
        connection. Default is 60s.

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Return the number of bytes sent.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        """Upgrade a transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Open a streaming connection over a UNIX Domain Socket.

        When completed, the coroutine returns a (transport, protocol)
        pair.
        """
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for the SSL handshake to complete (defaults to 60s).

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio, but use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() can lead to complicated errors:
        # the fd gets closed by the pipe transport and then by f, or vice
        # versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.
        pipe is a file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() can lead to complicated errors:
        # the fd gets closed by the pipe transport and then by f, or vice
        # versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        """Create a subprocess from the shell command *cmd*.

        The standard streams default to subprocess.PIPE.
        """
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        """Create a subprocess from the program and arguments in *args*.

        The standard streams default to subprocess.PIPE.
        """
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Call callback(*args) when *fd* is available for reading."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Stop watching *fd* for read availability."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Call callback(*args) when *fd* is available for writing."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Stop watching *fd* for write availability."""
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        """Receive up to *nbytes* bytes from *sock*."""
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        """Receive data from *sock* into the buffer *buf*."""
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        """Send *data* to *sock*."""
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        """Connect *sock* to the remote socket at *address*."""
        raise NotImplementedError

    async def sock_accept(self, sock):
        """Accept a connection on the listening socket *sock*."""
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        """Send *file* through *sock*."""
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        """Register callback(*args) as the handler for signal *sig*."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Remove the handler for signal *sig*."""
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        """Set the factory used by create_task()."""
        raise NotImplementedError

    def get_task_factory(self):
        """Return the current task factory."""
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        """Return the current exception handler."""
        raise NotImplementedError

    def set_exception_handler(self, handler):
        """Set *handler* as the loop's exception handler."""
        raise NotImplementedError

    def default_exception_handler(self, context):
        """Default handling for the exception described by *context*."""
        raise NotImplementedError

    def call_exception_handler(self, context):
        """Report the error described by the *context* dict.

        Handle._run() passes the keys 'message', 'exception' and 'handle',
        plus 'source_traceback' when one was recorded.
        """
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        """Return whether the loop is in debug mode."""
        raise NotImplementedError

    def set_debug(self, enabled):
        """Enable or disable the loop's debug mode."""
        raise NotImplementedError
587
588
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method raises NotImplementedError; concrete policies override
    them.
    """

    def get_event_loop(self):
        """Return the event loop for the current context.

        The result is an event loop object implementing the BaseEventLoop
        interface.  If no event loop has been set for the current context
        and the policy does not specify creating one, an exception is
        raised instead.

        It should never return None.
        """
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Make *loop* the event loop for the current context."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop per this policy's rules.

        The new loop is not installed automatically: call set_event_loop()
        explicitly if it should become the event loop for the current
        context.
        """
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Return the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set *watcher* as the watcher for child processes."""
        raise NotImplementedError
621
622
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable invoked by new_event_loop() to build a loop.  It is None
    # here, so a usable policy has to provide one.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's current loop, and whether
        # set_event_loop() has ever been called in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an instance of EventLoop or raises an exception.

        A loop is created and installed automatically only in the main
        thread, and only if set_event_loop() has not been called there.

        Raises:
            RuntimeError: the current thread has no event loop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        *loop* must be an AbstractEventLoop instance or None.

        Raises:
            TypeError: *loop* is neither None nor an AbstractEventLoop.
        """
        # Recorded before validation, as before: any call, even a rejected
        # one, disables automatic loop creation in get_event_loop().
        self._local._set_called = True
        # Explicit check rather than ``assert`` so the validation is not
        # stripped when Python runs with -O.
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(
                f"loop must be an instance of AbstractEventLoop or None, "
                f"not '{type(loop).__name__}'")
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
674
675
# The event loop policy.  There is always a single, global policy, even
# when that policy's own rules keep one event loop per thread (or per
# some other notion of context).  The default policy gets installed by
# the first call to get_event_loop_policy().
_event_loop_policy = None

# Guards the on-demand creation of the default event loop policy.
_lock = threading.Lock()
684
685
# Thread-local storage for the running event loop; read by
# _get_running_loop() and written by _set_running_loop().
class _RunningLoop(threading.local):
    """Per-thread holder of a ``(loop, pid)`` pair."""

    # Default for a thread that has not had a running loop set.
    loop_pid = (None, None)


_running_loop = _RunningLoop()
692
693
def get_running_loop():
    """Return the running event loop, or raise RuntimeError if none.

    The answer is specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running = _get_running_loop()
    if running is not None:
        return running
    raise RuntimeError('no running event loop')
704
705
def _get_running_loop():
    """Return the running event loop, or None when there is none.

    Low-level helper meant for event loop implementations.  The answer
    is specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop, owner_pid = _running_loop.loop_pid
    # A loop recorded under a different process id does not count.
    if loop is None or owner_pid != os.getpid():
        return None
    return loop
716
717
def _set_running_loop(loop):
    """Record *loop* as the running event loop.

    Low-level helper meant for event loop implementations.  The setting
    is specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    # Store the current process id with the loop; _get_running_loop()
    # compares it against os.getpid().
    current_pid = os.getpid()
    _running_loop.loop_pid = (loop, current_pid)
726
727
def _init_event_loop_policy():
    """Install the default event loop policy if none is set yet."""
    global _event_loop_policy
    with _lock:
        # Checked while holding the lock, so the default policy is
        # instantiated at most once.
        if _event_loop_policy is None:  # pragma: no branch
            # Imported here rather than at module level.
            from . import DefaultEventLoopPolicy as default_policy_cls
            _event_loop_policy = default_policy_cls()
734
735
def get_event_loop_policy():
    """Return the current event loop policy.

    The default policy is created on first use.
    """
    needs_default = _event_loop_policy is None
    if needs_default:
        _init_event_loop_policy()
    return _event_loop_policy
741
742
def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored.

    Raises:
        TypeError: *policy* is neither None nor an
            AbstractEventLoopPolicy instance.
    """
    global _event_loop_policy
    # Explicit check rather than ``assert`` so the validation is not
    # stripped when Python runs with -O.
    if policy is not None and not isinstance(policy, AbstractEventLoopPolicy):
        raise TypeError(
            f"policy must be an instance of AbstractEventLoopPolicy or None, "
            f"not '{type(policy).__name__}'")
    _event_loop_policy = policy
750
751
def get_event_loop():
    """Return an asyncio event loop.

    Inside a coroutine or a callback (e.g. one scheduled with call_soon
    or a similar API) this always returns the running event loop.

    When no running event loop is set, the result of
    `get_event_loop_policy().get_event_loop()` is returned instead.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop = _py__get_event_loop()
    return loop
763
764
def _get_event_loop(stacklevel=3):
    """Return the running loop, falling back to the policy's loop.

    The fallback emits a DeprecationWarning; *stacklevel* is passed
    through to warnings.warn().
    """
    running = _get_running_loop()
    if running is None:
        import warnings
        warnings.warn('There is no current event loop',
                      DeprecationWarning, stacklevel=stacklevel)
        return get_event_loop_policy().get_event_loop()
    return running
773
774
def set_event_loop(loop):
    """Shorthand for get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
778
779
def new_event_loop():
    """Shorthand for get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
783
784
def get_child_watcher():
    """Shorthand for get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
788
789
def set_child_watcher(watcher):
    """Shorthand for
    get_event_loop_policy().set_child_watcher(watcher)."""
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)
794
795
# Keep the pure-Python implementations reachable under _py_* names so
# tests can exercise them even when the C versions are in use.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
_py__get_event_loop = _get_event_loop


try:
    # get_event_loop() is among the most frequently called asyncio
    # functions, and the pure Python implementation is about 4 times
    # slower than the C-accelerated one, so prefer the C versions.
    from _asyncio import (
        _get_running_loop,
        _set_running_loop,
        get_running_loop,
        get_event_loop,
        _get_event_loop,
    )
except ImportError:
    # No C accelerator: the pure-Python definitions stay in place.
    pass
else:
    # Expose the C implementations under _c_* names for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop
    _c__get_event_loop = _get_event_loop
819