1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
# Public names exported by this module.  The trailing comma makes this a
# one-element tuple rather than a plain string.
__all__ = 'BaseEventLoop',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module defines AF_INET6.  _ipaddr_info() consults it
# when choosing candidate address families and the shape of the sockaddr.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
# (24 * 3600 -- presumably seconds, i.e. one day; confirm at the use site).
MAXIMUM_SELECT_TIMEOUT = 24 * 3600

# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
# *reuse_address* parameter.  A fresh object() is a unique sentinel that
# cannot be confused with any value a caller might pass.
_unset = object()
72
73
74def _format_handle(handle):
75    cb = handle._callback
76    if isinstance(getattr(cb, '__self__', None), tasks.Task):
77        # format the task
78        return repr(cb.__self__)
79    else:
80        return str(handle)
81
82
83def _format_pipe(fd):
84    if fd == subprocess.PIPE:
85        return '<pipe>'
86    elif fd == subprocess.STDOUT:
87        return '<stdout>'
88    else:
89        return repr(fd)
90
91
92def _set_reuseport(sock):
93    if not hasattr(socket, 'SO_REUSEPORT'):
94        raise ValueError('reuse_port not supported by socket module')
95    else:
96        try:
97            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98        except OSError:
99            raise ValueError('reuse_port not supported by socket module, '
100                             'SO_REUSEPORT defined but not implemented.')
101
102
103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
104    # Try to skip getaddrinfo if "host" is already an IP. Users might have
105    # handled name resolution in their own code and pass in resolved IPs.
106    if not hasattr(socket, 'inet_pton'):
107        return
108
109    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110            host is None:
111        return None
112
113    if type == socket.SOCK_STREAM:
114        proto = socket.IPPROTO_TCP
115    elif type == socket.SOCK_DGRAM:
116        proto = socket.IPPROTO_UDP
117    else:
118        return None
119
120    if port is None:
121        port = 0
122    elif isinstance(port, bytes) and port == b'':
123        port = 0
124    elif isinstance(port, str) and port == '':
125        port = 0
126    else:
127        # If port's a service name like "http", don't skip getaddrinfo.
128        try:
129            port = int(port)
130        except (TypeError, ValueError):
131            return None
132
133    if family == socket.AF_UNSPEC:
134        afs = [socket.AF_INET]
135        if _HAS_IPv6:
136            afs.append(socket.AF_INET6)
137    else:
138        afs = [family]
139
140    if isinstance(host, bytes):
141        host = host.decode('idna')
142    if '%' in host:
143        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144        # like '::1%lo0'.
145        return None
146
147    for af in afs:
148        try:
149            socket.inet_pton(af, host)
150            # The host has already been resolved.
151            if _HAS_IPv6 and af == socket.AF_INET6:
152                return af, type, proto, '', (host, port, flowinfo, scopeid)
153            else:
154                return af, type, proto, '', (host, port)
155        except OSError:
156            pass
157
158    # "host" is not an IP address.
159    return None
160
161
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163    """Interleave list of addrinfo tuples by family."""
164    # Group addresses by family
165    addrinfos_by_family = collections.OrderedDict()
166    for addr in addrinfos:
167        family = addr[0]
168        if family not in addrinfos_by_family:
169            addrinfos_by_family[family] = []
170        addrinfos_by_family[family].append(addr)
171    addrinfos_lists = list(addrinfos_by_family.values())
172
173    reordered = []
174    if first_address_family_count > 1:
175        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176        del addrinfos_lists[0][:first_address_family_count - 1]
177    reordered.extend(
178        a for a in itertools.chain.from_iterable(
179            itertools.zip_longest(*addrinfos_lists)
180        ) if a is not None)
181    return reordered
182
183
184def _run_until_complete_cb(fut):
185    if not fut.cancelled():
186        exc = fut.exception()
187        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
188            # Issue #22429: run_forever() already finished, no need to
189            # stop it.
190            return
191    futures._get_loop(fut).stop()
192
193
194if hasattr(socket, 'TCP_NODELAY'):
195    def _set_nodelay(sock):
196        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197                sock.type == socket.SOCK_STREAM and
198                sock.proto == socket.IPPROTO_TCP):
199            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201    def _set_nodelay(sock):
202        pass
203
204
205class _SendfileFallbackProtocol(protocols.Protocol):
206    def __init__(self, transp):
207        if not isinstance(transp, transports._FlowControlMixin):
208            raise TypeError("transport should be _FlowControlMixin instance")
209        self._transport = transp
210        self._proto = transp.get_protocol()
211        self._should_resume_reading = transp.is_reading()
212        self._should_resume_writing = transp._protocol_paused
213        transp.pause_reading()
214        transp.set_protocol(self)
215        if self._should_resume_writing:
216            self._write_ready_fut = self._transport._loop.create_future()
217        else:
218            self._write_ready_fut = None
219
220    async def drain(self):
221        if self._transport.is_closing():
222            raise ConnectionError("Connection closed by peer")
223        fut = self._write_ready_fut
224        if fut is None:
225            return
226        await fut
227
228    def connection_made(self, transport):
229        raise RuntimeError("Invalid state: "
230                           "connection should have been established already.")
231
232    def connection_lost(self, exc):
233        if self._write_ready_fut is not None:
234            # Never happens if peer disconnects after sending the whole content
235            # Thus disconnection is always an exception from user perspective
236            if exc is None:
237                self._write_ready_fut.set_exception(
238                    ConnectionError("Connection is closed by peer"))
239            else:
240                self._write_ready_fut.set_exception(exc)
241        self._proto.connection_lost(exc)
242
243    def pause_writing(self):
244        if self._write_ready_fut is not None:
245            return
246        self._write_ready_fut = self._transport._loop.create_future()
247
248    def resume_writing(self):
249        if self._write_ready_fut is None:
250            return
251        self._write_ready_fut.set_result(False)
252        self._write_ready_fut = None
253
254    def data_received(self, data):
255        raise RuntimeError("Invalid state: reading should be paused")
256
257    def eof_received(self):
258        raise RuntimeError("Invalid state: reading should be paused")
259
260    async def restore(self):
261        self._transport.set_protocol(self._proto)
262        if self._should_resume_reading:
263            self._transport.resume_reading()
264        if self._write_ready_fut is not None:
265            # Cancel the future.
266            # Basically it has no effect because protocol is switched back,
267            # no code should wait for it anymore.
268            self._write_ready_fut.cancel()
269        if self._should_resume_writing:
270            self._proto.resume_writing()
271
272
class Server(events.AbstractServer):
    """Server object tied to an event loop and a list of listening sockets.

    It counts attached connections (_attach/_detach) and keeps the futures
    of callers blocked in wait_closed(); those are resolved by _wakeup()
    once the server is closed and the count has dropped to zero.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        # Configuration handed in by the creator.
        self._loop = loop
        self._sockets = sockets
        self._protocol_factory = protocol_factory
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._backlog = backlog
        # Runtime state.
        self._active_count = 0
        self._waiters = []
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return '<{} sockets={!r}>'.format(
            self.__class__.__name__, self.sockets)

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        # Last connection gone after close(): release the waiters.
        if self._sockets is None and self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        pending, self._waiters = self._waiters, None
        for waiter in pending:
            if waiter.done():
                continue
            waiter.set_result(waiter)

    def _start_serving(self):
        if not self._serving:
            self._serving = True
            for sock in self._sockets:
                sock.listen(self._backlog)
                self._loop._start_serving(
                    self._protocol_factory, sock, self._ssl_context,
                    self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        socks = self._sockets
        if socks is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in socks)

    def close(self):
        sockets = self._sockets
        if sockets is None:
            # Already closed.
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        forever = self._serving_forever_fut
        if forever is not None and not forever.done():
            forever.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            # Close the server, then let the cancellation propagate.
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
    def __init__(self):
        """Set up the loop's initial state.

        A new loop is neither closed, stopping nor running, has empty ready
        and scheduled queues and no default executor.  The debug flag is
        seeded from coroutines._is_debug_mode().
        """
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        # Handles queued by _call_soon().
        self._ready = collections.deque()
        # Timer handles; call_at() maintains this list as a heap.
        self._scheduled = []
        # Filled in lazily by run_in_executor() or by set_default_executor().
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        # NOTE(review): set_debug() is defined outside this view; keep this
        # call after the attributes above in case it reads any of them.
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        # None means create_task() builds a plain tasks.Task.
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
413
414    def __repr__(self):
415        return (
416            f'<{self.__class__.__name__} running={self.is_running()} '
417            f'closed={self.is_closed()} debug={self.get_debug()}>'
418        )
419
420    def create_future(self):
421        """Create a Future object attached to the loop."""
422        return futures.Future(loop=self)
423
424    def create_task(self, coro, *, name=None):
425        """Schedule a coroutine object.
426
427        Return a task object.
428        """
429        self._check_closed()
430        if self._task_factory is None:
431            task = tasks.Task(coro, loop=self, name=name)
432            if task._source_traceback:
433                del task._source_traceback[-1]
434        else:
435            task = self._task_factory(self, coro)
436            tasks._set_task_name(task, name)
437
438        return task
439
440    def set_task_factory(self, factory):
441        """Set a task factory that will be used by loop.create_task().
442
443        If factory is None the default task factory will be set.
444
445        If factory is a callable, it should have a signature matching
446        '(loop, coro)', where 'loop' will be a reference to the active
447        event loop, 'coro' will be a coroutine object.  The callable
448        must return a Future.
449        """
450        if factory is not None and not callable(factory):
451            raise TypeError('task factory must be a callable or None')
452        self._task_factory = factory
453
454    def get_task_factory(self):
455        """Return a task factory, or None if the default one is in use."""
456        return self._task_factory
457
458    def _make_socket_transport(self, sock, protocol, waiter=None, *,
459                               extra=None, server=None):
460        """Create socket transport."""
461        raise NotImplementedError
462
463    def _make_ssl_transport(
464            self, rawsock, protocol, sslcontext, waiter=None,
465            *, server_side=False, server_hostname=None,
466            extra=None, server=None,
467            ssl_handshake_timeout=None,
468            call_connection_made=True):
469        """Create SSL transport."""
470        raise NotImplementedError
471
472    def _make_datagram_transport(self, sock, protocol,
473                                 address=None, waiter=None, extra=None):
474        """Create datagram transport."""
475        raise NotImplementedError
476
477    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
478                                  extra=None):
479        """Create read pipe transport."""
480        raise NotImplementedError
481
482    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
483                                   extra=None):
484        """Create write pipe transport."""
485        raise NotImplementedError
486
487    async def _make_subprocess_transport(self, protocol, args, shell,
488                                         stdin, stdout, stderr, bufsize,
489                                         extra=None, **kwargs):
490        """Create subprocess transport."""
491        raise NotImplementedError
492
493    def _write_to_self(self):
494        """Write a byte to self-pipe, to wake up the event loop.
495
496        This may be called from a different thread.
497
498        The subclass is responsible for implementing the self-pipe.
499        """
500        raise NotImplementedError
501
502    def _process_events(self, event_list):
503        """Process selector events."""
504        raise NotImplementedError
505
506    def _check_closed(self):
507        if self._closed:
508            raise RuntimeError('Event loop is closed')
509
510    def _asyncgen_finalizer_hook(self, agen):
511        self._asyncgens.discard(agen)
512        if not self.is_closed():
513            self.call_soon_threadsafe(self.create_task, agen.aclose())
514
515    def _asyncgen_firstiter_hook(self, agen):
516        if self._asyncgens_shutdown_called:
517            warnings.warn(
518                f"asynchronous generator {agen!r} was scheduled after "
519                f"loop.shutdown_asyncgens() call",
520                ResourceWarning, source=self)
521
522        self._asyncgens.add(agen)
523
524    async def shutdown_asyncgens(self):
525        """Shutdown all active asynchronous generators."""
526        self._asyncgens_shutdown_called = True
527
528        if not len(self._asyncgens):
529            # If Python version is <3.6 or we don't have any asynchronous
530            # generators alive.
531            return
532
533        closing_agens = list(self._asyncgens)
534        self._asyncgens.clear()
535
536        results = await tasks.gather(
537            *[ag.aclose() for ag in closing_agens],
538            return_exceptions=True,
539            loop=self)
540
541        for result, agen in zip(results, closing_agens):
542            if isinstance(result, Exception):
543                self.call_exception_handler({
544                    'message': f'an error occurred during closing of '
545                               f'asynchronous generator {agen!r}',
546                    'exception': result,
547                    'asyncgen': agen
548                })
549
550    def _check_running(self):
551        if self.is_running():
552            raise RuntimeError('This event loop is already running')
553        if events._get_running_loop() is not None:
554            raise RuntimeError(
555                'Cannot run the event loop while another loop is running')
556
    def run_forever(self):
        """Run until stop() is called.

        Raises RuntimeError (via _check_closed/_check_running) if the loop is
        closed, already running, or another loop is running.
        """
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        # Mark the loop as running: is_running() tests _thread_id.
        self._thread_id = threading.get_ident()

        # Install this loop's async generator hooks, remembering the previous
        # ones so they can be put back in the finally block.
        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                # Always run at least one iteration; the stop flag is only
                # checked after _run_once() returns.
                self._run_once()
                if self._stopping:
                    break
        finally:
            # Undo everything set up above, whether the loop stopped
            # normally or an exception escaped _run_once().
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
579
580    def run_until_complete(self, future):
581        """Run until the Future is done.
582
583        If the argument is a coroutine, it is wrapped in a Task.
584
585        WARNING: It would be disastrous to call run_until_complete()
586        with the same coroutine twice -- it would wrap it in two
587        different Tasks and that can't be good.
588
589        Return the Future's result, or raise its exception.
590        """
591        self._check_closed()
592        self._check_running()
593
594        new_task = not futures.isfuture(future)
595        future = tasks.ensure_future(future, loop=self)
596        if new_task:
597            # An exception is raised if the future didn't complete, so there
598            # is no need to log the "destroy pending task" message
599            future._log_destroy_pending = False
600
601        future.add_done_callback(_run_until_complete_cb)
602        try:
603            self.run_forever()
604        except:
605            if new_task and future.done() and not future.cancelled():
606                # The coroutine raised a BaseException. Consume the exception
607                # to not log a warning, the caller doesn't have access to the
608                # local task.
609                future.exception()
610            raise
611        finally:
612            future.remove_done_callback(_run_until_complete_cb)
613        if not future.done():
614            raise RuntimeError('Event loop stopped before Future completed.')
615
616        return future.result()
617
618    def stop(self):
619        """Stop running the event loop.
620
621        Every callback already scheduled will still run.  This simply informs
622        run_forever to stop looping after a complete iteration.
623        """
624        self._stopping = True
625
626    def close(self):
627        """Close the event loop.
628
629        This clears the queues and shuts down the executor,
630        but does not wait for the executor to finish.
631
632        The event loop must not be running.
633        """
634        if self.is_running():
635            raise RuntimeError("Cannot close a running event loop")
636        if self._closed:
637            return
638        if self._debug:
639            logger.debug("Close %r", self)
640        self._closed = True
641        self._ready.clear()
642        self._scheduled.clear()
643        executor = self._default_executor
644        if executor is not None:
645            self._default_executor = None
646            executor.shutdown(wait=False)
647
648    def is_closed(self):
649        """Returns True if the event loop was closed."""
650        return self._closed
651
652    def __del__(self, _warn=warnings.warn):
653        if not self.is_closed():
654            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
655            if not self.is_running():
656                self.close()
657
658    def is_running(self):
659        """Returns True if the event loop is running."""
660        return (self._thread_id is not None)
661
662    def time(self):
663        """Return the time according to the event loop's clock.
664
665        This is a float expressed in seconds since an epoch, but the
666        epoch, precision, accuracy and drift are unspecified and may
667        differ per event loop.
668        """
669        return time.monotonic()
670
671    def call_later(self, delay, callback, *args, context=None):
672        """Arrange for a callback to be called at a given time.
673
674        Return a Handle: an opaque object with a cancel() method that
675        can be used to cancel the call.
676
677        The delay can be an int or float, expressed in seconds.  It is
678        always relative to the current time.
679
680        Each callback will be called exactly once.  If two callbacks
681        are scheduled for exactly the same time, it undefined which
682        will be called first.
683
684        Any positional arguments after the callback will be passed to
685        the callback when it is called.
686        """
687        timer = self.call_at(self.time() + delay, callback, *args,
688                             context=context)
689        if timer._source_traceback:
690            del timer._source_traceback[-1]
691        return timer
692
693    def call_at(self, when, callback, *args, context=None):
694        """Like call_later(), but uses an absolute time.
695
696        Absolute time corresponds to the event loop's time() method.
697        """
698        self._check_closed()
699        if self._debug:
700            self._check_thread()
701            self._check_callback(callback, 'call_at')
702        timer = events.TimerHandle(when, callback, args, self, context)
703        if timer._source_traceback:
704            del timer._source_traceback[-1]
705        heapq.heappush(self._scheduled, timer)
706        timer._scheduled = True
707        return timer
708
709    def call_soon(self, callback, *args, context=None):
710        """Arrange for a callback to be called as soon as possible.
711
712        This operates as a FIFO queue: callbacks are called in the
713        order in which they are registered.  Each callback will be
714        called exactly once.
715
716        Any positional arguments after the callback will be passed to
717        the callback when it is called.
718        """
719        self._check_closed()
720        if self._debug:
721            self._check_thread()
722            self._check_callback(callback, 'call_soon')
723        handle = self._call_soon(callback, args, context)
724        if handle._source_traceback:
725            del handle._source_traceback[-1]
726        return handle
727
728    def _check_callback(self, callback, method):
729        if (coroutines.iscoroutine(callback) or
730                coroutines.iscoroutinefunction(callback)):
731            raise TypeError(
732                f"coroutines cannot be used with {method}()")
733        if not callable(callback):
734            raise TypeError(
735                f'a callable object was expected by {method}(), '
736                f'got {callback!r}')
737
738    def _call_soon(self, callback, args, context):
739        handle = events.Handle(callback, args, self, context)
740        if handle._source_traceback:
741            del handle._source_traceback[-1]
742        self._ready.append(handle)
743        return handle
744
745    def _check_thread(self):
746        """Check that the current thread is the thread running the event loop.
747
748        Non-thread-safe methods of this class make this assumption and will
749        likely behave incorrectly when the assumption is violated.
750
751        Should only be called when (self._debug == True).  The caller is
752        responsible for checking this condition for performance reasons.
753        """
754        if self._thread_id is None:
755            return
756        thread_id = threading.get_ident()
757        if thread_id != self._thread_id:
758            raise RuntimeError(
759                "Non-thread-safe operation invoked on an event loop other "
760                "than the current one")
761
762    def call_soon_threadsafe(self, callback, *args, context=None):
763        """Like call_soon(), but thread-safe."""
764        self._check_closed()
765        if self._debug:
766            self._check_callback(callback, 'call_soon_threadsafe')
767        handle = self._call_soon(callback, args, context)
768        if handle._source_traceback:
769            del handle._source_traceback[-1]
770        self._write_to_self()
771        return handle
772
773    def run_in_executor(self, executor, func, *args):
774        self._check_closed()
775        if self._debug:
776            self._check_callback(func, 'run_in_executor')
777        if executor is None:
778            executor = self._default_executor
779            if executor is None:
780                executor = concurrent.futures.ThreadPoolExecutor()
781                self._default_executor = executor
782        return futures.wrap_future(
783            executor.submit(func, *args), loop=self)
784
785    def set_default_executor(self, executor):
786        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
787            warnings.warn(
788                'Using the default executor that is not an instance of '
789                'ThreadPoolExecutor is deprecated and will be prohibited '
790                'in Python 3.9',
791                DeprecationWarning, 2)
792        self._default_executor = executor
793
794    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
795        msg = [f"{host}:{port!r}"]
796        if family:
797            msg.append(f'family={family!r}')
798        if type:
799            msg.append(f'type={type!r}')
800        if proto:
801            msg.append(f'proto={proto!r}')
802        if flags:
803            msg.append(f'flags={flags!r}')
804        msg = ', '.join(msg)
805        logger.debug('Get address info %s', msg)
806
807        t0 = self.time()
808        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
809        dt = self.time() - t0
810
811        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
812        if dt >= self.slow_callback_duration:
813            logger.info(msg)
814        else:
815            logger.debug(msg)
816        return addrinfo
817
818    async def getaddrinfo(self, host, port, *,
819                          family=0, type=0, proto=0, flags=0):
820        if self._debug:
821            getaddr_func = self._getaddrinfo_debug
822        else:
823            getaddr_func = socket.getaddrinfo
824
825        return await self.run_in_executor(
826            None, getaddr_func, host, port, family, type, proto, flags)
827
828    async def getnameinfo(self, sockaddr, flags=0):
829        return await self.run_in_executor(
830            None, socket.getnameinfo, sockaddr, flags)
831
832    async def sock_sendfile(self, sock, file, offset=0, count=None,
833                            *, fallback=True):
834        if self._debug and sock.gettimeout() != 0:
835            raise ValueError("the socket must be non-blocking")
836        self._check_sendfile_params(sock, file, offset, count)
837        try:
838            return await self._sock_sendfile_native(sock, file,
839                                                    offset, count)
840        except exceptions.SendfileNotAvailableError as exc:
841            if not fallback:
842                raise
843        return await self._sock_sendfile_fallback(sock, file,
844                                                  offset, count)
845
846    async def _sock_sendfile_native(self, sock, file, offset, count):
847        # NB: sendfile syscall is not supported for SSL sockets and
848        # non-mmap files even if sendfile is supported by OS
849        raise exceptions.SendfileNotAvailableError(
850            f"syscall sendfile is not available for socket {sock!r} "
851            "and file {file!r} combination")
852
853    async def _sock_sendfile_fallback(self, sock, file, offset, count):
854        if offset:
855            file.seek(offset)
856        blocksize = (
857            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
858            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
859        )
860        buf = bytearray(blocksize)
861        total_sent = 0
862        try:
863            while True:
864                if count:
865                    blocksize = min(count - total_sent, blocksize)
866                    if blocksize <= 0:
867                        break
868                view = memoryview(buf)[:blocksize]
869                read = await self.run_in_executor(None, file.readinto, view)
870                if not read:
871                    break  # EOF
872                await self.sock_sendall(sock, view[:read])
873                total_sent += read
874            return total_sent
875        finally:
876            if total_sent > 0 and hasattr(file, 'seek'):
877                file.seek(offset + total_sent)
878
879    def _check_sendfile_params(self, sock, file, offset, count):
880        if 'b' not in getattr(file, 'mode', 'b'):
881            raise ValueError("file should be opened in binary mode")
882        if not sock.type == socket.SOCK_STREAM:
883            raise ValueError("only SOCK_STREAM type sockets are supported")
884        if count is not None:
885            if not isinstance(count, int):
886                raise TypeError(
887                    "count must be a positive integer (got {!r})".format(count))
888            if count <= 0:
889                raise ValueError(
890                    "count must be a positive integer (got {!r})".format(count))
891        if not isinstance(offset, int):
892            raise TypeError(
893                "offset must be a non-negative integer (got {!r})".format(
894                    offset))
895        if offset < 0:
896            raise ValueError(
897                "offset must be a non-negative integer (got {!r})".format(
898                    offset))
899
900    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
901        """Create, bind and connect one socket."""
902        my_exceptions = []
903        exceptions.append(my_exceptions)
904        family, type_, proto, _, address = addr_info
905        sock = None
906        try:
907            sock = socket.socket(family=family, type=type_, proto=proto)
908            sock.setblocking(False)
909            if local_addr_infos is not None:
910                for _, _, _, _, laddr in local_addr_infos:
911                    try:
912                        sock.bind(laddr)
913                        break
914                    except OSError as exc:
915                        msg = (
916                            f'error while attempting to bind on '
917                            f'address {laddr!r}: '
918                            f'{exc.strerror.lower()}'
919                        )
920                        exc = OSError(exc.errno, msg)
921                        my_exceptions.append(exc)
922                else:  # all bind attempts failed
923                    raise my_exceptions.pop()
924            await self.sock_connect(sock, address)
925            return sock
926        except OSError as exc:
927            my_exceptions.append(exc)
928            if sock is not None:
929                sock.close()
930            raise
931        except:
932            if sock is not None:
933                sock.close()
934            raise
935
    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given Internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.

        Either host/port or an already created SOCK_STREAM *sock* must be
        given, but not both; ValueError is raised otherwise.
        server_hostname and ssl_handshake_timeout are only accepted
        together with ssl.  local_addr, if given, is resolved and the
        socket is bound to it before connecting.  Passing
        happy_eyeballs_delay switches from trying the resolved addresses
        one after another to staggered.staggered_race(), and makes
        interleave default to 1.

        If every connection attempt fails, a single OSError is raised:
        the only (or a representative) one when all messages agree, or
        a combined 'Multiple exceptions: ...' error.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Resolve the remote address (skipped by _ensure_resolved
            # when host is already an IP address).
            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                # Resolve the address to bind to with the same hints.
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            # One sub-list per connection attempt, filled in by
            # _connect_sock().
            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                # Try the addresses in order; the first success wins and
                # failures are only recorded in `exceptions`.
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                # Only the first item of the race result (the socket) is
                # used here.
                sock, _, _ = await staggered.staggered_race(
                    (functools.partial(self._connect_sock,
                                       exceptions, addrinfo, laddr_infos)
                     for addrinfo in infos),
                    happy_eyeballs_delay, loop=self)

            if sock is None:
                # Every attempt failed: flatten the per-attempt lists and
                # report them.
                exceptions = [exc for sub in exceptions for exc in sub]
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        # Wrap the connected socket in a (possibly SSL) transport.
        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
1060
1061    async def _create_connection_transport(
1062            self, sock, protocol_factory, ssl,
1063            server_hostname, server_side=False,
1064            ssl_handshake_timeout=None):
1065
1066        sock.setblocking(False)
1067
1068        protocol = protocol_factory()
1069        waiter = self.create_future()
1070        if ssl:
1071            sslcontext = None if isinstance(ssl, bool) else ssl
1072            transport = self._make_ssl_transport(
1073                sock, protocol, sslcontext, waiter,
1074                server_side=server_side, server_hostname=server_hostname,
1075                ssl_handshake_timeout=ssl_handshake_timeout)
1076        else:
1077            transport = self._make_socket_transport(sock, protocol, waiter)
1078
1079        try:
1080            await waiter
1081        except:
1082            transport.close()
1083            raise
1084
1085        return transport, protocol
1086
1087    async def sendfile(self, transport, file, offset=0, count=None,
1088                       *, fallback=True):
1089        """Send a file to transport.
1090
1091        Return the total number of bytes which were sent.
1092
1093        The method uses high-performance os.sendfile if available.
1094
1095        file must be a regular file object opened in binary mode.
1096
1097        offset tells from where to start reading the file. If specified,
1098        count is the total number of bytes to transmit as opposed to
1099        sending the file until EOF is reached. File position is updated on
1100        return or also in case of error in which case file.tell()
1101        can be used to figure out the number of bytes
1102        which were sent.
1103
1104        fallback set to True makes asyncio to manually read and send
1105        the file when the platform does not support the sendfile syscall
1106        (e.g. Windows or SSL socket on Unix).
1107
1108        Raise SendfileNotAvailableError if the system does not support
1109        sendfile syscall and fallback is False.
1110        """
1111        if transport.is_closing():
1112            raise RuntimeError("Transport is closing")
1113        mode = getattr(transport, '_sendfile_compatible',
1114                       constants._SendfileMode.UNSUPPORTED)
1115        if mode is constants._SendfileMode.UNSUPPORTED:
1116            raise RuntimeError(
1117                f"sendfile is not supported for transport {transport!r}")
1118        if mode is constants._SendfileMode.TRY_NATIVE:
1119            try:
1120                return await self._sendfile_native(transport, file,
1121                                                   offset, count)
1122            except exceptions.SendfileNotAvailableError as exc:
1123                if not fallback:
1124                    raise
1125
1126        if not fallback:
1127            raise RuntimeError(
1128                f"fallback is disabled and native sendfile is not "
1129                f"supported for transport {transport!r}")
1130
1131        return await self._sendfile_fallback(transport, file,
1132                                             offset, count)
1133
1134    async def _sendfile_native(self, transp, file, offset, count):
1135        raise exceptions.SendfileNotAvailableError(
1136            "sendfile syscall is not supported")
1137
1138    async def _sendfile_fallback(self, transp, file, offset, count):
1139        if offset:
1140            file.seek(offset)
1141        blocksize = min(count, 16384) if count else 16384
1142        buf = bytearray(blocksize)
1143        total_sent = 0
1144        proto = _SendfileFallbackProtocol(transp)
1145        try:
1146            while True:
1147                if count:
1148                    blocksize = min(count - total_sent, blocksize)
1149                    if blocksize <= 0:
1150                        return total_sent
1151                view = memoryview(buf)[:blocksize]
1152                read = await self.run_in_executor(None, file.readinto, view)
1153                if not read:
1154                    return total_sent  # EOF
1155                await proto.drain()
1156                transp.write(view[:read])
1157                total_sent += read
1158        finally:
1159            if total_sent > 0 and hasattr(file, 'seek'):
1160                file.seek(offset + total_sent)
1161            await proto.restore()
1162
1163    async def start_tls(self, transport, protocol, sslcontext, *,
1164                        server_side=False,
1165                        server_hostname=None,
1166                        ssl_handshake_timeout=None):
1167        """Upgrade transport to TLS.
1168
1169        Return a new transport that *protocol* should start using
1170        immediately.
1171        """
1172        if ssl is None:
1173            raise RuntimeError('Python ssl module is not available')
1174
1175        if not isinstance(sslcontext, ssl.SSLContext):
1176            raise TypeError(
1177                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1178                f'got {sslcontext!r}')
1179
1180        if not getattr(transport, '_start_tls_compatible', False):
1181            raise TypeError(
1182                f'transport {transport!r} is not supported by start_tls()')
1183
1184        waiter = self.create_future()
1185        ssl_protocol = sslproto.SSLProtocol(
1186            self, protocol, sslcontext, waiter,
1187            server_side, server_hostname,
1188            ssl_handshake_timeout=ssl_handshake_timeout,
1189            call_connection_made=False)
1190
1191        # Pause early so that "ssl_protocol.data_received()" doesn't
1192        # have a chance to get called before "ssl_protocol.connection_made()".
1193        transport.pause_reading()
1194
1195        transport.set_protocol(ssl_protocol)
1196        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1197        resume_cb = self.call_soon(transport.resume_reading)
1198
1199        try:
1200            await waiter
1201        except BaseException:
1202            transport.close()
1203            conmade_cb.cancel()
1204            resume_cb.cancel()
1205            raise
1206
1207        return ssl_protocol._app_transport
1208
1209    async def create_datagram_endpoint(self, protocol_factory,
1210                                       local_addr=None, remote_addr=None, *,
1211                                       family=0, proto=0, flags=0,
1212                                       reuse_address=_unset, reuse_port=None,
1213                                       allow_broadcast=None, sock=None):
1214        """Create datagram connection."""
1215        if sock is not None:
1216            if sock.type != socket.SOCK_DGRAM:
1217                raise ValueError(
1218                    f'A UDP Socket was expected, got {sock!r}')
1219            if (local_addr or remote_addr or
1220                    family or proto or flags or
1221                    reuse_port or allow_broadcast):
1222                # show the problematic kwargs in exception msg
1223                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1224                            family=family, proto=proto, flags=flags,
1225                            reuse_address=reuse_address, reuse_port=reuse_port,
1226                            allow_broadcast=allow_broadcast)
1227                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1228                raise ValueError(
1229                    f'socket modifier keyword arguments can not be used '
1230                    f'when sock is specified. ({problems})')
1231            sock.setblocking(False)
1232            r_addr = None
1233        else:
1234            if not (local_addr or remote_addr):
1235                if family == 0:
1236                    raise ValueError('unexpected address family')
1237                addr_pairs_info = (((family, proto), (None, None)),)
1238            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1239                for addr in (local_addr, remote_addr):
1240                    if addr is not None and not isinstance(addr, str):
1241                        raise TypeError('string is expected')
1242
1243                if local_addr and local_addr[0] not in (0, '\x00'):
1244                    try:
1245                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1246                            os.remove(local_addr)
1247                    except FileNotFoundError:
1248                        pass
1249                    except OSError as err:
1250                        # Directory may have permissions only to create socket.
1251                        logger.error('Unable to check or remove stale UNIX '
1252                                     'socket %r: %r',
1253                                     local_addr, err)
1254
1255                addr_pairs_info = (((family, proto),
1256                                    (local_addr, remote_addr)), )
1257            else:
1258                # join address by (family, protocol)
1259                addr_infos = {}  # Using order preserving dict
1260                for idx, addr in ((0, local_addr), (1, remote_addr)):
1261                    if addr is not None:
1262                        assert isinstance(addr, tuple) and len(addr) == 2, (
1263                            '2-tuple is expected')
1264
1265                        infos = await self._ensure_resolved(
1266                            addr, family=family, type=socket.SOCK_DGRAM,
1267                            proto=proto, flags=flags, loop=self)
1268                        if not infos:
1269                            raise OSError('getaddrinfo() returned empty list')
1270
1271                        for fam, _, pro, _, address in infos:
1272                            key = (fam, pro)
1273                            if key not in addr_infos:
1274                                addr_infos[key] = [None, None]
1275                            addr_infos[key][idx] = address
1276
1277                # each addr has to have info for each (family, proto) pair
1278                addr_pairs_info = [
1279                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1280                    if not ((local_addr and addr_pair[0] is None) or
1281                            (remote_addr and addr_pair[1] is None))]
1282
1283                if not addr_pairs_info:
1284                    raise ValueError('can not get address information')
1285
1286            exceptions = []
1287
1288            # bpo-37228
1289            if reuse_address is not _unset:
1290                if reuse_address:
1291                    raise ValueError("Passing `reuse_address=True` is no "
1292                                     "longer supported, as the usage of "
1293                                     "SO_REUSEPORT in UDP poses a significant "
1294                                     "security concern.")
1295                else:
1296                    warnings.warn("The *reuse_address* parameter has been "
1297                                  "deprecated as of 3.5.10 and is scheduled "
1298                                  "for removal in 3.11.", DeprecationWarning,
1299                                  stacklevel=2)
1300
1301            for ((family, proto),
1302                 (local_address, remote_address)) in addr_pairs_info:
1303                sock = None
1304                r_addr = None
1305                try:
1306                    sock = socket.socket(
1307                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1308                    if reuse_port:
1309                        _set_reuseport(sock)
1310                    if allow_broadcast:
1311                        sock.setsockopt(
1312                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1313                    sock.setblocking(False)
1314
1315                    if local_addr:
1316                        sock.bind(local_address)
1317                    if remote_addr:
1318                        if not allow_broadcast:
1319                            await self.sock_connect(sock, remote_address)
1320                        r_addr = remote_address
1321                except OSError as exc:
1322                    if sock is not None:
1323                        sock.close()
1324                    exceptions.append(exc)
1325                except:
1326                    if sock is not None:
1327                        sock.close()
1328                    raise
1329                else:
1330                    break
1331            else:
1332                raise exceptions[0]
1333
1334        protocol = protocol_factory()
1335        waiter = self.create_future()
1336        transport = self._make_datagram_transport(
1337            sock, protocol, r_addr, waiter)
1338        if self._debug:
1339            if local_addr:
1340                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1341                            "created: (%r, %r)",
1342                            local_addr, remote_addr, transport, protocol)
1343            else:
1344                logger.debug("Datagram endpoint remote_addr=%r created: "
1345                             "(%r, %r)",
1346                             remote_addr, transport, protocol)
1347
1348        try:
1349            await waiter
1350        except:
1351            transport.close()
1352            raise
1353
1354        return transport, protocol
1355
1356    async def _ensure_resolved(self, address, *,
1357                               family=0, type=socket.SOCK_STREAM,
1358                               proto=0, flags=0, loop):
1359        host, port = address[:2]
1360        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1361        if info is not None:
1362            # "host" is already a resolved IP.
1363            return [info]
1364        else:
1365            return await loop.getaddrinfo(host, port, family=family, type=type,
1366                                          proto=proto, flags=flags)
1367
1368    async def _create_server_getaddrinfo(self, host, port, family, flags):
1369        infos = await self._ensure_resolved((host, port), family=family,
1370                                            type=socket.SOCK_STREAM,
1371                                            flags=flags, loop=self)
1372        if not infos:
1373            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1374        return infos
1375
1376    async def create_server(
1377            self, protocol_factory, host=None, port=None,
1378            *,
1379            family=socket.AF_UNSPEC,
1380            flags=socket.AI_PASSIVE,
1381            sock=None,
1382            backlog=100,
1383            ssl=None,
1384            reuse_address=None,
1385            reuse_port=None,
1386            ssl_handshake_timeout=None,
1387            start_serving=True):
1388        """Create a TCP server.
1389
1390        The host parameter can be a string, in that case the TCP server is
1391        bound to host and port.
1392
1393        The host parameter can also be a sequence of strings and in that case
1394        the TCP server is bound to all hosts of the sequence. If a host
1395        appears multiple times (possibly indirectly e.g. when hostnames
1396        resolve to the same IP address), the server is only bound once to that
1397        host.
1398
1399        Return a Server object which can be used to stop the service.
1400
1401        This method is a coroutine.
1402        """
1403        if isinstance(ssl, bool):
1404            raise TypeError('ssl argument must be an SSLContext or None')
1405
1406        if ssl_handshake_timeout is not None and ssl is None:
1407            raise ValueError(
1408                'ssl_handshake_timeout is only meaningful with ssl')
1409
1410        if host is not None or port is not None:
1411            if sock is not None:
1412                raise ValueError(
1413                    'host/port and sock can not be specified at the same time')
1414
1415            if reuse_address is None:
1416                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1417            sockets = []
1418            if host == '':
1419                hosts = [None]
1420            elif (isinstance(host, str) or
1421                  not isinstance(host, collections.abc.Iterable)):
1422                hosts = [host]
1423            else:
1424                hosts = host
1425
1426            fs = [self._create_server_getaddrinfo(host, port, family=family,
1427                                                  flags=flags)
1428                  for host in hosts]
1429            infos = await tasks.gather(*fs, loop=self)
1430            infos = set(itertools.chain.from_iterable(infos))
1431
1432            completed = False
1433            try:
1434                for res in infos:
1435                    af, socktype, proto, canonname, sa = res
1436                    try:
1437                        sock = socket.socket(af, socktype, proto)
1438                    except socket.error:
1439                        # Assume it's a bad family/type/protocol combination.
1440                        if self._debug:
1441                            logger.warning('create_server() failed to create '
1442                                           'socket.socket(%r, %r, %r)',
1443                                           af, socktype, proto, exc_info=True)
1444                        continue
1445                    sockets.append(sock)
1446                    if reuse_address:
1447                        sock.setsockopt(
1448                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1449                    if reuse_port:
1450                        _set_reuseport(sock)
1451                    # Disable IPv4/IPv6 dual stack support (enabled by
1452                    # default on Linux) which makes a single socket
1453                    # listen on both address families.
1454                    if (_HAS_IPv6 and
1455                            af == socket.AF_INET6 and
1456                            hasattr(socket, 'IPPROTO_IPV6')):
1457                        sock.setsockopt(socket.IPPROTO_IPV6,
1458                                        socket.IPV6_V6ONLY,
1459                                        True)
1460                    try:
1461                        sock.bind(sa)
1462                    except OSError as err:
1463                        raise OSError(err.errno, 'error while attempting '
1464                                      'to bind on address %r: %s'
1465                                      % (sa, err.strerror.lower())) from None
1466                completed = True
1467            finally:
1468                if not completed:
1469                    for sock in sockets:
1470                        sock.close()
1471        else:
1472            if sock is None:
1473                raise ValueError('Neither host/port nor sock were specified')
1474            if sock.type != socket.SOCK_STREAM:
1475                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1476            sockets = [sock]
1477
1478        for sock in sockets:
1479            sock.setblocking(False)
1480
1481        server = Server(self, sockets, protocol_factory,
1482                        ssl, backlog, ssl_handshake_timeout)
1483        if start_serving:
1484            server._start_serving()
1485            # Skip one loop iteration so that all 'loop.add_reader'
1486            # go through.
1487            await tasks.sleep(0, loop=self)
1488
1489        if self._debug:
1490            logger.info("%r is serving", server)
1491        return server
1492
1493    async def connect_accepted_socket(
1494            self, protocol_factory, sock,
1495            *, ssl=None,
1496            ssl_handshake_timeout=None):
1497        """Handle an accepted connection.
1498
1499        This is used by servers that accept connections outside of
1500        asyncio but that use asyncio to handle connections.
1501
1502        This method is a coroutine.  When completed, the coroutine
1503        returns a (transport, protocol) pair.
1504        """
1505        if sock.type != socket.SOCK_STREAM:
1506            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1507
1508        if ssl_handshake_timeout is not None and not ssl:
1509            raise ValueError(
1510                'ssl_handshake_timeout is only meaningful with ssl')
1511
1512        transport, protocol = await self._create_connection_transport(
1513            sock, protocol_factory, ssl, '', server_side=True,
1514            ssl_handshake_timeout=ssl_handshake_timeout)
1515        if self._debug:
1516            # Get the socket from the transport because SSL transport closes
1517            # the old socket and creates a new SSL socket
1518            sock = transport.get_extra_info('socket')
1519            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1520        return transport, protocol
1521
1522    async def connect_read_pipe(self, protocol_factory, pipe):
1523        protocol = protocol_factory()
1524        waiter = self.create_future()
1525        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1526
1527        try:
1528            await waiter
1529        except:
1530            transport.close()
1531            raise
1532
1533        if self._debug:
1534            logger.debug('Read pipe %r connected: (%r, %r)',
1535                         pipe.fileno(), transport, protocol)
1536        return transport, protocol
1537
1538    async def connect_write_pipe(self, protocol_factory, pipe):
1539        protocol = protocol_factory()
1540        waiter = self.create_future()
1541        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1542
1543        try:
1544            await waiter
1545        except:
1546            transport.close()
1547            raise
1548
1549        if self._debug:
1550            logger.debug('Write pipe %r connected: (%r, %r)',
1551                         pipe.fileno(), transport, protocol)
1552        return transport, protocol
1553
1554    def _log_subprocess(self, msg, stdin, stdout, stderr):
1555        info = [msg]
1556        if stdin is not None:
1557            info.append(f'stdin={_format_pipe(stdin)}')
1558        if stdout is not None and stderr == subprocess.STDOUT:
1559            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1560        else:
1561            if stdout is not None:
1562                info.append(f'stdout={_format_pipe(stdout)}')
1563            if stderr is not None:
1564                info.append(f'stderr={_format_pipe(stderr)}')
1565        logger.debug(' '.join(info))
1566
1567    async def subprocess_shell(self, protocol_factory, cmd, *,
1568                               stdin=subprocess.PIPE,
1569                               stdout=subprocess.PIPE,
1570                               stderr=subprocess.PIPE,
1571                               universal_newlines=False,
1572                               shell=True, bufsize=0,
1573                               encoding=None, errors=None, text=None,
1574                               **kwargs):
1575        if not isinstance(cmd, (bytes, str)):
1576            raise ValueError("cmd must be a string")
1577        if universal_newlines:
1578            raise ValueError("universal_newlines must be False")
1579        if not shell:
1580            raise ValueError("shell must be True")
1581        if bufsize != 0:
1582            raise ValueError("bufsize must be 0")
1583        if text:
1584            raise ValueError("text must be False")
1585        if encoding is not None:
1586            raise ValueError("encoding must be None")
1587        if errors is not None:
1588            raise ValueError("errors must be None")
1589
1590        protocol = protocol_factory()
1591        debug_log = None
1592        if self._debug:
1593            # don't log parameters: they may contain sensitive information
1594            # (password) and may be too long
1595            debug_log = 'run shell command %r' % cmd
1596            self._log_subprocess(debug_log, stdin, stdout, stderr)
1597        transport = await self._make_subprocess_transport(
1598            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1599        if self._debug and debug_log is not None:
1600            logger.info('%s: %r', debug_log, transport)
1601        return transport, protocol
1602
1603    async def subprocess_exec(self, protocol_factory, program, *args,
1604                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1605                              stderr=subprocess.PIPE, universal_newlines=False,
1606                              shell=False, bufsize=0,
1607                              encoding=None, errors=None, text=None,
1608                              **kwargs):
1609        if universal_newlines:
1610            raise ValueError("universal_newlines must be False")
1611        if shell:
1612            raise ValueError("shell must be False")
1613        if bufsize != 0:
1614            raise ValueError("bufsize must be 0")
1615        if text:
1616            raise ValueError("text must be False")
1617        if encoding is not None:
1618            raise ValueError("encoding must be None")
1619        if errors is not None:
1620            raise ValueError("errors must be None")
1621
1622        popen_args = (program,) + args
1623        protocol = protocol_factory()
1624        debug_log = None
1625        if self._debug:
1626            # don't log parameters: they may contain sensitive information
1627            # (password) and may be too long
1628            debug_log = f'execute program {program!r}'
1629            self._log_subprocess(debug_log, stdin, stdout, stderr)
1630        transport = await self._make_subprocess_transport(
1631            protocol, popen_args, False, stdin, stdout, stderr,
1632            bufsize, **kwargs)
1633        if self._debug and debug_log is not None:
1634            logger.info('%s: %r', debug_log, transport)
1635        return transport, protocol
1636
1637    def get_exception_handler(self):
1638        """Return an exception handler, or None if the default one is in use.
1639        """
1640        return self._exception_handler
1641
1642    def set_exception_handler(self, handler):
1643        """Set handler as the new event loop exception handler.
1644
1645        If handler is None, the default exception handler will
1646        be set.
1647
1648        If handler is a callable object, it should have a
1649        signature matching '(loop, context)', where 'loop'
1650        will be a reference to the active event loop, 'context'
1651        will be a dict object (see `call_exception_handler()`
1652        documentation for details about context).
1653        """
1654        if handler is not None and not callable(handler):
1655            raise TypeError(f'A callable object or None is expected, '
1656                            f'got {handler!r}')
1657        self._exception_handler = handler
1658
1659    def default_exception_handler(self, context):
1660        """Default exception handler.
1661
1662        This is called when an exception occurs and no exception
1663        handler is set, and can be called by a custom exception
1664        handler that wants to defer to the default behavior.
1665
1666        This default handler logs the error message and other
1667        context-dependent information.  In debug mode, a truncated
1668        stack trace is also appended showing where the given object
1669        (e.g. a handle or future or task) was created, if any.
1670
1671        The context parameter has the same meaning as in
1672        `call_exception_handler()`.
1673        """
1674        message = context.get('message')
1675        if not message:
1676            message = 'Unhandled exception in event loop'
1677
1678        exception = context.get('exception')
1679        if exception is not None:
1680            exc_info = (type(exception), exception, exception.__traceback__)
1681        else:
1682            exc_info = False
1683
1684        if ('source_traceback' not in context and
1685                self._current_handle is not None and
1686                self._current_handle._source_traceback):
1687            context['handle_traceback'] = \
1688                self._current_handle._source_traceback
1689
1690        log_lines = [message]
1691        for key in sorted(context):
1692            if key in {'message', 'exception'}:
1693                continue
1694            value = context[key]
1695            if key == 'source_traceback':
1696                tb = ''.join(traceback.format_list(value))
1697                value = 'Object created at (most recent call last):\n'
1698                value += tb.rstrip()
1699            elif key == 'handle_traceback':
1700                tb = ''.join(traceback.format_list(value))
1701                value = 'Handle created at (most recent call last):\n'
1702                value += tb.rstrip()
1703            else:
1704                value = repr(value)
1705            log_lines.append(f'{key}: {value}')
1706
1707        logger.error('\n'.join(log_lines), exc_info=exc_info)
1708
1709    def call_exception_handler(self, context):
1710        """Call the current event loop's exception handler.
1711
1712        The context argument is a dict containing the following keys:
1713
1714        - 'message': Error message;
1715        - 'exception' (optional): Exception object;
1716        - 'future' (optional): Future instance;
1717        - 'task' (optional): Task instance;
1718        - 'handle' (optional): Handle instance;
1719        - 'protocol' (optional): Protocol instance;
1720        - 'transport' (optional): Transport instance;
1721        - 'socket' (optional): Socket instance;
1722        - 'asyncgen' (optional): Asynchronous generator that caused
1723                                 the exception.
1724
1725        New keys maybe introduced in the future.
1726
1727        Note: do not overload this method in an event loop subclass.
1728        For custom exception handling, use the
1729        `set_exception_handler()` method.
1730        """
1731        if self._exception_handler is None:
1732            try:
1733                self.default_exception_handler(context)
1734            except (SystemExit, KeyboardInterrupt):
1735                raise
1736            except BaseException:
1737                # Second protection layer for unexpected errors
1738                # in the default implementation, as well as for subclassed
1739                # event loops with overloaded "default_exception_handler".
1740                logger.error('Exception in default exception handler',
1741                             exc_info=True)
1742        else:
1743            try:
1744                self._exception_handler(self, context)
1745            except (SystemExit, KeyboardInterrupt):
1746                raise
1747            except BaseException as exc:
1748                # Exception in the user set custom exception handler.
1749                try:
1750                    # Let's try default handler.
1751                    self.default_exception_handler({
1752                        'message': 'Unhandled error in exception handler',
1753                        'exception': exc,
1754                        'context': context,
1755                    })
1756                except (SystemExit, KeyboardInterrupt):
1757                    raise
1758                except BaseException:
1759                    # Guard 'default_exception_handler' in case it is
1760                    # overloaded.
1761                    logger.error('Exception in default exception handler '
1762                                 'while handling an unexpected error '
1763                                 'in custom exception handler',
1764                                 exc_info=True)
1765
1766    def _add_callback(self, handle):
1767        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1768        assert isinstance(handle, events.Handle), 'A Handle is required here'
1769        if handle._cancelled:
1770            return
1771        assert not isinstance(handle, events.TimerHandle)
1772        self._ready.append(handle)
1773
1774    def _add_callback_signalsafe(self, handle):
1775        """Like _add_callback() but called from a signal handler."""
1776        self._add_callback(handle)
1777        self._write_to_self()
1778
1779    def _timer_handle_cancelled(self, handle):
1780        """Notification that a TimerHandle has been cancelled."""
1781        if handle._scheduled:
1782            self._timer_cancelled_count += 1
1783
    def _run_once(self):
        """Run one full iteration of the event loop.

        The steps are performed in this order:

        1. Cancelled timer handles are removed from _scheduled: all of
           them when more than _MIN_SCHEDULED_TIMER_HANDLES handles are
           scheduled and the cancelled fraction exceeds
           _MIN_CANCELLED_TIMER_HANDLES_FRACTION, otherwise only those
           sitting at the head of the heap.
        2. The selector is polled and the resulting events are passed to
           _process_events().  The timeout is 0 when callbacks are
           already ready or the loop is stopping, otherwise the time
           until the first scheduled handle (clamped to the range
           0..MAXIMUM_SELECT_TIMEOUT), or None if nothing is scheduled.
        3. Scheduled handles that have become due are moved to _ready.
        4. The handles that are in _ready at that point are run, skipping
           cancelled ones.
        """

        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            # Filtering does not keep the heap invariant, so rebuild it.
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            # Every cancelled handle is gone now; restart the count.
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # timeout stays None when nothing is ready, the loop is not
        # stopping and no timer is scheduled (presumably select() then
        # blocks until an I/O event arrives -- depends on the selector).
        timeout = None
        if self._ready or self._stopping:
            # There is work to do right away: poll without waiting.
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # A handle counts as due when its deadline lies less than one
        # clock resolution after the current time.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        # (The length is captured once, so handles appended to _ready
        # while this loop runs are not popped in this iteration.)
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    # Expose the running handle for the duration of the
                    # callback; default_exception_handler() reads it to
                    # report where the handle was created.
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    # Debug mode only: warn about callbacks that ran for
                    # at least slow_callback_duration seconds.
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
1861
1862    def _set_coroutine_origin_tracking(self, enabled):
1863        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1864            return
1865
1866        if enabled:
1867            self._coroutine_origin_tracking_saved_depth = (
1868                sys.get_coroutine_origin_tracking_depth())
1869            sys.set_coroutine_origin_tracking_depth(
1870                constants.DEBUG_STACK_DEPTH)
1871        else:
1872            sys.set_coroutine_origin_tracking_depth(
1873                self._coroutine_origin_tracking_saved_depth)
1874
1875        self._coroutine_origin_tracking_enabled = enabled
1876
1877    def get_debug(self):
1878        return self._debug
1879
1880    def set_debug(self, enabled):
1881        self._debug = enabled
1882
1883        if self.is_running():
1884            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1885