1"""Event loop and event loop policy."""
2
# Names exported by a star-import of this module.  Besides the public
# classes and accessor functions, the tuple also lists the
# underscore-prefixed running-loop helpers (_set_running_loop and
# _get_running_loop), so those are exported too.
__all__ = (
    'AbstractEventLoopPolicy',
    'AbstractEventLoop', 'AbstractServer',
    'Handle', 'TimerHandle',
    'get_event_loop_policy', 'set_event_loop_policy',
    'get_event_loop', 'set_event_loop', 'new_event_loop',
    'get_child_watcher', 'set_child_watcher',
    '_set_running_loop', 'get_running_loop',
    '_get_running_loop',
)
13
14import contextvars
15import os
16import socket
17import subprocess
18import sys
19import threading
20
21from . import format_helpers
22
23
class Handle:
    """Cancellable wrapper around a callback registered with a loop.

    This is the object the callback registration methods hand back.  It
    stores the callback, its positional arguments, the owning loop and the
    contextvars context the callback is run in.
    """

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__',
                 '_context')

    def __init__(self, callback, args, loop, context=None):
        # With no explicit context, run the callback in a copy of the
        # context that is current at registration time.
        self._context = (contextvars.copy_context() if context is None
                         else context)
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        # The creation stack is captured in debug mode only.  It starts at
        # the caller's frame, so sys._getframe(1) has to be evaluated right
        # here and not inside a helper.
        self._source_traceback = (
            format_helpers.extract_stack(sys._getframe(1))
            if self._loop.get_debug() else None)

    def _repr_info(self):
        """Return the list of text fragments that make up the repr."""
        parts = [type(self).__name__]
        if self._cancelled:
            parts.append('cancelled')
        callback = self._callback
        if callback is not None:
            parts.append(
                format_helpers._format_callback_source(callback, self._args))
        stack = self._source_traceback
        if stack:
            origin = stack[-1]
            parts.append(f'created at {origin[0]}:{origin[1]}')
        return parts

    def __repr__(self):
        # cancel() may have frozen a repr (debug mode); prefer it.
        frozen = self._repr
        if frozen is None:
            return f"<{' '.join(self._repr_info())}>"
        return frozen

    def cancel(self):
        """Cancel the callback; calling this more than once is a no-op."""
        if self._cancelled:
            return
        self._cancelled = True
        if self._loop.get_debug():
            # In debug mode, freeze the repr before the callback and its
            # arguments are dropped below, so that messages such as
            # "Executing <Handle...> took 2.5 second" can still show them.
            self._repr = repr(self)
        self._callback = None
        self._args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        return self._cancelled

    def _run(self):
        """Invoke the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate; any other exception is
        reported to the loop's exception handler instead of being raised.
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            described = format_helpers._format_callback_source(
                self._callback, self._args)
            context = dict(
                message=f'Exception in callback {described}',
                exception=exc,
                handle=self,
            )
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
96
97
class TimerHandle(Handle):
    """Handle for a callback registered to run at a given time.

    Instances order by their scheduled time (``_when``) and hash on it.
    """

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        stack = self._source_traceback
        if stack:
            # Drop the last recorded entry -- presumably this constructor's
            # own frame, since the base class records its caller's stack.
            del stack[-1]
        self._scheduled = False
        self._when = when

    def _repr_info(self):
        info = super()._repr_info()
        # 'when=...' goes right after the class name, or after the
        # 'cancelled' marker when that one is present.
        if self._cancelled:
            index = 2
        else:
            index = 1
        info.insert(index, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def __lt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when

    def __le__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when or self.__eq__(other)

    def __gt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when

    def __ge__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when or self.__eq__(other)

    def cancel(self):
        """Cancel the timer, notifying the loop on the first call only."""
        first_cancellation = not self._cancelled
        if first_cancellation:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return the time the callback is scheduled for.

        The value is an absolute timestamp expressed in the same time
        reference as loop.time().
        """
        return self._when
159
160
class AbstractServer:
    """Interface of the server object returned by create_server().

    Every operation here raises NotImplementedError; only the async
    context-manager protocol has a concrete implementation.
    """

    def close(self):
        """Stop serving.  Connections that are already open stay open."""
        raise NotImplementedError

    def get_loop(self):
        """Return the event loop this server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Tell whether the server is currently accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Begin accepting connections.

        The method is idempotent: it may be called when the server is
        already serving.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Accept connections until this coroutine is cancelled.

        Cancelling the coroutine closes the server.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine that waits until the service is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        # Entering the ``async with`` block yields the server itself.
        return self

    async def __aexit__(self, *exc):
        # Leaving the block closes the server and waits for the close to
        # finish; returning None means exceptions are not suppressed.
        self.close()
        await self.wait_closed()
201
202
class AbstractEventLoop:
    """Abstract event loop.

    Apart from call_soon(), which has a default implementation built on
    call_later(), every method raises NotImplementedError and must be
    provided by a concrete loop.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Return True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.
    #
    # Each of them accepts an optional keyword-only *context*, matching
    # the documented loop API.  It defaults to None, so callers and
    # subclasses that never pass it are unaffected.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args, context=None):
        """Schedule callback(*args) through call_later() with a delay of 0.

        *context* is forwarded only when it was actually supplied, so that
        call_later() overrides which do not accept the keyword keep
        working.
        """
        if context is None:
            return self.call_later(0, callback, *args)
        return self.call_later(0, callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        """Schedule callback(*args) to be called after *delay*."""
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        """Schedule callback(*args) to be called at the time *when*."""
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro, *, name=None):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be
        a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL handshake before aborting the
        connection. Default is 60s.

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Return the number of bytes sent.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        """Upgrade a transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for the SSL handshake to complete (defaults to 60s).

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio, but use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the BaseProtocol
        interface.
        pipe is a file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
586
587
class AbstractEventLoopPolicy:
    """Interface of the policy objects used to reach the event loop.

    Every method raises NotImplementedError; concrete policies override
    them.
    """

    def get_event_loop(self):
        """Return the event loop for the current context.

        The result is an event loop object implementing the BaseEventLoop
        interface.  When no loop has been set for the current context and
        the policy does not specify that one should be created, an
        exception is raised instead.

        None is never a valid return value."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Make *loop* the event loop of the current context."""
        raise NotImplementedError

    def new_event_loop(self):
        """Build and return a fresh event loop following this policy's
        rules.  The new loop does not become the current context's loop
        unless set_event_loop is called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Return the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Install *watcher* as the watcher for child processes."""
        raise NotImplementedError
620
621
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop() to build a loop.  It is None here,
    # so a subclass has to provide one before new_event_loop() can work.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's loop, and whether set_event_loop()
        # has been called successfully in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an instance of EventLoop or raises an exception.

        A loop is created on demand only in the main thread, and only if
        set_event_loop() has not been called there yet.  Otherwise a
        missing loop results in RuntimeError.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        *loop* must be an AbstractEventLoop instance or None; anything
        else raises TypeError and leaves the policy state untouched.
        """
        # Validate before touching any state: a rejected call must not be
        # recorded as "set_event_loop() was called", because that flag
        # disables the automatic loop creation in get_event_loop().
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(
                "loop must be an instance of AbstractEventLoop or None, "
                f"not '{type(loop).__name__}'")
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
674
675
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().  None means that no policy object
# exists at the moment.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
# It is taken by _init_event_loop_policy() only.
_lock = threading.Lock()
684
685
# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    # (loop, pid) pair for the current thread.  _set_running_loop() stores
    # the loop together with os.getpid(); _get_running_loop() reports the
    # loop only while the stored pid equals the current one.
    loop_pid = (None, None)


# Module-wide instance holding the per-thread running-loop record.
_running_loop = _RunningLoop()
692
693
def get_running_loop():
    """Return the event loop that is currently running.

    A RuntimeError is raised when no loop is running.  The lookup is
    specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running = _get_running_loop()
    if running is not None:
        return running
    raise RuntimeError('no running event loop')
704
705
def _get_running_loop():
    """Return the running event loop, or None when there is none.

    Low-level helper meant for event loop implementations.  The lookup
    is specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop, owner_pid = _running_loop.loop_pid
    # A loop recorded under a different process id is not reported --
    # presumably so that a forked child does not see its parent's loop.
    if loop is None or owner_pid != os.getpid():
        return None
    return loop
716
717
def _set_running_loop(loop):
    """Record *loop* as the running event loop.

    Low-level helper meant for event loop implementations.  The record
    is specific to the calling thread.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    # The current pid is stored next to the loop so that
    # _get_running_loop() can compare it later.
    current_pid = os.getpid()
    _running_loop.loop_pid = (loop, current_pid)
726
727
def _init_event_loop_policy():
    """Install the default policy if no policy exists yet.

    The check and the assignment both happen while holding ``_lock``.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:
            # A policy already exists; keep it.
            return
        # Imported here rather than at module level.
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
734
735
def get_event_loop_policy():
    """Return the current event loop policy, creating it if necessary."""
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    # Read the global again: the call above has just assigned it.
    return _event_loop_policy
741
742
def set_event_loop_policy(policy):
    """Replace the current event loop policy with *policy*.

    Passing None restores the default policy.  Any other value that is not
    an AbstractEventLoopPolicy instance raises TypeError."""
    global _event_loop_policy
    acceptable = (policy is None or
                  isinstance(policy, AbstractEventLoopPolicy))
    if not acceptable:
        raise TypeError(
            "policy must be an instance of AbstractEventLoopPolicy or None, "
            f"not '{type(policy).__name__}'")
    _event_loop_policy = policy
751
752
def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    # Delegate to the pure-Python helper through its ``_py_`` alias, which
    # is bound near the end of this module before the C versions are
    # imported.  The helper's default stacklevel=3 accounts for this extra
    # frame, so its DeprecationWarning is attributed to our caller.
    return _py__get_event_loop()
764
765
def _get_event_loop(stacklevel=3):
    """Return the running loop, falling back to the policy's loop.

    When no loop is running, a DeprecationWarning is issued with the given
    *stacklevel* before the current policy is asked for its event loop.
    """
    loop = _get_running_loop()
    if loop is None:
        import warnings
        warnings.warn('There is no current event loop',
                      DeprecationWarning, stacklevel=stacklevel)
        loop = get_event_loop_policy().get_event_loop()
    return loop
774
775
def set_event_loop(loop):
    """Shorthand for get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
779
780
def new_event_loop():
    """Shorthand for get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
784
785
def get_child_watcher():
    """Shorthand for get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
789
790
def set_child_watcher(watcher):
    """Shorthand for
    get_event_loop_policy().set_child_watcher(watcher)."""
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)
795
796
# Alias pure-Python implementations for testing purposes.
# These aliases have to be bound before the import below, which rebinds
# the plain names to the C versions when the accelerator is available.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
_py__get_event_loop = _get_event_loop


try:
    # get_event_loop() is one of the most frequently called
    # functions in asyncio.  Pure Python implementation is
    # about 4 times slower than C-accelerated.
    from _asyncio import (_get_running_loop, _set_running_loop,
                          get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
    # No C accelerator: the pure-Python definitions above stay in place
    # and no _c_ aliases are created.
    pass
else:
    # Alias C implementations for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop
    _c__get_event_loop = _get_event_loop
820