1"""Event loop and event loop policy."""
2
3__all__ = (
4    'AbstractEventLoopPolicy',
5    'AbstractEventLoop', 'AbstractServer',
6    'Handle', 'TimerHandle',
7    'get_event_loop_policy', 'set_event_loop_policy',
8    'get_event_loop', 'set_event_loop', 'new_event_loop',
9    'get_child_watcher', 'set_child_watcher',
10    '_set_running_loop', 'get_running_loop',
11    '_get_running_loop',
12)
13
14import contextvars
15import os
16import socket
17import subprocess
18import sys
19import threading
20
21from . import format_helpers
22from . import exceptions
23
24
25class Handle:
26    """Object returned by callback registration methods."""
27
28    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
29                 '_source_traceback', '_repr', '__weakref__',
30                 '_context')
31
32    def __init__(self, callback, args, loop, context=None):
33        if context is None:
34            context = contextvars.copy_context()
35        self._context = context
36        self._loop = loop
37        self._callback = callback
38        self._args = args
39        self._cancelled = False
40        self._repr = None
41        if self._loop.get_debug():
42            self._source_traceback = format_helpers.extract_stack(
43                sys._getframe(1))
44        else:
45            self._source_traceback = None
46
47    def _repr_info(self):
48        info = [self.__class__.__name__]
49        if self._cancelled:
50            info.append('cancelled')
51        if self._callback is not None:
52            info.append(format_helpers._format_callback_source(
53                self._callback, self._args))
54        if self._source_traceback:
55            frame = self._source_traceback[-1]
56            info.append(f'created at {frame[0]}:{frame[1]}')
57        return info
58
59    def __repr__(self):
60        if self._repr is not None:
61            return self._repr
62        info = self._repr_info()
63        return '<{}>'.format(' '.join(info))
64
65    def cancel(self):
66        if not self._cancelled:
67            self._cancelled = True
68            if self._loop.get_debug():
69                # Keep a representation in debug mode to keep callback and
70                # parameters. For example, to log the warning
71                # "Executing <Handle...> took 2.5 second"
72                self._repr = repr(self)
73            self._callback = None
74            self._args = None
75
76    def cancelled(self):
77        return self._cancelled
78
79    def _run(self):
80        try:
81            self._context.run(self._callback, *self._args)
82        except (SystemExit, KeyboardInterrupt):
83            raise
84        except BaseException as exc:
85            cb = format_helpers._format_callback_source(
86                self._callback, self._args)
87            msg = f'Exception in callback {cb}'
88            context = {
89                'message': msg,
90                'exception': exc,
91                'handle': self,
92            }
93            if self._source_traceback:
94                context['source_traceback'] = self._source_traceback
95            self._loop.call_exception_handler(context)
96        self = None  # Needed to break cycles when an exception occurs.
97
98
99class TimerHandle(Handle):
100    """Object returned by timed callback registration methods."""
101
102    __slots__ = ['_scheduled', '_when']
103
104    def __init__(self, when, callback, args, loop, context=None):
105        assert when is not None
106        super().__init__(callback, args, loop, context)
107        if self._source_traceback:
108            del self._source_traceback[-1]
109        self._when = when
110        self._scheduled = False
111
112    def _repr_info(self):
113        info = super()._repr_info()
114        pos = 2 if self._cancelled else 1
115        info.insert(pos, f'when={self._when}')
116        return info
117
118    def __hash__(self):
119        return hash(self._when)
120
121    def __lt__(self, other):
122        return self._when < other._when
123
124    def __le__(self, other):
125        if self._when < other._when:
126            return True
127        return self.__eq__(other)
128
129    def __gt__(self, other):
130        return self._when > other._when
131
132    def __ge__(self, other):
133        if self._when > other._when:
134            return True
135        return self.__eq__(other)
136
137    def __eq__(self, other):
138        if isinstance(other, TimerHandle):
139            return (self._when == other._when and
140                    self._callback == other._callback and
141                    self._args == other._args and
142                    self._cancelled == other._cancelled)
143        return NotImplemented
144
145    def __ne__(self, other):
146        equal = self.__eq__(other)
147        return NotImplemented if equal is NotImplemented else not equal
148
149    def cancel(self):
150        if not self._cancelled:
151            self._loop._timer_handle_cancelled(self)
152        super().cancel()
153
154    def when(self):
155        """Return a scheduled callback time.
156
157        The time is an absolute timestamp, using the same time
158        reference as loop.time().
159        """
160        return self._when
161
162
163class AbstractServer:
164    """Abstract server returned by create_server()."""
165
166    def close(self):
167        """Stop serving.  This leaves existing connections open."""
168        raise NotImplementedError
169
170    def get_loop(self):
171        """Get the event loop the Server object is attached to."""
172        raise NotImplementedError
173
174    def is_serving(self):
175        """Return True if the server is accepting connections."""
176        raise NotImplementedError
177
178    async def start_serving(self):
179        """Start accepting connections.
180
181        This method is idempotent, so it can be called when
182        the server is already being serving.
183        """
184        raise NotImplementedError
185
186    async def serve_forever(self):
187        """Start accepting connections until the coroutine is cancelled.
188
189        The server is closed when the coroutine is cancelled.
190        """
191        raise NotImplementedError
192
193    async def wait_closed(self):
194        """Coroutine to wait until service is closed."""
195        raise NotImplementedError
196
197    async def __aenter__(self):
198        return self
199
200    async def __aexit__(self, *exc):
201        self.close()
202        await self.wait_closed()
203
204
205class AbstractEventLoop:
206    """Abstract event loop."""
207
208    # Running and stopping the event loop.
209
210    def run_forever(self):
211        """Run the event loop until stop() is called."""
212        raise NotImplementedError
213
214    def run_until_complete(self, future):
215        """Run the event loop until a Future is done.
216
217        Return the Future's result, or raise its exception.
218        """
219        raise NotImplementedError
220
221    def stop(self):
222        """Stop the event loop as soon as reasonable.
223
224        Exactly how soon that is may depend on the implementation, but
225        no more I/O callbacks should be scheduled.
226        """
227        raise NotImplementedError
228
229    def is_running(self):
230        """Return whether the event loop is currently running."""
231        raise NotImplementedError
232
233    def is_closed(self):
234        """Returns True if the event loop was closed."""
235        raise NotImplementedError
236
237    def close(self):
238        """Close the loop.
239
240        The loop should not be running.
241
242        This is idempotent and irreversible.
243
244        No other methods should be called after this one.
245        """
246        raise NotImplementedError
247
248    async def shutdown_asyncgens(self):
249        """Shutdown all active asynchronous generators."""
250        raise NotImplementedError
251
252    # Methods scheduling callbacks.  All these return Handles.
253
254    def _timer_handle_cancelled(self, handle):
255        """Notification that a TimerHandle has been cancelled."""
256        raise NotImplementedError
257
258    def call_soon(self, callback, *args):
259        return self.call_later(0, callback, *args)
260
261    def call_later(self, delay, callback, *args):
262        raise NotImplementedError
263
264    def call_at(self, when, callback, *args):
265        raise NotImplementedError
266
267    def time(self):
268        raise NotImplementedError
269
270    def create_future(self):
271        raise NotImplementedError
272
273    # Method scheduling a coroutine object: create a task.
274
275    def create_task(self, coro, *, name=None):
276        raise NotImplementedError
277
278    # Methods for interacting with threads.
279
280    def call_soon_threadsafe(self, callback, *args):
281        raise NotImplementedError
282
283    def run_in_executor(self, executor, func, *args):
284        raise NotImplementedError
285
286    def set_default_executor(self, executor):
287        raise NotImplementedError
288
289    # Network I/O methods returning Futures.
290
291    async def getaddrinfo(self, host, port, *,
292                          family=0, type=0, proto=0, flags=0):
293        raise NotImplementedError
294
295    async def getnameinfo(self, sockaddr, flags=0):
296        raise NotImplementedError
297
298    async def create_connection(
299            self, protocol_factory, host=None, port=None,
300            *, ssl=None, family=0, proto=0,
301            flags=0, sock=None, local_addr=None,
302            server_hostname=None,
303            ssl_handshake_timeout=None,
304            happy_eyeballs_delay=None, interleave=None):
305        raise NotImplementedError
306
307    async def create_server(
308            self, protocol_factory, host=None, port=None,
309            *, family=socket.AF_UNSPEC,
310            flags=socket.AI_PASSIVE, sock=None, backlog=100,
311            ssl=None, reuse_address=None, reuse_port=None,
312            ssl_handshake_timeout=None,
313            start_serving=True):
314        """A coroutine which creates a TCP server bound to host and port.
315
316        The return value is a Server object which can be used to stop
317        the service.
318
319        If host is an empty string or None all interfaces are assumed
320        and a list of multiple sockets will be returned (most likely
321        one for IPv4 and another one for IPv6). The host parameter can also be
322        a sequence (e.g. list) of hosts to bind to.
323
324        family can be set to either AF_INET or AF_INET6 to force the
325        socket to use IPv4 or IPv6. If not set it will be determined
326        from host (defaults to AF_UNSPEC).
327
328        flags is a bitmask for getaddrinfo().
329
330        sock can optionally be specified in order to use a preexisting
331        socket object.
332
333        backlog is the maximum number of queued connections passed to
334        listen() (defaults to 100).
335
336        ssl can be set to an SSLContext to enable SSL over the
337        accepted connections.
338
339        reuse_address tells the kernel to reuse a local socket in
340        TIME_WAIT state, without waiting for its natural timeout to
341        expire. If not specified will automatically be set to True on
342        UNIX.
343
344        reuse_port tells the kernel to allow this endpoint to be bound to
345        the same port as other existing endpoints are bound to, so long as
346        they all set this flag when being created. This option is not
347        supported on Windows.
348
349        ssl_handshake_timeout is the time in seconds that an SSL server
350        will wait for completion of the SSL handshake before aborting the
351        connection. Default is 60s.
352
353        start_serving set to True (default) causes the created server
354        to start accepting connections immediately.  When set to False,
355        the user should await Server.start_serving() or Server.serve_forever()
356        to make the server to start accepting connections.
357        """
358        raise NotImplementedError
359
360    async def sendfile(self, transport, file, offset=0, count=None,
361                       *, fallback=True):
362        """Send a file through a transport.
363
364        Return an amount of sent bytes.
365        """
366        raise NotImplementedError
367
368    async def start_tls(self, transport, protocol, sslcontext, *,
369                        server_side=False,
370                        server_hostname=None,
371                        ssl_handshake_timeout=None):
372        """Upgrade a transport to TLS.
373
374        Return a new transport that *protocol* should start using
375        immediately.
376        """
377        raise NotImplementedError
378
379    async def create_unix_connection(
380            self, protocol_factory, path=None, *,
381            ssl=None, sock=None,
382            server_hostname=None,
383            ssl_handshake_timeout=None):
384        raise NotImplementedError
385
386    async def create_unix_server(
387            self, protocol_factory, path=None, *,
388            sock=None, backlog=100, ssl=None,
389            ssl_handshake_timeout=None,
390            start_serving=True):
391        """A coroutine which creates a UNIX Domain Socket server.
392
393        The return value is a Server object, which can be used to stop
394        the service.
395
396        path is a str, representing a file systsem path to bind the
397        server socket to.
398
399        sock can optionally be specified in order to use a preexisting
400        socket object.
401
402        backlog is the maximum number of queued connections passed to
403        listen() (defaults to 100).
404
405        ssl can be set to an SSLContext to enable SSL over the
406        accepted connections.
407
408        ssl_handshake_timeout is the time in seconds that an SSL server
409        will wait for the SSL handshake to complete (defaults to 60s).
410
411        start_serving set to True (default) causes the created server
412        to start accepting connections immediately.  When set to False,
413        the user should await Server.start_serving() or Server.serve_forever()
414        to make the server to start accepting connections.
415        """
416        raise NotImplementedError
417
418    async def create_datagram_endpoint(self, protocol_factory,
419                                       local_addr=None, remote_addr=None, *,
420                                       family=0, proto=0, flags=0,
421                                       reuse_address=None, reuse_port=None,
422                                       allow_broadcast=None, sock=None):
423        """A coroutine which creates a datagram endpoint.
424
425        This method will try to establish the endpoint in the background.
426        When successful, the coroutine returns a (transport, protocol) pair.
427
428        protocol_factory must be a callable returning a protocol instance.
429
430        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
431        host (or family if specified), socket type SOCK_DGRAM.
432
433        reuse_address tells the kernel to reuse a local socket in
434        TIME_WAIT state, without waiting for its natural timeout to
435        expire. If not specified it will automatically be set to True on
436        UNIX.
437
438        reuse_port tells the kernel to allow this endpoint to be bound to
439        the same port as other existing endpoints are bound to, so long as
440        they all set this flag when being created. This option is not
441        supported on Windows and some UNIX's. If the
442        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
443        capability is unsupported.
444
445        allow_broadcast tells the kernel to allow this endpoint to send
446        messages to the broadcast address.
447
448        sock can optionally be specified in order to use a preexisting
449        socket object.
450        """
451        raise NotImplementedError
452
453    # Pipes and subprocesses.
454
455    async def connect_read_pipe(self, protocol_factory, pipe):
456        """Register read pipe in event loop. Set the pipe to non-blocking mode.
457
458        protocol_factory should instantiate object with Protocol interface.
459        pipe is a file-like object.
460        Return pair (transport, protocol), where transport supports the
461        ReadTransport interface."""
462        # The reason to accept file-like object instead of just file descriptor
463        # is: we need to own pipe and close it at transport finishing
464        # Can got complicated errors if pass f.fileno(),
465        # close fd in pipe transport then close f and vise versa.
466        raise NotImplementedError
467
468    async def connect_write_pipe(self, protocol_factory, pipe):
469        """Register write pipe in event loop.
470
471        protocol_factory should instantiate object with BaseProtocol interface.
472        Pipe is file-like object already switched to nonblocking.
473        Return pair (transport, protocol), where transport support
474        WriteTransport interface."""
475        # The reason to accept file-like object instead of just file descriptor
476        # is: we need to own pipe and close it at transport finishing
477        # Can got complicated errors if pass f.fileno(),
478        # close fd in pipe transport then close f and vise versa.
479        raise NotImplementedError
480
481    async def subprocess_shell(self, protocol_factory, cmd, *,
482                               stdin=subprocess.PIPE,
483                               stdout=subprocess.PIPE,
484                               stderr=subprocess.PIPE,
485                               **kwargs):
486        raise NotImplementedError
487
488    async def subprocess_exec(self, protocol_factory, *args,
489                              stdin=subprocess.PIPE,
490                              stdout=subprocess.PIPE,
491                              stderr=subprocess.PIPE,
492                              **kwargs):
493        raise NotImplementedError
494
495    # Ready-based callback registration methods.
496    # The add_*() methods return None.
497    # The remove_*() methods return True if something was removed,
498    # False if there was nothing to delete.
499
500    def add_reader(self, fd, callback, *args):
501        raise NotImplementedError
502
503    def remove_reader(self, fd):
504        raise NotImplementedError
505
506    def add_writer(self, fd, callback, *args):
507        raise NotImplementedError
508
509    def remove_writer(self, fd):
510        raise NotImplementedError
511
512    # Completion based I/O methods returning Futures.
513
514    async def sock_recv(self, sock, nbytes):
515        raise NotImplementedError
516
517    async def sock_recv_into(self, sock, buf):
518        raise NotImplementedError
519
520    async def sock_sendall(self, sock, data):
521        raise NotImplementedError
522
523    async def sock_connect(self, sock, address):
524        raise NotImplementedError
525
526    async def sock_accept(self, sock):
527        raise NotImplementedError
528
529    async def sock_sendfile(self, sock, file, offset=0, count=None,
530                            *, fallback=None):
531        raise NotImplementedError
532
533    # Signal handling.
534
535    def add_signal_handler(self, sig, callback, *args):
536        raise NotImplementedError
537
538    def remove_signal_handler(self, sig):
539        raise NotImplementedError
540
541    # Task factory.
542
543    def set_task_factory(self, factory):
544        raise NotImplementedError
545
546    def get_task_factory(self):
547        raise NotImplementedError
548
549    # Error handlers.
550
551    def get_exception_handler(self):
552        raise NotImplementedError
553
554    def set_exception_handler(self, handler):
555        raise NotImplementedError
556
557    def default_exception_handler(self, context):
558        raise NotImplementedError
559
560    def call_exception_handler(self, context):
561        raise NotImplementedError
562
563    # Debug flag management.
564
565    def get_debug(self):
566        raise NotImplementedError
567
568    def set_debug(self, enabled):
569        raise NotImplementedError
570
571
572class AbstractEventLoopPolicy:
573    """Abstract policy for accessing the event loop."""
574
575    def get_event_loop(self):
576        """Get the event loop for the current context.
577
578        Returns an event loop object implementing the BaseEventLoop interface,
579        or raises an exception in case no event loop has been set for the
580        current context and the current policy does not specify to create one.
581
582        It should never return None."""
583        raise NotImplementedError
584
585    def set_event_loop(self, loop):
586        """Set the event loop for the current context to loop."""
587        raise NotImplementedError
588
589    def new_event_loop(self):
590        """Create and return a new event loop object according to this
591        policy's rules. If there's need to set this loop as the event loop for
592        the current context, set_event_loop must be called explicitly."""
593        raise NotImplementedError
594
595    # Child processes handling (Unix only).
596
597    def get_child_watcher(self):
598        "Get the watcher for child processes."
599        raise NotImplementedError
600
601    def set_child_watcher(self, watcher):
602        """Set the watcher for child processes."""
603        raise NotImplementedError
604
605
606class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
607    """Default policy implementation for accessing the event loop.
608
609    In this policy, each thread has its own event loop.  However, we
610    only automatically create an event loop by default for the main
611    thread; other threads by default have no event loop.
612
613    Other policies may have different rules (e.g. a single global
614    event loop, or automatically creating an event loop per thread, or
615    using some other notion of context to which an event loop is
616    associated).
617    """
618
619    _loop_factory = None
620
621    class _Local(threading.local):
622        _loop = None
623        _set_called = False
624
625    def __init__(self):
626        self._local = self._Local()
627
628    def get_event_loop(self):
629        """Get the event loop for the current context.
630
631        Returns an instance of EventLoop or raises an exception.
632        """
633        if (self._local._loop is None and
634                not self._local._set_called and
635                isinstance(threading.current_thread(), threading._MainThread)):
636            self.set_event_loop(self.new_event_loop())
637
638        if self._local._loop is None:
639            raise RuntimeError('There is no current event loop in thread %r.'
640                               % threading.current_thread().name)
641
642        return self._local._loop
643
644    def set_event_loop(self, loop):
645        """Set the event loop."""
646        self._local._set_called = True
647        assert loop is None or isinstance(loop, AbstractEventLoop)
648        self._local._loop = loop
649
650    def new_event_loop(self):
651        """Create a new event loop.
652
653        You must call set_event_loop() to make this the current event
654        loop.
655        """
656        return self._loop_factory()
657
658
659# Event loop policy.  The policy itself is always global, even if the
660# policy's rules say that there is an event loop per thread (or other
661# notion of context).  The default policy is installed by the first
662# call to get_event_loop_policy().
663_event_loop_policy = None
664
665# Lock for protecting the on-the-fly creation of the event loop policy.
666_lock = threading.Lock()
667
668
669# A TLS for the running event loop, used by _get_running_loop.
670class _RunningLoop(threading.local):
671    loop_pid = (None, None)
672
673
674_running_loop = _RunningLoop()
675
676
677def get_running_loop():
678    """Return the running event loop.  Raise a RuntimeError if there is none.
679
680    This function is thread-specific.
681    """
682    # NOTE: this function is implemented in C (see _asynciomodule.c)
683    loop = _get_running_loop()
684    if loop is None:
685        raise RuntimeError('no running event loop')
686    return loop
687
688
689def _get_running_loop():
690    """Return the running event loop or None.
691
692    This is a low-level function intended to be used by event loops.
693    This function is thread-specific.
694    """
695    # NOTE: this function is implemented in C (see _asynciomodule.c)
696    running_loop, pid = _running_loop.loop_pid
697    if running_loop is not None and pid == os.getpid():
698        return running_loop
699
700
701def _set_running_loop(loop):
702    """Set the running event loop.
703
704    This is a low-level function intended to be used by event loops.
705    This function is thread-specific.
706    """
707    # NOTE: this function is implemented in C (see _asynciomodule.c)
708    _running_loop.loop_pid = (loop, os.getpid())
709
710
711def _init_event_loop_policy():
712    global _event_loop_policy
713    with _lock:
714        if _event_loop_policy is None:  # pragma: no branch
715            from . import DefaultEventLoopPolicy
716            _event_loop_policy = DefaultEventLoopPolicy()
717
718
719def get_event_loop_policy():
720    """Get the current event loop policy."""
721    if _event_loop_policy is None:
722        _init_event_loop_policy()
723    return _event_loop_policy
724
725
726def set_event_loop_policy(policy):
727    """Set the current event loop policy.
728
729    If policy is None, the default policy is restored."""
730    global _event_loop_policy
731    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
732    _event_loop_policy = policy
733
734
735def get_event_loop():
736    """Return an asyncio event loop.
737
738    When called from a coroutine or a callback (e.g. scheduled with call_soon
739    or similar API), this function will always return the running event loop.
740
741    If there is no running event loop set, the function will return
742    the result of `get_event_loop_policy().get_event_loop()` call.
743    """
744    # NOTE: this function is implemented in C (see _asynciomodule.c)
745    current_loop = _get_running_loop()
746    if current_loop is not None:
747        return current_loop
748    return get_event_loop_policy().get_event_loop()
749
750
751def set_event_loop(loop):
752    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
753    get_event_loop_policy().set_event_loop(loop)
754
755
756def new_event_loop():
757    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
758    return get_event_loop_policy().new_event_loop()
759
760
761def get_child_watcher():
762    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
763    return get_event_loop_policy().get_child_watcher()
764
765
766def set_child_watcher(watcher):
767    """Equivalent to calling
768    get_event_loop_policy().set_child_watcher(watcher)."""
769    return get_event_loop_policy().set_child_watcher(watcher)
770
771
772# Alias pure-Python implementations for testing purposes.
773_py__get_running_loop = _get_running_loop
774_py__set_running_loop = _set_running_loop
775_py_get_running_loop = get_running_loop
776_py_get_event_loop = get_event_loop
777
778
779try:
780    # get_event_loop() is one of the most frequently called
781    # functions in asyncio.  Pure Python implementation is
782    # about 4 times slower than C-accelerated.
783    from _asyncio import (_get_running_loop, _set_running_loop,
784                          get_running_loop, get_event_loop)
785except ImportError:
786    pass
787else:
788    # Alias C implementations for testing purposes.
789    _c__get_running_loop = _get_running_loop
790    _c__set_running_loop = _set_running_loop
791    _c_get_running_loop = get_running_loop
792    _c_get_event_loop = get_event_loop
793