1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
52__all__ = 'BaseEventLoop',
53
54
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69
70def _format_handle(handle):
71    cb = handle._callback
72    if isinstance(getattr(cb, '__self__', None), tasks.Task):
73        # format the task
74        return repr(cb.__self__)
75    else:
76        return str(handle)
77
78
79def _format_pipe(fd):
80    if fd == subprocess.PIPE:
81        return '<pipe>'
82    elif fd == subprocess.STDOUT:
83        return '<stdout>'
84    else:
85        return repr(fd)
86
87
88def _set_reuseport(sock):
89    if not hasattr(socket, 'SO_REUSEPORT'):
90        raise ValueError('reuse_port not supported by socket module')
91    else:
92        try:
93            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94        except OSError:
95            raise ValueError('reuse_port not supported by socket module, '
96                             'SO_REUSEPORT defined but not implemented.')
97
98
99def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
100    # Try to skip getaddrinfo if "host" is already an IP. Users might have
101    # handled name resolution in their own code and pass in resolved IPs.
102    if not hasattr(socket, 'inet_pton'):
103        return
104
105    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
106            host is None:
107        return None
108
109    if type == socket.SOCK_STREAM:
110        proto = socket.IPPROTO_TCP
111    elif type == socket.SOCK_DGRAM:
112        proto = socket.IPPROTO_UDP
113    else:
114        return None
115
116    if port is None:
117        port = 0
118    elif isinstance(port, bytes) and port == b'':
119        port = 0
120    elif isinstance(port, str) and port == '':
121        port = 0
122    else:
123        # If port's a service name like "http", don't skip getaddrinfo.
124        try:
125            port = int(port)
126        except (TypeError, ValueError):
127            return None
128
129    if family == socket.AF_UNSPEC:
130        afs = [socket.AF_INET]
131        if _HAS_IPv6:
132            afs.append(socket.AF_INET6)
133    else:
134        afs = [family]
135
136    if isinstance(host, bytes):
137        host = host.decode('idna')
138    if '%' in host:
139        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
140        # like '::1%lo0'.
141        return None
142
143    for af in afs:
144        try:
145            socket.inet_pton(af, host)
146            # The host has already been resolved.
147            if _HAS_IPv6 and af == socket.AF_INET6:
148                return af, type, proto, '', (host, port, flowinfo, scopeid)
149            else:
150                return af, type, proto, '', (host, port)
151        except OSError:
152            pass
153
154    # "host" is not an IP address.
155    return None
156
157
158def _interleave_addrinfos(addrinfos, first_address_family_count=1):
159    """Interleave list of addrinfo tuples by family."""
160    # Group addresses by family
161    addrinfos_by_family = collections.OrderedDict()
162    for addr in addrinfos:
163        family = addr[0]
164        if family not in addrinfos_by_family:
165            addrinfos_by_family[family] = []
166        addrinfos_by_family[family].append(addr)
167    addrinfos_lists = list(addrinfos_by_family.values())
168
169    reordered = []
170    if first_address_family_count > 1:
171        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
172        del addrinfos_lists[0][:first_address_family_count - 1]
173    reordered.extend(
174        a for a in itertools.chain.from_iterable(
175            itertools.zip_longest(*addrinfos_lists)
176        ) if a is not None)
177    return reordered
178
179
180def _run_until_complete_cb(fut):
181    if not fut.cancelled():
182        exc = fut.exception()
183        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
184            # Issue #22429: run_forever() already finished, no need to
185            # stop it.
186            return
187    futures._get_loop(fut).stop()
188
189
190if hasattr(socket, 'TCP_NODELAY'):
191    def _set_nodelay(sock):
192        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
193                sock.type == socket.SOCK_STREAM and
194                sock.proto == socket.IPPROTO_TCP):
195            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
196else:
197    def _set_nodelay(sock):
198        pass
199
200
201class _SendfileFallbackProtocol(protocols.Protocol):
202    def __init__(self, transp):
203        if not isinstance(transp, transports._FlowControlMixin):
204            raise TypeError("transport should be _FlowControlMixin instance")
205        self._transport = transp
206        self._proto = transp.get_protocol()
207        self._should_resume_reading = transp.is_reading()
208        self._should_resume_writing = transp._protocol_paused
209        transp.pause_reading()
210        transp.set_protocol(self)
211        if self._should_resume_writing:
212            self._write_ready_fut = self._transport._loop.create_future()
213        else:
214            self._write_ready_fut = None
215
216    async def drain(self):
217        if self._transport.is_closing():
218            raise ConnectionError("Connection closed by peer")
219        fut = self._write_ready_fut
220        if fut is None:
221            return
222        await fut
223
224    def connection_made(self, transport):
225        raise RuntimeError("Invalid state: "
226                           "connection should have been established already.")
227
228    def connection_lost(self, exc):
229        if self._write_ready_fut is not None:
230            # Never happens if peer disconnects after sending the whole content
231            # Thus disconnection is always an exception from user perspective
232            if exc is None:
233                self._write_ready_fut.set_exception(
234                    ConnectionError("Connection is closed by peer"))
235            else:
236                self._write_ready_fut.set_exception(exc)
237        self._proto.connection_lost(exc)
238
239    def pause_writing(self):
240        if self._write_ready_fut is not None:
241            return
242        self._write_ready_fut = self._transport._loop.create_future()
243
244    def resume_writing(self):
245        if self._write_ready_fut is None:
246            return
247        self._write_ready_fut.set_result(False)
248        self._write_ready_fut = None
249
250    def data_received(self, data):
251        raise RuntimeError("Invalid state: reading should be paused")
252
253    def eof_received(self):
254        raise RuntimeError("Invalid state: reading should be paused")
255
256    async def restore(self):
257        self._transport.set_protocol(self._proto)
258        if self._should_resume_reading:
259            self._transport.resume_reading()
260        if self._write_ready_fut is not None:
261            # Cancel the future.
262            # Basically it has no effect because protocol is switched back,
263            # no code should wait for it anymore.
264            self._write_ready_fut.cancel()
265        if self._should_resume_writing:
266            self._proto.resume_writing()
267
268
269class Server(events.AbstractServer):
270
271    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
272                 ssl_handshake_timeout):
273        self._loop = loop
274        self._sockets = sockets
275        self._active_count = 0
276        self._waiters = []
277        self._protocol_factory = protocol_factory
278        self._backlog = backlog
279        self._ssl_context = ssl_context
280        self._ssl_handshake_timeout = ssl_handshake_timeout
281        self._serving = False
282        self._serving_forever_fut = None
283
284    def __repr__(self):
285        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
286
287    def _attach(self):
288        assert self._sockets is not None
289        self._active_count += 1
290
291    def _detach(self):
292        assert self._active_count > 0
293        self._active_count -= 1
294        if self._active_count == 0 and self._sockets is None:
295            self._wakeup()
296
297    def _wakeup(self):
298        waiters = self._waiters
299        self._waiters = None
300        for waiter in waiters:
301            if not waiter.done():
302                waiter.set_result(waiter)
303
304    def _start_serving(self):
305        if self._serving:
306            return
307        self._serving = True
308        for sock in self._sockets:
309            sock.listen(self._backlog)
310            self._loop._start_serving(
311                self._protocol_factory, sock, self._ssl_context,
312                self, self._backlog, self._ssl_handshake_timeout)
313
314    def get_loop(self):
315        return self._loop
316
317    def is_serving(self):
318        return self._serving
319
320    @property
321    def sockets(self):
322        if self._sockets is None:
323            return ()
324        return tuple(trsock.TransportSocket(s) for s in self._sockets)
325
326    def close(self):
327        sockets = self._sockets
328        if sockets is None:
329            return
330        self._sockets = None
331
332        for sock in sockets:
333            self._loop._stop_serving(sock)
334
335        self._serving = False
336
337        if (self._serving_forever_fut is not None and
338                not self._serving_forever_fut.done()):
339            self._serving_forever_fut.cancel()
340            self._serving_forever_fut = None
341
342        if self._active_count == 0:
343            self._wakeup()
344
345    async def start_serving(self):
346        self._start_serving()
347        # Skip one loop iteration so that all 'loop.add_reader'
348        # go through.
349        await tasks.sleep(0)
350
351    async def serve_forever(self):
352        if self._serving_forever_fut is not None:
353            raise RuntimeError(
354                f'server {self!r} is already being awaited on serve_forever()')
355        if self._sockets is None:
356            raise RuntimeError(f'server {self!r} is closed')
357
358        self._start_serving()
359        self._serving_forever_fut = self._loop.create_future()
360
361        try:
362            await self._serving_forever_fut
363        except exceptions.CancelledError:
364            try:
365                self.close()
366                await self.wait_closed()
367            finally:
368                raise
369        finally:
370            self._serving_forever_fut = None
371
372    async def wait_closed(self):
373        if self._sockets is None or self._waiters is None:
374            return
375        waiter = self._loop.create_future()
376        self._waiters.append(waiter)
377        await waiter
378
379
380class BaseEventLoop(events.AbstractEventLoop):
381
382    def __init__(self):
383        self._timer_cancelled_count = 0
384        self._closed = False
385        self._stopping = False
386        self._ready = collections.deque()
387        self._scheduled = []
388        self._default_executor = None
389        self._internal_fds = 0
390        # Identifier of the thread running the event loop, or None if the
391        # event loop is not running
392        self._thread_id = None
393        self._clock_resolution = time.get_clock_info('monotonic').resolution
394        self._exception_handler = None
395        self.set_debug(coroutines._is_debug_mode())
396        # In debug mode, if the execution of a callback or a step of a task
397        # exceed this duration in seconds, the slow callback/task is logged.
398        self.slow_callback_duration = 0.1
399        self._current_handle = None
400        self._task_factory = None
401        self._coroutine_origin_tracking_enabled = False
402        self._coroutine_origin_tracking_saved_depth = None
403
404        # A weak set of all asynchronous generators that are
405        # being iterated by the loop.
406        self._asyncgens = weakref.WeakSet()
407        # Set to True when `loop.shutdown_asyncgens` is called.
408        self._asyncgens_shutdown_called = False
409        # Set to True when `loop.shutdown_default_executor` is called.
410        self._executor_shutdown_called = False
411
412    def __repr__(self):
413        return (
414            f'<{self.__class__.__name__} running={self.is_running()} '
415            f'closed={self.is_closed()} debug={self.get_debug()}>'
416        )
417
418    def create_future(self):
419        """Create a Future object attached to the loop."""
420        return futures.Future(loop=self)
421
422    def create_task(self, coro, *, name=None):
423        """Schedule a coroutine object.
424
425        Return a task object.
426        """
427        self._check_closed()
428        if self._task_factory is None:
429            task = tasks.Task(coro, loop=self, name=name)
430            if task._source_traceback:
431                del task._source_traceback[-1]
432        else:
433            task = self._task_factory(self, coro)
434            tasks._set_task_name(task, name)
435
436        return task
437
438    def set_task_factory(self, factory):
439        """Set a task factory that will be used by loop.create_task().
440
441        If factory is None the default task factory will be set.
442
443        If factory is a callable, it should have a signature matching
444        '(loop, coro)', where 'loop' will be a reference to the active
445        event loop, 'coro' will be a coroutine object.  The callable
446        must return a Future.
447        """
448        if factory is not None and not callable(factory):
449            raise TypeError('task factory must be a callable or None')
450        self._task_factory = factory
451
452    def get_task_factory(self):
453        """Return a task factory, or None if the default one is in use."""
454        return self._task_factory
455
456    def _make_socket_transport(self, sock, protocol, waiter=None, *,
457                               extra=None, server=None):
458        """Create socket transport."""
459        raise NotImplementedError
460
461    def _make_ssl_transport(
462            self, rawsock, protocol, sslcontext, waiter=None,
463            *, server_side=False, server_hostname=None,
464            extra=None, server=None,
465            ssl_handshake_timeout=None,
466            call_connection_made=True):
467        """Create SSL transport."""
468        raise NotImplementedError
469
470    def _make_datagram_transport(self, sock, protocol,
471                                 address=None, waiter=None, extra=None):
472        """Create datagram transport."""
473        raise NotImplementedError
474
475    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
476                                  extra=None):
477        """Create read pipe transport."""
478        raise NotImplementedError
479
480    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
481                                   extra=None):
482        """Create write pipe transport."""
483        raise NotImplementedError
484
485    async def _make_subprocess_transport(self, protocol, args, shell,
486                                         stdin, stdout, stderr, bufsize,
487                                         extra=None, **kwargs):
488        """Create subprocess transport."""
489        raise NotImplementedError
490
491    def _write_to_self(self):
492        """Write a byte to self-pipe, to wake up the event loop.
493
494        This may be called from a different thread.
495
496        The subclass is responsible for implementing the self-pipe.
497        """
498        raise NotImplementedError
499
500    def _process_events(self, event_list):
501        """Process selector events."""
502        raise NotImplementedError
503
504    def _check_closed(self):
505        if self._closed:
506            raise RuntimeError('Event loop is closed')
507
508    def _check_default_executor(self):
509        if self._executor_shutdown_called:
510            raise RuntimeError('Executor shutdown has been called')
511
512    def _asyncgen_finalizer_hook(self, agen):
513        self._asyncgens.discard(agen)
514        if not self.is_closed():
515            self.call_soon_threadsafe(self.create_task, agen.aclose())
516
517    def _asyncgen_firstiter_hook(self, agen):
518        if self._asyncgens_shutdown_called:
519            warnings.warn(
520                f"asynchronous generator {agen!r} was scheduled after "
521                f"loop.shutdown_asyncgens() call",
522                ResourceWarning, source=self)
523
524        self._asyncgens.add(agen)
525
526    async def shutdown_asyncgens(self):
527        """Shutdown all active asynchronous generators."""
528        self._asyncgens_shutdown_called = True
529
530        if not len(self._asyncgens):
531            # If Python version is <3.6 or we don't have any asynchronous
532            # generators alive.
533            return
534
535        closing_agens = list(self._asyncgens)
536        self._asyncgens.clear()
537
538        results = await tasks.gather(
539            *[ag.aclose() for ag in closing_agens],
540            return_exceptions=True)
541
542        for result, agen in zip(results, closing_agens):
543            if isinstance(result, Exception):
544                self.call_exception_handler({
545                    'message': f'an error occurred during closing of '
546                               f'asynchronous generator {agen!r}',
547                    'exception': result,
548                    'asyncgen': agen
549                })
550
551    async def shutdown_default_executor(self):
552        """Schedule the shutdown of the default executor."""
553        self._executor_shutdown_called = True
554        if self._default_executor is None:
555            return
556        future = self.create_future()
557        thread = threading.Thread(target=self._do_shutdown, args=(future,))
558        thread.start()
559        try:
560            await future
561        finally:
562            thread.join()
563
564    def _do_shutdown(self, future):
565        try:
566            self._default_executor.shutdown(wait=True)
567            self.call_soon_threadsafe(future.set_result, None)
568        except Exception as ex:
569            self.call_soon_threadsafe(future.set_exception, ex)
570
571    def _check_running(self):
572        if self.is_running():
573            raise RuntimeError('This event loop is already running')
574        if events._get_running_loop() is not None:
575            raise RuntimeError(
576                'Cannot run the event loop while another loop is running')
577
578    def run_forever(self):
579        """Run until stop() is called."""
580        self._check_closed()
581        self._check_running()
582        self._set_coroutine_origin_tracking(self._debug)
583        self._thread_id = threading.get_ident()
584
585        old_agen_hooks = sys.get_asyncgen_hooks()
586        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
587                               finalizer=self._asyncgen_finalizer_hook)
588        try:
589            events._set_running_loop(self)
590            while True:
591                self._run_once()
592                if self._stopping:
593                    break
594        finally:
595            self._stopping = False
596            self._thread_id = None
597            events._set_running_loop(None)
598            self._set_coroutine_origin_tracking(False)
599            sys.set_asyncgen_hooks(*old_agen_hooks)
600
601    def run_until_complete(self, future):
602        """Run until the Future is done.
603
604        If the argument is a coroutine, it is wrapped in a Task.
605
606        WARNING: It would be disastrous to call run_until_complete()
607        with the same coroutine twice -- it would wrap it in two
608        different Tasks and that can't be good.
609
610        Return the Future's result, or raise its exception.
611        """
612        self._check_closed()
613        self._check_running()
614
615        new_task = not futures.isfuture(future)
616        future = tasks.ensure_future(future, loop=self)
617        if new_task:
618            # An exception is raised if the future didn't complete, so there
619            # is no need to log the "destroy pending task" message
620            future._log_destroy_pending = False
621
622        future.add_done_callback(_run_until_complete_cb)
623        try:
624            self.run_forever()
625        except:
626            if new_task and future.done() and not future.cancelled():
627                # The coroutine raised a BaseException. Consume the exception
628                # to not log a warning, the caller doesn't have access to the
629                # local task.
630                future.exception()
631            raise
632        finally:
633            future.remove_done_callback(_run_until_complete_cb)
634        if not future.done():
635            raise RuntimeError('Event loop stopped before Future completed.')
636
637        return future.result()
638
639    def stop(self):
640        """Stop running the event loop.
641
642        Every callback already scheduled will still run.  This simply informs
643        run_forever to stop looping after a complete iteration.
644        """
645        self._stopping = True
646
647    def close(self):
648        """Close the event loop.
649
650        This clears the queues and shuts down the executor,
651        but does not wait for the executor to finish.
652
653        The event loop must not be running.
654        """
655        if self.is_running():
656            raise RuntimeError("Cannot close a running event loop")
657        if self._closed:
658            return
659        if self._debug:
660            logger.debug("Close %r", self)
661        self._closed = True
662        self._ready.clear()
663        self._scheduled.clear()
664        self._executor_shutdown_called = True
665        executor = self._default_executor
666        if executor is not None:
667            self._default_executor = None
668            executor.shutdown(wait=False)
669
670    def is_closed(self):
671        """Returns True if the event loop was closed."""
672        return self._closed
673
674    def __del__(self, _warn=warnings.warn):
675        if not self.is_closed():
676            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
677            if not self.is_running():
678                self.close()
679
680    def is_running(self):
681        """Returns True if the event loop is running."""
682        return (self._thread_id is not None)
683
684    def time(self):
685        """Return the time according to the event loop's clock.
686
687        This is a float expressed in seconds since an epoch, but the
688        epoch, precision, accuracy and drift are unspecified and may
689        differ per event loop.
690        """
691        return time.monotonic()
692
693    def call_later(self, delay, callback, *args, context=None):
694        """Arrange for a callback to be called at a given time.
695
696        Return a Handle: an opaque object with a cancel() method that
697        can be used to cancel the call.
698
699        The delay can be an int or float, expressed in seconds.  It is
700        always relative to the current time.
701
702        Each callback will be called exactly once.  If two callbacks
703        are scheduled for exactly the same time, it undefined which
704        will be called first.
705
706        Any positional arguments after the callback will be passed to
707        the callback when it is called.
708        """
709        if delay is None:
710            raise TypeError('delay must not be None')
711        timer = self.call_at(self.time() + delay, callback, *args,
712                             context=context)
713        if timer._source_traceback:
714            del timer._source_traceback[-1]
715        return timer
716
717    def call_at(self, when, callback, *args, context=None):
718        """Like call_later(), but uses an absolute time.
719
720        Absolute time corresponds to the event loop's time() method.
721        """
722        if when is None:
723            raise TypeError("when cannot be None")
724        self._check_closed()
725        if self._debug:
726            self._check_thread()
727            self._check_callback(callback, 'call_at')
728        timer = events.TimerHandle(when, callback, args, self, context)
729        if timer._source_traceback:
730            del timer._source_traceback[-1]
731        heapq.heappush(self._scheduled, timer)
732        timer._scheduled = True
733        return timer
734
735    def call_soon(self, callback, *args, context=None):
736        """Arrange for a callback to be called as soon as possible.
737
738        This operates as a FIFO queue: callbacks are called in the
739        order in which they are registered.  Each callback will be
740        called exactly once.
741
742        Any positional arguments after the callback will be passed to
743        the callback when it is called.
744        """
745        self._check_closed()
746        if self._debug:
747            self._check_thread()
748            self._check_callback(callback, 'call_soon')
749        handle = self._call_soon(callback, args, context)
750        if handle._source_traceback:
751            del handle._source_traceback[-1]
752        return handle
753
754    def _check_callback(self, callback, method):
755        if (coroutines.iscoroutine(callback) or
756                coroutines.iscoroutinefunction(callback)):
757            raise TypeError(
758                f"coroutines cannot be used with {method}()")
759        if not callable(callback):
760            raise TypeError(
761                f'a callable object was expected by {method}(), '
762                f'got {callback!r}')
763
764    def _call_soon(self, callback, args, context):
765        handle = events.Handle(callback, args, self, context)
766        if handle._source_traceback:
767            del handle._source_traceback[-1]
768        self._ready.append(handle)
769        return handle
770
771    def _check_thread(self):
772        """Check that the current thread is the thread running the event loop.
773
774        Non-thread-safe methods of this class make this assumption and will
775        likely behave incorrectly when the assumption is violated.
776
777        Should only be called when (self._debug == True).  The caller is
778        responsible for checking this condition for performance reasons.
779        """
780        if self._thread_id is None:
781            return
782        thread_id = threading.get_ident()
783        if thread_id != self._thread_id:
784            raise RuntimeError(
785                "Non-thread-safe operation invoked on an event loop other "
786                "than the current one")
787
788    def call_soon_threadsafe(self, callback, *args, context=None):
789        """Like call_soon(), but thread-safe."""
790        self._check_closed()
791        if self._debug:
792            self._check_callback(callback, 'call_soon_threadsafe')
793        handle = self._call_soon(callback, args, context)
794        if handle._source_traceback:
795            del handle._source_traceback[-1]
796        self._write_to_self()
797        return handle
798
799    def run_in_executor(self, executor, func, *args):
800        self._check_closed()
801        if self._debug:
802            self._check_callback(func, 'run_in_executor')
803        if executor is None:
804            executor = self._default_executor
805            # Only check when the default executor is being used
806            self._check_default_executor()
807            if executor is None:
808                executor = concurrent.futures.ThreadPoolExecutor(
809                    thread_name_prefix='asyncio'
810                )
811                self._default_executor = executor
812        return futures.wrap_future(
813            executor.submit(func, *args), loop=self)
814
815    def set_default_executor(self, executor):
816        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
817            raise TypeError('executor must be ThreadPoolExecutor instance')
818        self._default_executor = executor
819
820    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
821        msg = [f"{host}:{port!r}"]
822        if family:
823            msg.append(f'family={family!r}')
824        if type:
825            msg.append(f'type={type!r}')
826        if proto:
827            msg.append(f'proto={proto!r}')
828        if flags:
829            msg.append(f'flags={flags!r}')
830        msg = ', '.join(msg)
831        logger.debug('Get address info %s', msg)
832
833        t0 = self.time()
834        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
835        dt = self.time() - t0
836
837        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
838        if dt >= self.slow_callback_duration:
839            logger.info(msg)
840        else:
841            logger.debug(msg)
842        return addrinfo
843
844    async def getaddrinfo(self, host, port, *,
845                          family=0, type=0, proto=0, flags=0):
846        if self._debug:
847            getaddr_func = self._getaddrinfo_debug
848        else:
849            getaddr_func = socket.getaddrinfo
850
851        return await self.run_in_executor(
852            None, getaddr_func, host, port, family, type, proto, flags)
853
854    async def getnameinfo(self, sockaddr, flags=0):
855        return await self.run_in_executor(
856            None, socket.getnameinfo, sockaddr, flags)
857
858    async def sock_sendfile(self, sock, file, offset=0, count=None,
859                            *, fallback=True):
860        if self._debug and sock.gettimeout() != 0:
861            raise ValueError("the socket must be non-blocking")
862        self._check_sendfile_params(sock, file, offset, count)
863        try:
864            return await self._sock_sendfile_native(sock, file,
865                                                    offset, count)
866        except exceptions.SendfileNotAvailableError as exc:
867            if not fallback:
868                raise
869        return await self._sock_sendfile_fallback(sock, file,
870                                                  offset, count)
871
872    async def _sock_sendfile_native(self, sock, file, offset, count):
873        # NB: sendfile syscall is not supported for SSL sockets and
874        # non-mmap files even if sendfile is supported by OS
875        raise exceptions.SendfileNotAvailableError(
876            f"syscall sendfile is not available for socket {sock!r} "
877            "and file {file!r} combination")
878
879    async def _sock_sendfile_fallback(self, sock, file, offset, count):
880        if offset:
881            file.seek(offset)
882        blocksize = (
883            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
884            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
885        )
886        buf = bytearray(blocksize)
887        total_sent = 0
888        try:
889            while True:
890                if count:
891                    blocksize = min(count - total_sent, blocksize)
892                    if blocksize <= 0:
893                        break
894                view = memoryview(buf)[:blocksize]
895                read = await self.run_in_executor(None, file.readinto, view)
896                if not read:
897                    break  # EOF
898                await self.sock_sendall(sock, view[:read])
899                total_sent += read
900            return total_sent
901        finally:
902            if total_sent > 0 and hasattr(file, 'seek'):
903                file.seek(offset + total_sent)
904
905    def _check_sendfile_params(self, sock, file, offset, count):
906        if 'b' not in getattr(file, 'mode', 'b'):
907            raise ValueError("file should be opened in binary mode")
908        if not sock.type == socket.SOCK_STREAM:
909            raise ValueError("only SOCK_STREAM type sockets are supported")
910        if count is not None:
911            if not isinstance(count, int):
912                raise TypeError(
913                    "count must be a positive integer (got {!r})".format(count))
914            if count <= 0:
915                raise ValueError(
916                    "count must be a positive integer (got {!r})".format(count))
917        if not isinstance(offset, int):
918            raise TypeError(
919                "offset must be a non-negative integer (got {!r})".format(
920                    offset))
921        if offset < 0:
922            raise ValueError(
923                "offset must be a non-negative integer (got {!r})".format(
924                    offset))
925
926    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
927        """Create, bind and connect one socket."""
928        my_exceptions = []
929        exceptions.append(my_exceptions)
930        family, type_, proto, _, address = addr_info
931        sock = None
932        try:
933            sock = socket.socket(family=family, type=type_, proto=proto)
934            sock.setblocking(False)
935            if local_addr_infos is not None:
936                for _, _, _, _, laddr in local_addr_infos:
937                    try:
938                        sock.bind(laddr)
939                        break
940                    except OSError as exc:
941                        msg = (
942                            f'error while attempting to bind on '
943                            f'address {laddr!r}: '
944                            f'{exc.strerror.lower()}'
945                        )
946                        exc = OSError(exc.errno, msg)
947                        my_exceptions.append(exc)
948                else:  # all bind attempts failed
949                    raise my_exceptions.pop()
950            await self.sock_connect(sock, address)
951            return sock
952        except OSError as exc:
953            my_exceptions.append(exc)
954            if sock is not None:
955                sock.close()
956            raise
957        except:
958            if sock is not None:
959                sock.close()
960            raise
961
962    async def create_connection(
963            self, protocol_factory, host=None, port=None,
964            *, ssl=None, family=0,
965            proto=0, flags=0, sock=None,
966            local_addr=None, server_hostname=None,
967            ssl_handshake_timeout=None,
968            happy_eyeballs_delay=None, interleave=None):
969        """Connect to a TCP server.
970
971        Create a streaming transport connection to a given internet host and
972        port: socket family AF_INET or socket.AF_INET6 depending on host (or
973        family if specified), socket type SOCK_STREAM. protocol_factory must be
974        a callable returning a protocol instance.
975
976        This method is a coroutine which will try to establish the connection
977        in the background.  When successful, the coroutine returns a
978        (transport, protocol) pair.
979        """
980        if server_hostname is not None and not ssl:
981            raise ValueError('server_hostname is only meaningful with ssl')
982
983        if server_hostname is None and ssl:
984            # Use host as default for server_hostname.  It is an error
985            # if host is empty or not set, e.g. when an
986            # already-connected socket was passed or when only a port
987            # is given.  To avoid this error, you can pass
988            # server_hostname='' -- this will bypass the hostname
989            # check.  (This also means that if host is a numeric
990            # IP/IPv6 address, we will attempt to verify that exact
991            # address; this will probably fail, but it is possible to
992            # create a certificate for a specific IP address, so we
993            # don't judge it here.)
994            if not host:
995                raise ValueError('You must set server_hostname '
996                                 'when using ssl without a host')
997            server_hostname = host
998
999        if ssl_handshake_timeout is not None and not ssl:
1000            raise ValueError(
1001                'ssl_handshake_timeout is only meaningful with ssl')
1002
1003        if happy_eyeballs_delay is not None and interleave is None:
1004            # If using happy eyeballs, default to interleave addresses by family
1005            interleave = 1
1006
1007        if host is not None or port is not None:
1008            if sock is not None:
1009                raise ValueError(
1010                    'host/port and sock can not be specified at the same time')
1011
1012            infos = await self._ensure_resolved(
1013                (host, port), family=family,
1014                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1015            if not infos:
1016                raise OSError('getaddrinfo() returned empty list')
1017
1018            if local_addr is not None:
1019                laddr_infos = await self._ensure_resolved(
1020                    local_addr, family=family,
1021                    type=socket.SOCK_STREAM, proto=proto,
1022                    flags=flags, loop=self)
1023                if not laddr_infos:
1024                    raise OSError('getaddrinfo() returned empty list')
1025            else:
1026                laddr_infos = None
1027
1028            if interleave:
1029                infos = _interleave_addrinfos(infos, interleave)
1030
1031            exceptions = []
1032            if happy_eyeballs_delay is None:
1033                # not using happy eyeballs
1034                for addrinfo in infos:
1035                    try:
1036                        sock = await self._connect_sock(
1037                            exceptions, addrinfo, laddr_infos)
1038                        break
1039                    except OSError:
1040                        continue
1041            else:  # using happy eyeballs
1042                sock, _, _ = await staggered.staggered_race(
1043                    (functools.partial(self._connect_sock,
1044                                       exceptions, addrinfo, laddr_infos)
1045                     for addrinfo in infos),
1046                    happy_eyeballs_delay, loop=self)
1047
1048            if sock is None:
1049                exceptions = [exc for sub in exceptions for exc in sub]
1050                if len(exceptions) == 1:
1051                    raise exceptions[0]
1052                else:
1053                    # If they all have the same str(), raise one.
1054                    model = str(exceptions[0])
1055                    if all(str(exc) == model for exc in exceptions):
1056                        raise exceptions[0]
1057                    # Raise a combined exception so the user can see all
1058                    # the various error messages.
1059                    raise OSError('Multiple exceptions: {}'.format(
1060                        ', '.join(str(exc) for exc in exceptions)))
1061
1062        else:
1063            if sock is None:
1064                raise ValueError(
1065                    'host and port was not specified and no sock specified')
1066            if sock.type != socket.SOCK_STREAM:
1067                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1068                # are SOCK_STREAM.
1069                # We support passing AF_UNIX sockets even though we have
1070                # a dedicated API for that: create_unix_connection.
1071                # Disallowing AF_UNIX in this method, breaks backwards
1072                # compatibility.
1073                raise ValueError(
1074                    f'A Stream Socket was expected, got {sock!r}')
1075
1076        transport, protocol = await self._create_connection_transport(
1077            sock, protocol_factory, ssl, server_hostname,
1078            ssl_handshake_timeout=ssl_handshake_timeout)
1079        if self._debug:
1080            # Get the socket from the transport because SSL transport closes
1081            # the old socket and creates a new SSL socket
1082            sock = transport.get_extra_info('socket')
1083            logger.debug("%r connected to %s:%r: (%r, %r)",
1084                         sock, host, port, transport, protocol)
1085        return transport, protocol
1086
1087    async def _create_connection_transport(
1088            self, sock, protocol_factory, ssl,
1089            server_hostname, server_side=False,
1090            ssl_handshake_timeout=None):
1091
1092        sock.setblocking(False)
1093
1094        protocol = protocol_factory()
1095        waiter = self.create_future()
1096        if ssl:
1097            sslcontext = None if isinstance(ssl, bool) else ssl
1098            transport = self._make_ssl_transport(
1099                sock, protocol, sslcontext, waiter,
1100                server_side=server_side, server_hostname=server_hostname,
1101                ssl_handshake_timeout=ssl_handshake_timeout)
1102        else:
1103            transport = self._make_socket_transport(sock, protocol, waiter)
1104
1105        try:
1106            await waiter
1107        except:
1108            transport.close()
1109            raise
1110
1111        return transport, protocol
1112
1113    async def sendfile(self, transport, file, offset=0, count=None,
1114                       *, fallback=True):
1115        """Send a file to transport.
1116
1117        Return the total number of bytes which were sent.
1118
1119        The method uses high-performance os.sendfile if available.
1120
1121        file must be a regular file object opened in binary mode.
1122
1123        offset tells from where to start reading the file. If specified,
1124        count is the total number of bytes to transmit as opposed to
1125        sending the file until EOF is reached. File position is updated on
1126        return or also in case of error in which case file.tell()
1127        can be used to figure out the number of bytes
1128        which were sent.
1129
1130        fallback set to True makes asyncio to manually read and send
1131        the file when the platform does not support the sendfile syscall
1132        (e.g. Windows or SSL socket on Unix).
1133
1134        Raise SendfileNotAvailableError if the system does not support
1135        sendfile syscall and fallback is False.
1136        """
1137        if transport.is_closing():
1138            raise RuntimeError("Transport is closing")
1139        mode = getattr(transport, '_sendfile_compatible',
1140                       constants._SendfileMode.UNSUPPORTED)
1141        if mode is constants._SendfileMode.UNSUPPORTED:
1142            raise RuntimeError(
1143                f"sendfile is not supported for transport {transport!r}")
1144        if mode is constants._SendfileMode.TRY_NATIVE:
1145            try:
1146                return await self._sendfile_native(transport, file,
1147                                                   offset, count)
1148            except exceptions.SendfileNotAvailableError as exc:
1149                if not fallback:
1150                    raise
1151
1152        if not fallback:
1153            raise RuntimeError(
1154                f"fallback is disabled and native sendfile is not "
1155                f"supported for transport {transport!r}")
1156
1157        return await self._sendfile_fallback(transport, file,
1158                                             offset, count)
1159
1160    async def _sendfile_native(self, transp, file, offset, count):
1161        raise exceptions.SendfileNotAvailableError(
1162            "sendfile syscall is not supported")
1163
1164    async def _sendfile_fallback(self, transp, file, offset, count):
1165        if offset:
1166            file.seek(offset)
1167        blocksize = min(count, 16384) if count else 16384
1168        buf = bytearray(blocksize)
1169        total_sent = 0
1170        proto = _SendfileFallbackProtocol(transp)
1171        try:
1172            while True:
1173                if count:
1174                    blocksize = min(count - total_sent, blocksize)
1175                    if blocksize <= 0:
1176                        return total_sent
1177                view = memoryview(buf)[:blocksize]
1178                read = await self.run_in_executor(None, file.readinto, view)
1179                if not read:
1180                    return total_sent  # EOF
1181                await proto.drain()
1182                transp.write(view[:read])
1183                total_sent += read
1184        finally:
1185            if total_sent > 0 and hasattr(file, 'seek'):
1186                file.seek(offset + total_sent)
1187            await proto.restore()
1188
1189    async def start_tls(self, transport, protocol, sslcontext, *,
1190                        server_side=False,
1191                        server_hostname=None,
1192                        ssl_handshake_timeout=None):
1193        """Upgrade transport to TLS.
1194
1195        Return a new transport that *protocol* should start using
1196        immediately.
1197        """
1198        if ssl is None:
1199            raise RuntimeError('Python ssl module is not available')
1200
1201        if not isinstance(sslcontext, ssl.SSLContext):
1202            raise TypeError(
1203                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1204                f'got {sslcontext!r}')
1205
1206        if not getattr(transport, '_start_tls_compatible', False):
1207            raise TypeError(
1208                f'transport {transport!r} is not supported by start_tls()')
1209
1210        waiter = self.create_future()
1211        ssl_protocol = sslproto.SSLProtocol(
1212            self, protocol, sslcontext, waiter,
1213            server_side, server_hostname,
1214            ssl_handshake_timeout=ssl_handshake_timeout,
1215            call_connection_made=False)
1216
1217        # Pause early so that "ssl_protocol.data_received()" doesn't
1218        # have a chance to get called before "ssl_protocol.connection_made()".
1219        transport.pause_reading()
1220
1221        transport.set_protocol(ssl_protocol)
1222        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1223        resume_cb = self.call_soon(transport.resume_reading)
1224
1225        try:
1226            await waiter
1227        except BaseException:
1228            transport.close()
1229            conmade_cb.cancel()
1230            resume_cb.cancel()
1231            raise
1232
1233        return ssl_protocol._app_transport
1234
1235    async def create_datagram_endpoint(self, protocol_factory,
1236                                       local_addr=None, remote_addr=None, *,
1237                                       family=0, proto=0, flags=0,
1238                                       reuse_port=None,
1239                                       allow_broadcast=None, sock=None):
1240        """Create datagram connection."""
1241        if sock is not None:
1242            if sock.type != socket.SOCK_DGRAM:
1243                raise ValueError(
1244                    f'A UDP Socket was expected, got {sock!r}')
1245            if (local_addr or remote_addr or
1246                    family or proto or flags or
1247                    reuse_port or allow_broadcast):
1248                # show the problematic kwargs in exception msg
1249                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1250                            family=family, proto=proto, flags=flags,
1251                            reuse_port=reuse_port,
1252                            allow_broadcast=allow_broadcast)
1253                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1254                raise ValueError(
1255                    f'socket modifier keyword arguments can not be used '
1256                    f'when sock is specified. ({problems})')
1257            sock.setblocking(False)
1258            r_addr = None
1259        else:
1260            if not (local_addr or remote_addr):
1261                if family == 0:
1262                    raise ValueError('unexpected address family')
1263                addr_pairs_info = (((family, proto), (None, None)),)
1264            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1265                for addr in (local_addr, remote_addr):
1266                    if addr is not None and not isinstance(addr, str):
1267                        raise TypeError('string is expected')
1268
1269                if local_addr and local_addr[0] not in (0, '\x00'):
1270                    try:
1271                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1272                            os.remove(local_addr)
1273                    except FileNotFoundError:
1274                        pass
1275                    except OSError as err:
1276                        # Directory may have permissions only to create socket.
1277                        logger.error('Unable to check or remove stale UNIX '
1278                                     'socket %r: %r',
1279                                     local_addr, err)
1280
1281                addr_pairs_info = (((family, proto),
1282                                    (local_addr, remote_addr)), )
1283            else:
1284                # join address by (family, protocol)
1285                addr_infos = {}  # Using order preserving dict
1286                for idx, addr in ((0, local_addr), (1, remote_addr)):
1287                    if addr is not None:
1288                        assert isinstance(addr, tuple) and len(addr) == 2, (
1289                            '2-tuple is expected')
1290
1291                        infos = await self._ensure_resolved(
1292                            addr, family=family, type=socket.SOCK_DGRAM,
1293                            proto=proto, flags=flags, loop=self)
1294                        if not infos:
1295                            raise OSError('getaddrinfo() returned empty list')
1296
1297                        for fam, _, pro, _, address in infos:
1298                            key = (fam, pro)
1299                            if key not in addr_infos:
1300                                addr_infos[key] = [None, None]
1301                            addr_infos[key][idx] = address
1302
1303                # each addr has to have info for each (family, proto) pair
1304                addr_pairs_info = [
1305                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1306                    if not ((local_addr and addr_pair[0] is None) or
1307                            (remote_addr and addr_pair[1] is None))]
1308
1309                if not addr_pairs_info:
1310                    raise ValueError('can not get address information')
1311
1312            exceptions = []
1313
1314            for ((family, proto),
1315                 (local_address, remote_address)) in addr_pairs_info:
1316                sock = None
1317                r_addr = None
1318                try:
1319                    sock = socket.socket(
1320                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1321                    if reuse_port:
1322                        _set_reuseport(sock)
1323                    if allow_broadcast:
1324                        sock.setsockopt(
1325                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1326                    sock.setblocking(False)
1327
1328                    if local_addr:
1329                        sock.bind(local_address)
1330                    if remote_addr:
1331                        if not allow_broadcast:
1332                            await self.sock_connect(sock, remote_address)
1333                        r_addr = remote_address
1334                except OSError as exc:
1335                    if sock is not None:
1336                        sock.close()
1337                    exceptions.append(exc)
1338                except:
1339                    if sock is not None:
1340                        sock.close()
1341                    raise
1342                else:
1343                    break
1344            else:
1345                raise exceptions[0]
1346
1347        protocol = protocol_factory()
1348        waiter = self.create_future()
1349        transport = self._make_datagram_transport(
1350            sock, protocol, r_addr, waiter)
1351        if self._debug:
1352            if local_addr:
1353                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1354                            "created: (%r, %r)",
1355                            local_addr, remote_addr, transport, protocol)
1356            else:
1357                logger.debug("Datagram endpoint remote_addr=%r created: "
1358                             "(%r, %r)",
1359                             remote_addr, transport, protocol)
1360
1361        try:
1362            await waiter
1363        except:
1364            transport.close()
1365            raise
1366
1367        return transport, protocol
1368
1369    async def _ensure_resolved(self, address, *,
1370                               family=0, type=socket.SOCK_STREAM,
1371                               proto=0, flags=0, loop):
1372        host, port = address[:2]
1373        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1374        if info is not None:
1375            # "host" is already a resolved IP.
1376            return [info]
1377        else:
1378            return await loop.getaddrinfo(host, port, family=family, type=type,
1379                                          proto=proto, flags=flags)
1380
1381    async def _create_server_getaddrinfo(self, host, port, family, flags):
1382        infos = await self._ensure_resolved((host, port), family=family,
1383                                            type=socket.SOCK_STREAM,
1384                                            flags=flags, loop=self)
1385        if not infos:
1386            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1387        return infos
1388
1389    async def create_server(
1390            self, protocol_factory, host=None, port=None,
1391            *,
1392            family=socket.AF_UNSPEC,
1393            flags=socket.AI_PASSIVE,
1394            sock=None,
1395            backlog=100,
1396            ssl=None,
1397            reuse_port=None,
1398            ssl_handshake_timeout=None,
1399            start_serving=True):
1400        """Create a TCP server.
1401
1402        The host parameter can be a string, in that case the TCP server is
1403        bound to host and port.
1404
1405        The host parameter can also be a sequence of strings and in that case
1406        the TCP server is bound to all hosts of the sequence. If a host
1407        appears multiple times (possibly indirectly e.g. when hostnames
1408        resolve to the same IP address), the server is only bound once to that
1409        host.
1410
1411        Return a Server object which can be used to stop the service.
1412
1413        This method is a coroutine.
1414        """
1415        if isinstance(ssl, bool):
1416            raise TypeError('ssl argument must be an SSLContext or None')
1417
1418        if ssl_handshake_timeout is not None and ssl is None:
1419            raise ValueError(
1420                'ssl_handshake_timeout is only meaningful with ssl')
1421
1422        if host is not None or port is not None:
1423            if sock is not None:
1424                raise ValueError(
1425                    'host/port and sock can not be specified at the same time')
1426
1427            sockets = []
1428            if host == '':
1429                hosts = [None]
1430            elif (isinstance(host, str) or
1431                  not isinstance(host, collections.abc.Iterable)):
1432                hosts = [host]
1433            else:
1434                hosts = host
1435
1436            fs = [self._create_server_getaddrinfo(host, port, family=family,
1437                                                  flags=flags)
1438                  for host in hosts]
1439            infos = await tasks.gather(*fs)
1440            infos = set(itertools.chain.from_iterable(infos))
1441
1442            completed = False
1443            try:
1444                for res in infos:
1445                    af, socktype, proto, canonname, sa = res
1446                    try:
1447                        sock = socket.socket(af, socktype, proto)
1448                    except socket.error:
1449                        # Assume it's a bad family/type/protocol combination.
1450                        if self._debug:
1451                            logger.warning('create_server() failed to create '
1452                                           'socket.socket(%r, %r, %r)',
1453                                           af, socktype, proto, exc_info=True)
1454                        continue
1455                    sockets.append(sock)
1456                    if reuse_port:
1457                        _set_reuseport(sock)
1458                    # Disable IPv4/IPv6 dual stack support (enabled by
1459                    # default on Linux) which makes a single socket
1460                    # listen on both address families.
1461                    if (_HAS_IPv6 and
1462                            af == socket.AF_INET6 and
1463                            hasattr(socket, 'IPPROTO_IPV6')):
1464                        sock.setsockopt(socket.IPPROTO_IPV6,
1465                                        socket.IPV6_V6ONLY,
1466                                        True)
1467                    try:
1468                        sock.bind(sa)
1469                    except OSError as err:
1470                        raise OSError(err.errno, 'error while attempting '
1471                                      'to bind on address %r: %s'
1472                                      % (sa, err.strerror.lower())) from None
1473                completed = True
1474            finally:
1475                if not completed:
1476                    for sock in sockets:
1477                        sock.close()
1478        else:
1479            if sock is None:
1480                raise ValueError('Neither host/port nor sock were specified')
1481            if sock.type != socket.SOCK_STREAM:
1482                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1483            sockets = [sock]
1484
1485        for sock in sockets:
1486            sock.setblocking(False)
1487
1488        server = Server(self, sockets, protocol_factory,
1489                        ssl, backlog, ssl_handshake_timeout)
1490        if start_serving:
1491            server._start_serving()
1492            # Skip one loop iteration so that all 'loop.add_reader'
1493            # go through.
1494            await tasks.sleep(0)
1495
1496        if self._debug:
1497            logger.info("%r is serving", server)
1498        return server
1499
1500    async def connect_accepted_socket(
1501            self, protocol_factory, sock,
1502            *, ssl=None,
1503            ssl_handshake_timeout=None):
1504        if sock.type != socket.SOCK_STREAM:
1505            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1506
1507        if ssl_handshake_timeout is not None and not ssl:
1508            raise ValueError(
1509                'ssl_handshake_timeout is only meaningful with ssl')
1510
1511        transport, protocol = await self._create_connection_transport(
1512            sock, protocol_factory, ssl, '', server_side=True,
1513            ssl_handshake_timeout=ssl_handshake_timeout)
1514        if self._debug:
1515            # Get the socket from the transport because SSL transport closes
1516            # the old socket and creates a new SSL socket
1517            sock = transport.get_extra_info('socket')
1518            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1519        return transport, protocol
1520
1521    async def connect_read_pipe(self, protocol_factory, pipe):
1522        protocol = protocol_factory()
1523        waiter = self.create_future()
1524        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1525
1526        try:
1527            await waiter
1528        except:
1529            transport.close()
1530            raise
1531
1532        if self._debug:
1533            logger.debug('Read pipe %r connected: (%r, %r)',
1534                         pipe.fileno(), transport, protocol)
1535        return transport, protocol
1536
1537    async def connect_write_pipe(self, protocol_factory, pipe):
1538        protocol = protocol_factory()
1539        waiter = self.create_future()
1540        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1541
1542        try:
1543            await waiter
1544        except:
1545            transport.close()
1546            raise
1547
1548        if self._debug:
1549            logger.debug('Write pipe %r connected: (%r, %r)',
1550                         pipe.fileno(), transport, protocol)
1551        return transport, protocol
1552
1553    def _log_subprocess(self, msg, stdin, stdout, stderr):
1554        info = [msg]
1555        if stdin is not None:
1556            info.append(f'stdin={_format_pipe(stdin)}')
1557        if stdout is not None and stderr == subprocess.STDOUT:
1558            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1559        else:
1560            if stdout is not None:
1561                info.append(f'stdout={_format_pipe(stdout)}')
1562            if stderr is not None:
1563                info.append(f'stderr={_format_pipe(stderr)}')
1564        logger.debug(' '.join(info))
1565
1566    async def subprocess_shell(self, protocol_factory, cmd, *,
1567                               stdin=subprocess.PIPE,
1568                               stdout=subprocess.PIPE,
1569                               stderr=subprocess.PIPE,
1570                               universal_newlines=False,
1571                               shell=True, bufsize=0,
1572                               encoding=None, errors=None, text=None,
1573                               **kwargs):
1574        if not isinstance(cmd, (bytes, str)):
1575            raise ValueError("cmd must be a string")
1576        if universal_newlines:
1577            raise ValueError("universal_newlines must be False")
1578        if not shell:
1579            raise ValueError("shell must be True")
1580        if bufsize != 0:
1581            raise ValueError("bufsize must be 0")
1582        if text:
1583            raise ValueError("text must be False")
1584        if encoding is not None:
1585            raise ValueError("encoding must be None")
1586        if errors is not None:
1587            raise ValueError("errors must be None")
1588
1589        protocol = protocol_factory()
1590        debug_log = None
1591        if self._debug:
1592            # don't log parameters: they may contain sensitive information
1593            # (password) and may be too long
1594            debug_log = 'run shell command %r' % cmd
1595            self._log_subprocess(debug_log, stdin, stdout, stderr)
1596        transport = await self._make_subprocess_transport(
1597            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1598        if self._debug and debug_log is not None:
1599            logger.info('%s: %r', debug_log, transport)
1600        return transport, protocol
1601
1602    async def subprocess_exec(self, protocol_factory, program, *args,
1603                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1604                              stderr=subprocess.PIPE, universal_newlines=False,
1605                              shell=False, bufsize=0,
1606                              encoding=None, errors=None, text=None,
1607                              **kwargs):
1608        if universal_newlines:
1609            raise ValueError("universal_newlines must be False")
1610        if shell:
1611            raise ValueError("shell must be False")
1612        if bufsize != 0:
1613            raise ValueError("bufsize must be 0")
1614        if text:
1615            raise ValueError("text must be False")
1616        if encoding is not None:
1617            raise ValueError("encoding must be None")
1618        if errors is not None:
1619            raise ValueError("errors must be None")
1620
1621        popen_args = (program,) + args
1622        protocol = protocol_factory()
1623        debug_log = None
1624        if self._debug:
1625            # don't log parameters: they may contain sensitive information
1626            # (password) and may be too long
1627            debug_log = f'execute program {program!r}'
1628            self._log_subprocess(debug_log, stdin, stdout, stderr)
1629        transport = await self._make_subprocess_transport(
1630            protocol, popen_args, False, stdin, stdout, stderr,
1631            bufsize, **kwargs)
1632        if self._debug and debug_log is not None:
1633            logger.info('%s: %r', debug_log, transport)
1634        return transport, protocol
1635
1636    def get_exception_handler(self):
1637        """Return an exception handler, or None if the default one is in use.
1638        """
1639        return self._exception_handler
1640
1641    def set_exception_handler(self, handler):
1642        """Set handler as the new event loop exception handler.
1643
1644        If handler is None, the default exception handler will
1645        be set.
1646
1647        If handler is a callable object, it should have a
1648        signature matching '(loop, context)', where 'loop'
1649        will be a reference to the active event loop, 'context'
1650        will be a dict object (see `call_exception_handler()`
1651        documentation for details about context).
1652        """
1653        if handler is not None and not callable(handler):
1654            raise TypeError(f'A callable object or None is expected, '
1655                            f'got {handler!r}')
1656        self._exception_handler = handler
1657
1658    def default_exception_handler(self, context):
1659        """Default exception handler.
1660
1661        This is called when an exception occurs and no exception
1662        handler is set, and can be called by a custom exception
1663        handler that wants to defer to the default behavior.
1664
1665        This default handler logs the error message and other
1666        context-dependent information.  In debug mode, a truncated
1667        stack trace is also appended showing where the given object
1668        (e.g. a handle or future or task) was created, if any.
1669
1670        The context parameter has the same meaning as in
1671        `call_exception_handler()`.
1672        """
1673        message = context.get('message')
1674        if not message:
1675            message = 'Unhandled exception in event loop'
1676
1677        exception = context.get('exception')
1678        if exception is not None:
1679            exc_info = (type(exception), exception, exception.__traceback__)
1680        else:
1681            exc_info = False
1682
1683        if ('source_traceback' not in context and
1684                self._current_handle is not None and
1685                self._current_handle._source_traceback):
1686            context['handle_traceback'] = \
1687                self._current_handle._source_traceback
1688
1689        log_lines = [message]
1690        for key in sorted(context):
1691            if key in {'message', 'exception'}:
1692                continue
1693            value = context[key]
1694            if key == 'source_traceback':
1695                tb = ''.join(traceback.format_list(value))
1696                value = 'Object created at (most recent call last):\n'
1697                value += tb.rstrip()
1698            elif key == 'handle_traceback':
1699                tb = ''.join(traceback.format_list(value))
1700                value = 'Handle created at (most recent call last):\n'
1701                value += tb.rstrip()
1702            else:
1703                value = repr(value)
1704            log_lines.append(f'{key}: {value}')
1705
1706        logger.error('\n'.join(log_lines), exc_info=exc_info)
1707
1708    def call_exception_handler(self, context):
1709        """Call the current event loop's exception handler.
1710
1711        The context argument is a dict containing the following keys:
1712
1713        - 'message': Error message;
1714        - 'exception' (optional): Exception object;
1715        - 'future' (optional): Future instance;
1716        - 'task' (optional): Task instance;
1717        - 'handle' (optional): Handle instance;
1718        - 'protocol' (optional): Protocol instance;
1719        - 'transport' (optional): Transport instance;
1720        - 'socket' (optional): Socket instance;
1721        - 'asyncgen' (optional): Asynchronous generator that caused
1722                                 the exception.
1723
1724        New keys maybe introduced in the future.
1725
1726        Note: do not overload this method in an event loop subclass.
1727        For custom exception handling, use the
1728        `set_exception_handler()` method.
1729        """
1730        if self._exception_handler is None:
1731            try:
1732                self.default_exception_handler(context)
1733            except (SystemExit, KeyboardInterrupt):
1734                raise
1735            except BaseException:
1736                # Second protection layer for unexpected errors
1737                # in the default implementation, as well as for subclassed
1738                # event loops with overloaded "default_exception_handler".
1739                logger.error('Exception in default exception handler',
1740                             exc_info=True)
1741        else:
1742            try:
1743                self._exception_handler(self, context)
1744            except (SystemExit, KeyboardInterrupt):
1745                raise
1746            except BaseException as exc:
1747                # Exception in the user set custom exception handler.
1748                try:
1749                    # Let's try default handler.
1750                    self.default_exception_handler({
1751                        'message': 'Unhandled error in exception handler',
1752                        'exception': exc,
1753                        'context': context,
1754                    })
1755                except (SystemExit, KeyboardInterrupt):
1756                    raise
1757                except BaseException:
1758                    # Guard 'default_exception_handler' in case it is
1759                    # overloaded.
1760                    logger.error('Exception in default exception handler '
1761                                 'while handling an unexpected error '
1762                                 'in custom exception handler',
1763                                 exc_info=True)
1764
1765    def _add_callback(self, handle):
1766        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1767        assert isinstance(handle, events.Handle), 'A Handle is required here'
1768        if handle._cancelled:
1769            return
1770        assert not isinstance(handle, events.TimerHandle)
1771        self._ready.append(handle)
1772
1773    def _add_callback_signalsafe(self, handle):
1774        """Like _add_callback() but called from a signal handler."""
1775        self._add_callback(handle)
1776        self._write_to_self()
1777
1778    def _timer_handle_cancelled(self, handle):
1779        """Notification that a TimerHandle has been cancelled."""
1780        if handle._scheduled:
1781            self._timer_cancelled_count += 1
1782
1783    def _run_once(self):
1784        """Run one full iteration of the event loop.
1785
1786        This calls all currently ready callbacks, polls for I/O,
1787        schedules the resulting callbacks, and finally schedules
1788        'call_later' callbacks.
1789        """
1790
1791        sched_count = len(self._scheduled)
1792        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1793            self._timer_cancelled_count / sched_count >
1794                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1795            # Remove delayed calls that were cancelled if their number
1796            # is too high
1797            new_scheduled = []
1798            for handle in self._scheduled:
1799                if handle._cancelled:
1800                    handle._scheduled = False
1801                else:
1802                    new_scheduled.append(handle)
1803
1804            heapq.heapify(new_scheduled)
1805            self._scheduled = new_scheduled
1806            self._timer_cancelled_count = 0
1807        else:
1808            # Remove delayed calls that were cancelled from head of queue.
1809            while self._scheduled and self._scheduled[0]._cancelled:
1810                self._timer_cancelled_count -= 1
1811                handle = heapq.heappop(self._scheduled)
1812                handle._scheduled = False
1813
1814        timeout = None
1815        if self._ready or self._stopping:
1816            timeout = 0
1817        elif self._scheduled:
1818            # Compute the desired timeout.
1819            when = self._scheduled[0]._when
1820            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1821
1822        event_list = self._selector.select(timeout)
1823        self._process_events(event_list)
1824
1825        # Handle 'later' callbacks that are ready.
1826        end_time = self.time() + self._clock_resolution
1827        while self._scheduled:
1828            handle = self._scheduled[0]
1829            if handle._when >= end_time:
1830                break
1831            handle = heapq.heappop(self._scheduled)
1832            handle._scheduled = False
1833            self._ready.append(handle)
1834
1835        # This is the only place where callbacks are actually *called*.
1836        # All other places just add them to ready.
1837        # Note: We run all currently scheduled callbacks, but not any
1838        # callbacks scheduled by callbacks run this time around --
1839        # they will be run the next time (after another I/O poll).
1840        # Use an idiom that is thread-safe without using locks.
1841        ntodo = len(self._ready)
1842        for i in range(ntodo):
1843            handle = self._ready.popleft()
1844            if handle._cancelled:
1845                continue
1846            if self._debug:
1847                try:
1848                    self._current_handle = handle
1849                    t0 = self.time()
1850                    handle._run()
1851                    dt = self.time() - t0
1852                    if dt >= self.slow_callback_duration:
1853                        logger.warning('Executing %s took %.3f seconds',
1854                                       _format_handle(handle), dt)
1855                finally:
1856                    self._current_handle = None
1857            else:
1858                handle._run()
1859        handle = None  # Needed to break cycles when an exception occurs.
1860
1861    def _set_coroutine_origin_tracking(self, enabled):
1862        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1863            return
1864
1865        if enabled:
1866            self._coroutine_origin_tracking_saved_depth = (
1867                sys.get_coroutine_origin_tracking_depth())
1868            sys.set_coroutine_origin_tracking_depth(
1869                constants.DEBUG_STACK_DEPTH)
1870        else:
1871            sys.set_coroutine_origin_tracking_depth(
1872                self._coroutine_origin_tracking_saved_depth)
1873
1874        self._coroutine_origin_tracking_enabled = enabled
1875
1876    def get_debug(self):
1877        return self._debug
1878
1879    def set_debug(self, enabled):
1880        self._debug = enabled
1881
1882        if self.is_running():
1883            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1884