1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
30
31
def _test_selector_event(selector, fd, event):
    """Return True if *selector* watches *fd* for any bit of *event*.

    Returns False when *fd* is not registered with the selector at all.
    """
    try:
        watched = selector.get_key(fd).events
    except KeyError:
        # get_key() raises KeyError for a file descriptor it does not know.
        return False
    return bool(watched & event)
41
42
def _check_ssl_socket(sock):
    """Raise TypeError if *sock* is an ssl.SSLSocket.

    The check is skipped when the ssl module could not be imported.
    """
    if ssl is None:
        return
    if isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")
46
47
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Initialize the loop on top of *selector*.

        If *selector* is None, selectors.DefaultSelector() is used.  The
        self-pipe is created and registered for reading right away.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps a socket file descriptor to the transport registered for it
        # (see _SelectorTransport.__init__); values are held weakly.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a new _SelectorSocketTransport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Return the application transport of an SSL connection on *rawsock*.

        An sslproto.SSLProtocol is created for *protocol* and used as the
        protocol of a plain socket transport over *rawsock*.
        """
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        # The socket transport is not returned: callers only get the
        # application-level transport exposed by the SSL protocol.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a new _SelectorDatagramTransport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister the self-pipe reader and close both of its sockets."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Handle *data* read from the self-pipe; this version ignores it."""
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Every chunk is handed to _process_self_data().  Reading stops when
        recv() returns no data or would block.
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write one null byte to the self-pipe.

        An OSError from send() is swallowed, and logged only in debug mode.
        """
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register _accept_connection() as the read handler of *sock*."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        A task running _accept_connection2() is created for every accepted
        connection.  Accepting stops early when accept() would block or is
        interrupted.  When accept() fails for lack of system resources, the
        error is reported to the loop exception handler, the reader is
        removed and _start_serving() is scheduled again after
        constants.ACCEPT_RETRY_DELAY.  Any other OSError propagates.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except ConnectionAbortedError:
                # That one connection was aborted before it could be
                # accepted; skip it and keep draining the accept queue.
                continue
            except (BlockingIOError, InterruptedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # Stop for this tick: calling accept() again right away
                    # would only repeat the failure and schedule one more
                    # retry timer per iteration.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create the protocol and transport for the accepted *conn*.

        Waits until the transport's waiter completes.  If waiting fails the
        transport is closed.  SystemExit and KeyboardInterrupt propagate;
        any other exception is reported to the loop exception handler, but
        only when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* is used by a transport that is not closing.

        *fd* is an int or an object with a fileno() method; ValueError is
        raised for anything else.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Make *callback(*args)* the read handler of *fd*.

        A previously registered reader for *fd* is replaced and cancelled;
        a registered writer is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Remove the read handler of *fd*.

        Returns True if a reader was registered (it is cancelled), False
        otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Make *callback(*args)* the write handler of *fd*.

        A previously registered writer for *fd* is replaced and cancelled;
        a registered reader is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Remove the write handler of *fd*.

        Returns True if a writer was registered (it is cancelled), False
        otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Stop watching for write events; keep the reader, if any.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self.add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd))
        return await fut

    def _sock_read_done(self, fd, fut):
        """Future done-callback: remove the reader registered for *fd*."""
        self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        """Reader callback of sock_recv(): try recv() and complete *fut*."""
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        """Reader callback of sock_recv_into(): try recv_into(), complete *fut*."""
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd))
        # use a trick with a list in closure to store a mutable state
        self.add_writer(fd, self._sock_sendall, fut, sock,
                        memoryview(data), [n])
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback of sock_sendall(): send the unsent tail of *view*.

        *pos* is a one-element list holding the offset of the first unsent
        byte; it is updated in place after a partial send.
        """
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Addresses of non-AF_UNIX sockets are resolved first.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock* to *address* and report through *fut*.

        If connect() cannot complete immediately, _sock_connect_cb() is
        registered as the writer callback of the socket.
        """
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut):
        """Future done-callback: remove the writer registered for *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback of _sock_connect(): complete *fut* from SO_ERROR."""
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return await fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept on *sock* and complete *fut* with (conn, address).

        *registered* is True when this call is the reader callback; the
        reader is then removed first.  If accept() would block, this method
        registers itself as the reader callback of the socket.
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.done():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* with sock_sendfile().

        The transport is taken out of self._transports for the duration of
        the call (presumably so that the socket-level API accepts its file
        descriptor -- see _ensure_fd_no_transport()), reading is paused and
        transp._make_empty_waiter() is awaited before sending.  The previous
        state is restored in the finally block.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the handlers of the ready file objects in *event_list*.

        A handler that has been cancelled is unregistered instead of being
        scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on *sock*: remove its reader and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
603
604
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Common part of the selector-based transports wrapping one socket.

    Keeps the socket, its file descriptor, the protocol, an optional server
    and the write buffer, and implements closing and connection-lost
    bookkeeping.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Attach the transport to *sock* and register it with *loop*.

        'socket', 'sockname' and (unless already given) 'peername' are
        stored in the extra info; the last two are None when the socket
        cannot provide them.  If *server* is given it is notified with
        _attach().
        """
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        """Return a debug representation with closing and polling state."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return f"<{' '.join(info)}>"

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set *protocol* as the protocol and mark it as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has been requested."""
        return self._closing

    def close(self):
        """Stop reading and close the transport.

        If the write buffer is empty, connection_lost(None) is scheduled
        right away; otherwise only the reader is removed here.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        """Warn about and close a socket that was never released."""
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        An OSError is only logged, and only in debug mode; any other
        exception goes to the loop exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, stop all I/O and schedule connection_lost(exc).

        Does nothing if connection_lost has already been scheduled.
        """
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Call protocol.connection_lost(exc) and release all references.

        The socket is closed and the server (if any) detached even when
        connection_lost() raises.
        """
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)
747
748
749class _SelectorSocketTransport(_SelectorTransport):
750
751    _start_tls_compatible = True
752    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
753
754    def __init__(self, loop, sock, protocol, waiter=None,
755                 extra=None, server=None):
756
757        self._read_ready_cb = None
758        super().__init__(loop, sock, protocol, extra, server)
759        self._eof = False
760        self._paused = False
761        self._empty_waiter = None
762
763        # Disable the Nagle algorithm -- small writes will be
764        # sent without waiting for the TCP ACK.  This generally
765        # decreases the latency (in some cases significantly.)
766        base_events._set_nodelay(self._sock)
767
768        self._loop.call_soon(self._protocol.connection_made, self)
769        # only start reading when connection_made() has been called
770        self._loop.call_soon(self._add_reader,
771                             self._sock_fd, self._read_ready)
772        if waiter is not None:
773            # only wake up the waiter when connection_made() has been called
774            self._loop.call_soon(futures._set_result_unless_cancelled,
775                                 waiter, None)
776
777    def set_protocol(self, protocol):
778        if isinstance(protocol, protocols.BufferedProtocol):
779            self._read_ready_cb = self._read_ready__get_buffer
780        else:
781            self._read_ready_cb = self._read_ready__data_received
782
783        super().set_protocol(protocol)
784
785    def is_reading(self):
786        return not self._paused and not self._closing
787
788    def pause_reading(self):
789        if self._closing or self._paused:
790            return
791        self._paused = True
792        self._loop._remove_reader(self._sock_fd)
793        if self._loop.get_debug():
794            logger.debug("%r pauses reading", self)
795
796    def resume_reading(self):
797        if self._closing or not self._paused:
798            return
799        self._paused = False
800        self._add_reader(self._sock_fd, self._read_ready)
801        if self._loop.get_debug():
802            logger.debug("%r resumes reading", self)
803
804    def _read_ready(self):
805        self._read_ready_cb()
806
807    def _read_ready__get_buffer(self):
808        if self._conn_lost:
809            return
810
811        try:
812            buf = self._protocol.get_buffer(-1)
813            if not len(buf):
814                raise RuntimeError('get_buffer() returned an empty buffer')
815        except (SystemExit, KeyboardInterrupt):
816            raise
817        except BaseException as exc:
818            self._fatal_error(
819                exc, 'Fatal error: protocol.get_buffer() call failed.')
820            return
821
822        try:
823            nbytes = self._sock.recv_into(buf)
824        except (BlockingIOError, InterruptedError):
825            return
826        except (SystemExit, KeyboardInterrupt):
827            raise
828        except BaseException as exc:
829            self._fatal_error(exc, 'Fatal read error on socket transport')
830            return
831
832        if not nbytes:
833            self._read_ready__on_eof()
834            return
835
836        try:
837            self._protocol.buffer_updated(nbytes)
838        except (SystemExit, KeyboardInterrupt):
839            raise
840        except BaseException as exc:
841            self._fatal_error(
842                exc, 'Fatal error: protocol.buffer_updated() call failed.')
843
844    def _read_ready__data_received(self):
845        if self._conn_lost:
846            return
847        try:
848            data = self._sock.recv(self.max_size)
849        except (BlockingIOError, InterruptedError):
850            return
851        except (SystemExit, KeyboardInterrupt):
852            raise
853        except BaseException as exc:
854            self._fatal_error(exc, 'Fatal read error on socket transport')
855            return
856
857        if not data:
858            self._read_ready__on_eof()
859            return
860
861        try:
862            self._protocol.data_received(data)
863        except (SystemExit, KeyboardInterrupt):
864            raise
865        except BaseException as exc:
866            self._fatal_error(
867                exc, 'Fatal error: protocol.data_received() call failed.')
868
869    def _read_ready__on_eof(self):
870        if self._loop.get_debug():
871            logger.debug("%r received EOF", self)
872
873        try:
874            keep_open = self._protocol.eof_received()
875        except (SystemExit, KeyboardInterrupt):
876            raise
877        except BaseException as exc:
878            self._fatal_error(
879                exc, 'Fatal error: protocol.eof_received() call failed.')
880            return
881
882        if keep_open:
883            # We're keeping the connection open so the
884            # protocol can write more, but we still can't
885            # receive more, so remove the reader callback.
886            self._loop._remove_reader(self._sock_fd)
887        else:
888            self.close()
889
890    def write(self, data):
891        if not isinstance(data, (bytes, bytearray, memoryview)):
892            raise TypeError(f'data argument must be a bytes-like object, '
893                            f'not {type(data).__name__!r}')
894        if self._eof:
895            raise RuntimeError('Cannot call write() after write_eof()')
896        if self._empty_waiter is not None:
897            raise RuntimeError('unable to write; sendfile is in progress')
898        if not data:
899            return
900
901        if self._conn_lost:
902            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
903                logger.warning('socket.send() raised exception.')
904            self._conn_lost += 1
905            return
906
907        if not self._buffer:
908            # Optimization: try to send now.
909            try:
910                n = self._sock.send(data)
911            except (BlockingIOError, InterruptedError):
912                pass
913            except (SystemExit, KeyboardInterrupt):
914                raise
915            except BaseException as exc:
916                self._fatal_error(exc, 'Fatal write error on socket transport')
917                return
918            else:
919                data = data[n:]
920                if not data:
921                    return
922            # Not all was written; register write handler.
923            self._loop._add_writer(self._sock_fd, self._write_ready)
924
925        # Add it to the buffer.
926        self._buffer.extend(data)
927        self._maybe_pause_protocol()
928
929    def _write_ready(self):
930        assert self._buffer, 'Data should not be empty'
931
932        if self._conn_lost:
933            return
934        try:
935            n = self._sock.send(self._buffer)
936        except (BlockingIOError, InterruptedError):
937            pass
938        except (SystemExit, KeyboardInterrupt):
939            raise
940        except BaseException as exc:
941            self._loop._remove_writer(self._sock_fd)
942            self._buffer.clear()
943            self._fatal_error(exc, 'Fatal write error on socket transport')
944            if self._empty_waiter is not None:
945                self._empty_waiter.set_exception(exc)
946        else:
947            if n:
948                del self._buffer[:n]
949            self._maybe_resume_protocol()  # May append to buffer.
950            if not self._buffer:
951                self._loop._remove_writer(self._sock_fd)
952                if self._empty_waiter is not None:
953                    self._empty_waiter.set_result(None)
954                if self._closing:
955                    self._call_connection_lost(None)
956                elif self._eof:
957                    self._sock.shutdown(socket.SHUT_WR)
958
959    def write_eof(self):
960        if self._closing or self._eof:
961            return
962        self._eof = True
963        if not self._buffer:
964            self._sock.shutdown(socket.SHUT_WR)
965
966    def can_write_eof(self):
967        return True
968
969    def _call_connection_lost(self, exc):
970        super()._call_connection_lost(exc)
971        if self._empty_waiter is not None:
972            self._empty_waiter.set_exception(
973                ConnectionError("Connection is closed by peer"))
974
975    def _make_empty_waiter(self):
976        if self._empty_waiter is not None:
977            raise RuntimeError("Empty waiter is already set")
978        self._empty_waiter = self._loop.create_future()
979        if not self._buffer:
980            self._empty_waiter.set_result(None)
981        return self._empty_waiter
982
983    def _reset_empty_waiter(self):
984        self._empty_waiter = None
985
986
class _SelectorDatagramTransport(_SelectorTransport):
    """Selector-based transport for datagram sockets.

    Datagrams that cannot be sent right away are queued in self._buffer
    as (data, addr) pairs and flushed by _sendto_ready().
    """

    # Outgoing datagrams are handled with deque operations
    # (append/popleft/appendleft); presumably the base class builds
    # self._buffer from this factory -- confirm against _SelectorTransport.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Store *address* and schedule the start-up callbacks.

        connection_made() is scheduled first, then the reader
        registration, then (if *waiter* is given) the call that
        completes the waiter.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        call_soon = self._loop.call_soon
        call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        call_soon(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            call_soon(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the total payload length of all queued datagrams."""
        total = 0
        for payload, _dest in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Receive one datagram and hand it to the protocol.

        OSError is reported through protocol.error_received(); any
        other failure (except SystemExit/KeyboardInterrupt, which
        propagate) is handed to _fatal_error().
        """
        if self._conn_lost:
            return

        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(err, 'Fatal read error on datagram transport')
            return

        self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError if *data* is not bytes, bytearray or
        memoryview, and ValueError if the transport has a fixed address
        and *addr* is neither None nor that address.  Empty data is
        ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            threshold = constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES
            if self._conn_lost >= threshold:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                # Could not send now: register the flush callback and
                # fall through to queue the datagram.
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as err:
                self._protocol.error_received(err)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as err:
                self._fatal_error(
                    err, 'Fatal write error on datagram transport')
                return
            else:
                # Sent immediately; nothing to queue.
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Flush queued datagrams until the queue is empty or blocks.

        This is the writer callback that sendto() registers with the
        loop.  When the queue is empty afterwards the writer is removed
        and a deferred close is completed.
        """
        while self._buffer:
            payload, dest = self._buffer.popleft()
            try:
                if self._extra['peername']:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, dest)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((payload, dest))  # Try again later.
                break
            except OSError as err:
                self._protocol.error_received(err)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as err:
                self._fatal_error(
                    err, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return
        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
1092