1"""Event loop using a selector and related classes. 2 3A selector is a "notify-when-ready" multiplexer. For a subclass which 4also includes support for signal handling, see the unix_events sub-module. 5""" 6 7__all__ = 'BaseSelectorEventLoop', 8 9import collections 10import errno 11import functools 12import selectors 13import socket 14import warnings 15import weakref 16try: 17 import ssl 18except ImportError: # pragma: no cover 19 ssl = None 20 21from . import base_events 22from . import constants 23from . import events 24from . import futures 25from . import protocols 26from . import sslproto 27from . import transports 28from . import trsock 29from .log import logger 30 31 32def _test_selector_event(selector, fd, event): 33 # Test if the selector is monitoring 'event' events 34 # for the file descriptor 'fd'. 35 try: 36 key = selector.get_key(fd) 37 except KeyError: 38 return False 39 else: 40 return bool(key.events & event) 41 42 43def _check_ssl_socket(sock): 44 if ssl is not None and isinstance(sock, ssl.SSLSocket): 45 raise TypeError("Socket cannot be of type SSLSocket") 46 47 48class BaseSelectorEventLoop(base_events.BaseEventLoop): 49 """Selector event loop. 50 51 See events.EventLoop for API specification. 52 """ 53 54 def __init__(self, selector=None): 55 super().__init__() 56 57 if selector is None: 58 selector = selectors.DefaultSelector() 59 logger.debug('Using selector: %s', selector.__class__.__name__) 60 self._selector = selector 61 self._make_self_pipe() 62 self._transports = weakref.WeakValueDictionary() 63 64 def _make_socket_transport(self, sock, protocol, waiter=None, *, 65 extra=None, server=None): 66 return _SelectorSocketTransport(self, sock, protocol, waiter, 67 extra, server) 68 69 def _make_ssl_transport( 70 self, rawsock, protocol, sslcontext, waiter=None, 71 *, server_side=False, server_hostname=None, 72 extra=None, server=None, 73 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 74 ssl_protocol = sslproto.SSLProtocol( 75 self, protocol, sslcontext, waiter, 76 server_side, server_hostname, 77 ssl_handshake_timeout=ssl_handshake_timeout) 78 _SelectorSocketTransport(self, rawsock, ssl_protocol, 79 extra=extra, server=server) 80 return ssl_protocol._app_transport 81 82 def _make_datagram_transport(self, sock, protocol, 83 address=None, waiter=None, extra=None): 84 return _SelectorDatagramTransport(self, sock, protocol, 85 address, waiter, extra) 86 87 def close(self): 88 if self.is_running(): 89 raise RuntimeError("Cannot close a running event loop") 90 if self.is_closed(): 91 return 92 self._close_self_pipe() 93 super().close() 94 if self._selector is not None: 95 self._selector.close() 96 self._selector = None 97 98 def _close_self_pipe(self): 99 self._remove_reader(self._ssock.fileno()) 100 self._ssock.close() 101 self._ssock = None 102 self._csock.close() 103 self._csock = None 104 self._internal_fds -= 1 105 106 def _make_self_pipe(self): 107 # A self-socket, really. :-) 108 self._ssock, self._csock = socket.socketpair() 109 self._ssock.setblocking(False) 110 self._csock.setblocking(False) 111 self._internal_fds += 1 112 self._add_reader(self._ssock.fileno(), self._read_from_self) 113 114 def _process_self_data(self, data): 115 pass 116 117 def _read_from_self(self): 118 while True: 119 try: 120 data = self._ssock.recv(4096) 121 if not data: 122 break 123 self._process_self_data(data) 124 except InterruptedError: 125 continue 126 except BlockingIOError: 127 break 128 129 def _write_to_self(self): 130 # This may be called from a different thread, possibly after 131 # _close_self_pipe() has been called or even while it is 132 # running. Guard for self._csock being None or closed. When 133 # a socket is closed, send() raises OSError (with errno set to 134 # EBADF, but let's not rely on the exact error code). 135 csock = self._csock 136 if csock is None: 137 return 138 139 try: 140 csock.send(b'\0') 141 except OSError: 142 if self._debug: 143 logger.debug("Fail to write a null byte into the " 144 "self-pipe socket", 145 exc_info=True) 146 147 def _start_serving(self, protocol_factory, sock, 148 sslcontext=None, server=None, backlog=100, 149 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 150 self._add_reader(sock.fileno(), self._accept_connection, 151 protocol_factory, sock, sslcontext, server, backlog, 152 ssl_handshake_timeout) 153 154 def _accept_connection( 155 self, protocol_factory, sock, 156 sslcontext=None, server=None, backlog=100, 157 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 158 # This method is only called once for each event loop tick where the 159 # listening socket has triggered an EVENT_READ. There may be multiple 160 # connections waiting for an .accept() so it is called in a loop. 161 # See https://bugs.python.org/issue27906 for more details. 162 for _ in range(backlog): 163 try: 164 conn, addr = sock.accept() 165 if self._debug: 166 logger.debug("%r got a new connection from %r: %r", 167 server, addr, conn) 168 conn.setblocking(False) 169 except (BlockingIOError, InterruptedError, ConnectionAbortedError): 170 # Early exit because the socket accept buffer is empty. 171 return None 172 except OSError as exc: 173 # There's nowhere to send the error, so just log it. 174 if exc.errno in (errno.EMFILE, errno.ENFILE, 175 errno.ENOBUFS, errno.ENOMEM): 176 # Some platforms (e.g. Linux keep reporting the FD as 177 # ready, so we remove the read handler temporarily. 178 # We'll try again in a while. 179 self.call_exception_handler({ 180 'message': 'socket.accept() out of system resource', 181 'exception': exc, 182 'socket': trsock.TransportSocket(sock), 183 }) 184 self._remove_reader(sock.fileno()) 185 self.call_later(constants.ACCEPT_RETRY_DELAY, 186 self._start_serving, 187 protocol_factory, sock, sslcontext, server, 188 backlog, ssl_handshake_timeout) 189 else: 190 raise # The event loop will catch, log and ignore it. 191 else: 192 extra = {'peername': addr} 193 accept = self._accept_connection2( 194 protocol_factory, conn, extra, sslcontext, server, 195 ssl_handshake_timeout) 196 self.create_task(accept) 197 198 async def _accept_connection2( 199 self, protocol_factory, conn, extra, 200 sslcontext=None, server=None, 201 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 202 protocol = None 203 transport = None 204 try: 205 protocol = protocol_factory() 206 waiter = self.create_future() 207 if sslcontext: 208 transport = self._make_ssl_transport( 209 conn, protocol, sslcontext, waiter=waiter, 210 server_side=True, extra=extra, server=server, 211 ssl_handshake_timeout=ssl_handshake_timeout) 212 else: 213 transport = self._make_socket_transport( 214 conn, protocol, waiter=waiter, extra=extra, 215 server=server) 216 217 try: 218 await waiter 219 except BaseException: 220 transport.close() 221 raise 222 # It's now up to the protocol to handle the connection. 223 224 except (SystemExit, KeyboardInterrupt): 225 raise 226 except BaseException as exc: 227 if self._debug: 228 context = { 229 'message': 230 'Error on transport creation for incoming connection', 231 'exception': exc, 232 } 233 if protocol is not None: 234 context['protocol'] = protocol 235 if transport is not None: 236 context['transport'] = transport 237 self.call_exception_handler(context) 238 239 def _ensure_fd_no_transport(self, fd): 240 fileno = fd 241 if not isinstance(fileno, int): 242 try: 243 fileno = int(fileno.fileno()) 244 except (AttributeError, TypeError, ValueError): 245 # This code matches selectors._fileobj_to_fd function. 246 raise ValueError(f"Invalid file object: {fd!r}") from None 247 try: 248 transport = self._transports[fileno] 249 except KeyError: 250 pass 251 else: 252 if not transport.is_closing(): 253 raise RuntimeError( 254 f'File descriptor {fd!r} is used by transport ' 255 f'{transport!r}') 256 257 def _add_reader(self, fd, callback, *args): 258 self._check_closed() 259 handle = events.Handle(callback, args, self, None) 260 try: 261 key = self._selector.get_key(fd) 262 except KeyError: 263 self._selector.register(fd, selectors.EVENT_READ, 264 (handle, None)) 265 else: 266 mask, (reader, writer) = key.events, key.data 267 self._selector.modify(fd, mask | selectors.EVENT_READ, 268 (handle, writer)) 269 if reader is not None: 270 reader.cancel() 271 272 def _remove_reader(self, fd): 273 if self.is_closed(): 274 return False 275 try: 276 key = self._selector.get_key(fd) 277 except KeyError: 278 return False 279 else: 280 mask, (reader, writer) = key.events, key.data 281 mask &= ~selectors.EVENT_READ 282 if not mask: 283 self._selector.unregister(fd) 284 else: 285 self._selector.modify(fd, mask, (None, writer)) 286 287 if reader is not None: 288 reader.cancel() 289 return True 290 else: 291 return False 292 293 def _add_writer(self, fd, callback, *args): 294 self._check_closed() 295 handle = events.Handle(callback, args, self, None) 296 try: 297 key = self._selector.get_key(fd) 298 except KeyError: 299 self._selector.register(fd, selectors.EVENT_WRITE, 300 (None, handle)) 301 else: 302 mask, (reader, writer) = key.events, key.data 303 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 304 (reader, handle)) 305 if writer is not None: 306 writer.cancel() 307 308 def _remove_writer(self, fd): 309 """Remove a writer callback.""" 310 if self.is_closed(): 311 return False 312 try: 313 key = self._selector.get_key(fd) 314 except KeyError: 315 return False 316 else: 317 mask, (reader, writer) = key.events, key.data 318 # Remove both writer and connector. 319 mask &= ~selectors.EVENT_WRITE 320 if not mask: 321 self._selector.unregister(fd) 322 else: 323 self._selector.modify(fd, mask, (reader, None)) 324 325 if writer is not None: 326 writer.cancel() 327 return True 328 else: 329 return False 330 331 def add_reader(self, fd, callback, *args): 332 """Add a reader callback.""" 333 self._ensure_fd_no_transport(fd) 334 return self._add_reader(fd, callback, *args) 335 336 def remove_reader(self, fd): 337 """Remove a reader callback.""" 338 self._ensure_fd_no_transport(fd) 339 return self._remove_reader(fd) 340 341 def add_writer(self, fd, callback, *args): 342 """Add a writer callback..""" 343 self._ensure_fd_no_transport(fd) 344 return self._add_writer(fd, callback, *args) 345 346 def remove_writer(self, fd): 347 """Remove a writer callback.""" 348 self._ensure_fd_no_transport(fd) 349 return self._remove_writer(fd) 350 351 async def sock_recv(self, sock, n): 352 """Receive data from the socket. 353 354 The return value is a bytes object representing the data received. 355 The maximum amount of data to be received at once is specified by 356 nbytes. 357 """ 358 _check_ssl_socket(sock) 359 if self._debug and sock.gettimeout() != 0: 360 raise ValueError("the socket must be non-blocking") 361 try: 362 return sock.recv(n) 363 except (BlockingIOError, InterruptedError): 364 pass 365 fut = self.create_future() 366 fd = sock.fileno() 367 self.add_reader(fd, self._sock_recv, fut, sock, n) 368 fut.add_done_callback( 369 functools.partial(self._sock_read_done, fd)) 370 return await fut 371 372 def _sock_read_done(self, fd, fut): 373 self.remove_reader(fd) 374 375 def _sock_recv(self, fut, sock, n): 376 # _sock_recv() can add itself as an I/O callback if the operation can't 377 # be done immediately. Don't use it directly, call sock_recv(). 378 if fut.done(): 379 return 380 try: 381 data = sock.recv(n) 382 except (BlockingIOError, InterruptedError): 383 return # try again next time 384 except (SystemExit, KeyboardInterrupt): 385 raise 386 except BaseException as exc: 387 fut.set_exception(exc) 388 else: 389 fut.set_result(data) 390 391 async def sock_recv_into(self, sock, buf): 392 """Receive data from the socket. 393 394 The received data is written into *buf* (a writable buffer). 395 The return value is the number of bytes written. 396 """ 397 _check_ssl_socket(sock) 398 if self._debug and sock.gettimeout() != 0: 399 raise ValueError("the socket must be non-blocking") 400 try: 401 return sock.recv_into(buf) 402 except (BlockingIOError, InterruptedError): 403 pass 404 fut = self.create_future() 405 fd = sock.fileno() 406 self.add_reader(fd, self._sock_recv_into, fut, sock, buf) 407 fut.add_done_callback( 408 functools.partial(self._sock_read_done, fd)) 409 return await fut 410 411 def _sock_recv_into(self, fut, sock, buf): 412 # _sock_recv_into() can add itself as an I/O callback if the operation 413 # can't be done immediately. Don't use it directly, call 414 # sock_recv_into(). 415 if fut.done(): 416 return 417 try: 418 nbytes = sock.recv_into(buf) 419 except (BlockingIOError, InterruptedError): 420 return # try again next time 421 except (SystemExit, KeyboardInterrupt): 422 raise 423 except BaseException as exc: 424 fut.set_exception(exc) 425 else: 426 fut.set_result(nbytes) 427 428 async def sock_sendall(self, sock, data): 429 """Send data to the socket. 430 431 The socket must be connected to a remote socket. This method continues 432 to send data from data until either all data has been sent or an 433 error occurs. None is returned on success. On error, an exception is 434 raised, and there is no way to determine how much data, if any, was 435 successfully processed by the receiving end of the connection. 436 """ 437 _check_ssl_socket(sock) 438 if self._debug and sock.gettimeout() != 0: 439 raise ValueError("the socket must be non-blocking") 440 try: 441 n = sock.send(data) 442 except (BlockingIOError, InterruptedError): 443 n = 0 444 445 if n == len(data): 446 # all data sent 447 return 448 449 fut = self.create_future() 450 fd = sock.fileno() 451 fut.add_done_callback( 452 functools.partial(self._sock_write_done, fd)) 453 # use a trick with a list in closure to store a mutable state 454 self.add_writer(fd, self._sock_sendall, fut, sock, 455 memoryview(data), [n]) 456 return await fut 457 458 def _sock_sendall(self, fut, sock, view, pos): 459 if fut.done(): 460 # Future cancellation can be scheduled on previous loop iteration 461 return 462 start = pos[0] 463 try: 464 n = sock.send(view[start:]) 465 except (BlockingIOError, InterruptedError): 466 return 467 except (SystemExit, KeyboardInterrupt): 468 raise 469 except BaseException as exc: 470 fut.set_exception(exc) 471 return 472 473 start += n 474 475 if start == len(view): 476 fut.set_result(None) 477 else: 478 pos[0] = start 479 480 async def sock_connect(self, sock, address): 481 """Connect to a remote socket at address. 482 483 This method is a coroutine. 484 """ 485 _check_ssl_socket(sock) 486 if self._debug and sock.gettimeout() != 0: 487 raise ValueError("the socket must be non-blocking") 488 489 if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: 490 resolved = await self._ensure_resolved( 491 address, family=sock.family, proto=sock.proto, loop=self) 492 _, _, _, _, address = resolved[0] 493 494 fut = self.create_future() 495 self._sock_connect(fut, sock, address) 496 return await fut 497 498 def _sock_connect(self, fut, sock, address): 499 fd = sock.fileno() 500 try: 501 sock.connect(address) 502 except (BlockingIOError, InterruptedError): 503 # Issue #23618: When the C function connect() fails with EINTR, the 504 # connection runs in background. We have to wait until the socket 505 # becomes writable to be notified when the connection succeed or 506 # fails. 507 fut.add_done_callback( 508 functools.partial(self._sock_write_done, fd)) 509 self.add_writer(fd, self._sock_connect_cb, fut, sock, address) 510 except (SystemExit, KeyboardInterrupt): 511 raise 512 except BaseException as exc: 513 fut.set_exception(exc) 514 else: 515 fut.set_result(None) 516 517 def _sock_write_done(self, fd, fut): 518 self.remove_writer(fd) 519 520 def _sock_connect_cb(self, fut, sock, address): 521 if fut.done(): 522 return 523 524 try: 525 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 526 if err != 0: 527 # Jump to any except clause below. 528 raise OSError(err, f'Connect call failed {address}') 529 except (BlockingIOError, InterruptedError): 530 # socket is still registered, the callback will be retried later 531 pass 532 except (SystemExit, KeyboardInterrupt): 533 raise 534 except BaseException as exc: 535 fut.set_exception(exc) 536 else: 537 fut.set_result(None) 538 539 async def sock_accept(self, sock): 540 """Accept a connection. 541 542 The socket must be bound to an address and listening for connections. 543 The return value is a pair (conn, address) where conn is a new socket 544 object usable to send and receive data on the connection, and address 545 is the address bound to the socket on the other end of the connection. 546 """ 547 _check_ssl_socket(sock) 548 if self._debug and sock.gettimeout() != 0: 549 raise ValueError("the socket must be non-blocking") 550 fut = self.create_future() 551 self._sock_accept(fut, False, sock) 552 return await fut 553 554 def _sock_accept(self, fut, registered, sock): 555 fd = sock.fileno() 556 if registered: 557 self.remove_reader(fd) 558 if fut.done(): 559 return 560 try: 561 conn, address = sock.accept() 562 conn.setblocking(False) 563 except (BlockingIOError, InterruptedError): 564 self.add_reader(fd, self._sock_accept, fut, True, sock) 565 except (SystemExit, KeyboardInterrupt): 566 raise 567 except BaseException as exc: 568 fut.set_exception(exc) 569 else: 570 fut.set_result((conn, address)) 571 572 async def _sendfile_native(self, transp, file, offset, count): 573 del self._transports[transp._sock_fd] 574 resume_reading = transp.is_reading() 575 transp.pause_reading() 576 await transp._make_empty_waiter() 577 try: 578 return await self.sock_sendfile(transp._sock, file, offset, count, 579 fallback=False) 580 finally: 581 transp._reset_empty_waiter() 582 if resume_reading: 583 transp.resume_reading() 584 self._transports[transp._sock_fd] = transp 585 586 def _process_events(self, event_list): 587 for key, mask in event_list: 588 fileobj, (reader, writer) = key.fileobj, key.data 589 if mask & selectors.EVENT_READ and reader is not None: 590 if reader._cancelled: 591 self._remove_reader(fileobj) 592 else: 593 self._add_callback(reader) 594 if mask & selectors.EVENT_WRITE and writer is not None: 595 if writer._cancelled: 596 self._remove_writer(fileobj) 597 else: 598 self._add_callback(writer) 599 600 def _stop_serving(self, sock): 601 self._remove_reader(sock.fileno()) 602 sock.close() 603 604 605class _SelectorTransport(transports._FlowControlMixin, 606 transports.Transport): 607 608 max_size = 256 * 1024 # Buffer size passed to recv(). 609 610 _buffer_factory = bytearray # Constructs initial value for self._buffer. 611 612 # Attribute used in the destructor: it must be set even if the constructor 613 # is not called (see _SelectorSslTransport which may start by raising an 614 # exception) 615 _sock = None 616 617 def __init__(self, loop, sock, protocol, extra=None, server=None): 618 super().__init__(extra, loop) 619 self._extra['socket'] = trsock.TransportSocket(sock) 620 try: 621 self._extra['sockname'] = sock.getsockname() 622 except OSError: 623 self._extra['sockname'] = None 624 if 'peername' not in self._extra: 625 try: 626 self._extra['peername'] = sock.getpeername() 627 except socket.error: 628 self._extra['peername'] = None 629 self._sock = sock 630 self._sock_fd = sock.fileno() 631 632 self._protocol_connected = False 633 self.set_protocol(protocol) 634 635 self._server = server 636 self._buffer = self._buffer_factory() 637 self._conn_lost = 0 # Set when call to connection_lost scheduled. 638 self._closing = False # Set when close() called. 639 if self._server is not None: 640 self._server._attach() 641 loop._transports[self._sock_fd] = self 642 643 def __repr__(self): 644 info = [self.__class__.__name__] 645 if self._sock is None: 646 info.append('closed') 647 elif self._closing: 648 info.append('closing') 649 info.append(f'fd={self._sock_fd}') 650 # test if the transport was closed 651 if self._loop is not None and not self._loop.is_closed(): 652 polling = _test_selector_event(self._loop._selector, 653 self._sock_fd, selectors.EVENT_READ) 654 if polling: 655 info.append('read=polling') 656 else: 657 info.append('read=idle') 658 659 polling = _test_selector_event(self._loop._selector, 660 self._sock_fd, 661 selectors.EVENT_WRITE) 662 if polling: 663 state = 'polling' 664 else: 665 state = 'idle' 666 667 bufsize = self.get_write_buffer_size() 668 info.append(f'write=<{state}, bufsize={bufsize}>') 669 return '<{}>'.format(' '.join(info)) 670 671 def abort(self): 672 self._force_close(None) 673 674 def set_protocol(self, protocol): 675 self._protocol = protocol 676 self._protocol_connected = True 677 678 def get_protocol(self): 679 return self._protocol 680 681 def is_closing(self): 682 return self._closing 683 684 def close(self): 685 if self._closing: 686 return 687 self._closing = True 688 self._loop._remove_reader(self._sock_fd) 689 if not self._buffer: 690 self._conn_lost += 1 691 self._loop._remove_writer(self._sock_fd) 692 self._loop.call_soon(self._call_connection_lost, None) 693 694 def __del__(self, _warn=warnings.warn): 695 if self._sock is not None: 696 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 697 self._sock.close() 698 699 def _fatal_error(self, exc, message='Fatal error on transport'): 700 # Should be called from exception handler only. 701 if isinstance(exc, OSError): 702 if self._loop.get_debug(): 703 logger.debug("%r: %s", self, message, exc_info=True) 704 else: 705 self._loop.call_exception_handler({ 706 'message': message, 707 'exception': exc, 708 'transport': self, 709 'protocol': self._protocol, 710 }) 711 self._force_close(exc) 712 713 def _force_close(self, exc): 714 if self._conn_lost: 715 return 716 if self._buffer: 717 self._buffer.clear() 718 self._loop._remove_writer(self._sock_fd) 719 if not self._closing: 720 self._closing = True 721 self._loop._remove_reader(self._sock_fd) 722 self._conn_lost += 1 723 self._loop.call_soon(self._call_connection_lost, exc) 724 725 def _call_connection_lost(self, exc): 726 try: 727 if self._protocol_connected: 728 self._protocol.connection_lost(exc) 729 finally: 730 self._sock.close() 731 self._sock = None 732 self._protocol = None 733 self._loop = None 734 server = self._server 735 if server is not None: 736 server._detach() 737 self._server = None 738 739 def get_write_buffer_size(self): 740 return len(self._buffer) 741 742 def _add_reader(self, fd, callback, *args): 743 if self._closing: 744 return 745 746 self._loop._add_reader(fd, callback, *args) 747 748 749class _SelectorSocketTransport(_SelectorTransport): 750 751 _start_tls_compatible = True 752 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 753 754 def __init__(self, loop, sock, protocol, waiter=None, 755 extra=None, server=None): 756 757 self._read_ready_cb = None 758 super().__init__(loop, sock, protocol, extra, server) 759 self._eof = False 760 self._paused = False 761 self._empty_waiter = None 762 763 # Disable the Nagle algorithm -- small writes will be 764 # sent without waiting for the TCP ACK. This generally 765 # decreases the latency (in some cases significantly.) 766 base_events._set_nodelay(self._sock) 767 768 self._loop.call_soon(self._protocol.connection_made, self) 769 # only start reading when connection_made() has been called 770 self._loop.call_soon(self._add_reader, 771 self._sock_fd, self._read_ready) 772 if waiter is not None: 773 # only wake up the waiter when connection_made() has been called 774 self._loop.call_soon(futures._set_result_unless_cancelled, 775 waiter, None) 776 777 def set_protocol(self, protocol): 778 if isinstance(protocol, protocols.BufferedProtocol): 779 self._read_ready_cb = self._read_ready__get_buffer 780 else: 781 self._read_ready_cb = self._read_ready__data_received 782 783 super().set_protocol(protocol) 784 785 def is_reading(self): 786 return not self._paused and not self._closing 787 788 def pause_reading(self): 789 if self._closing or self._paused: 790 return 791 self._paused = True 792 self._loop._remove_reader(self._sock_fd) 793 if self._loop.get_debug(): 794 logger.debug("%r pauses reading", self) 795 796 def resume_reading(self): 797 if self._closing or not self._paused: 798 return 799 self._paused = False 800 self._add_reader(self._sock_fd, self._read_ready) 801 if self._loop.get_debug(): 802 logger.debug("%r resumes reading", self) 803 804 def _read_ready(self): 805 self._read_ready_cb() 806 807 def _read_ready__get_buffer(self): 808 if self._conn_lost: 809 return 810 811 try: 812 buf = self._protocol.get_buffer(-1) 813 if not len(buf): 814 raise RuntimeError('get_buffer() returned an empty buffer') 815 except (SystemExit, KeyboardInterrupt): 816 raise 817 except BaseException as exc: 818 self._fatal_error( 819 exc, 'Fatal error: protocol.get_buffer() call failed.') 820 return 821 822 try: 823 nbytes = self._sock.recv_into(buf) 824 except (BlockingIOError, InterruptedError): 825 return 826 except (SystemExit, KeyboardInterrupt): 827 raise 828 except BaseException as exc: 829 self._fatal_error(exc, 'Fatal read error on socket transport') 830 return 831 832 if not nbytes: 833 self._read_ready__on_eof() 834 return 835 836 try: 837 self._protocol.buffer_updated(nbytes) 838 except (SystemExit, KeyboardInterrupt): 839 raise 840 except BaseException as exc: 841 self._fatal_error( 842 exc, 'Fatal error: protocol.buffer_updated() call failed.') 843 844 def _read_ready__data_received(self): 845 if self._conn_lost: 846 return 847 try: 848 data = self._sock.recv(self.max_size) 849 except (BlockingIOError, InterruptedError): 850 return 851 except (SystemExit, KeyboardInterrupt): 852 raise 853 except BaseException as exc: 854 self._fatal_error(exc, 'Fatal read error on socket transport') 855 return 856 857 if not data: 858 self._read_ready__on_eof() 859 return 860 861 try: 862 self._protocol.data_received(data) 863 except (SystemExit, KeyboardInterrupt): 864 raise 865 except BaseException as exc: 866 self._fatal_error( 867 exc, 'Fatal error: protocol.data_received() call failed.') 868 869 def _read_ready__on_eof(self): 870 if self._loop.get_debug(): 871 logger.debug("%r received EOF", self) 872 873 try: 874 keep_open = self._protocol.eof_received() 875 except (SystemExit, KeyboardInterrupt): 876 raise 877 except BaseException as exc: 878 self._fatal_error( 879 exc, 'Fatal error: protocol.eof_received() call failed.') 880 return 881 882 if keep_open: 883 # We're keeping the connection open so the 884 # protocol can write more, but we still can't 885 # receive more, so remove the reader callback. 886 self._loop._remove_reader(self._sock_fd) 887 else: 888 self.close() 889 890 def write(self, data): 891 if not isinstance(data, (bytes, bytearray, memoryview)): 892 raise TypeError(f'data argument must be a bytes-like object, ' 893 f'not {type(data).__name__!r}') 894 if self._eof: 895 raise RuntimeError('Cannot call write() after write_eof()') 896 if self._empty_waiter is not None: 897 raise RuntimeError('unable to write; sendfile is in progress') 898 if not data: 899 return 900 901 if self._conn_lost: 902 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 903 logger.warning('socket.send() raised exception.') 904 self._conn_lost += 1 905 return 906 907 if not self._buffer: 908 # Optimization: try to send now. 909 try: 910 n = self._sock.send(data) 911 except (BlockingIOError, InterruptedError): 912 pass 913 except (SystemExit, KeyboardInterrupt): 914 raise 915 except BaseException as exc: 916 self._fatal_error(exc, 'Fatal write error on socket transport') 917 return 918 else: 919 data = data[n:] 920 if not data: 921 return 922 # Not all was written; register write handler. 923 self._loop._add_writer(self._sock_fd, self._write_ready) 924 925 # Add it to the buffer. 926 self._buffer.extend(data) 927 self._maybe_pause_protocol() 928 929 def _write_ready(self): 930 assert self._buffer, 'Data should not be empty' 931 932 if self._conn_lost: 933 return 934 try: 935 n = self._sock.send(self._buffer) 936 except (BlockingIOError, InterruptedError): 937 pass 938 except (SystemExit, KeyboardInterrupt): 939 raise 940 except BaseException as exc: 941 self._loop._remove_writer(self._sock_fd) 942 self._buffer.clear() 943 self._fatal_error(exc, 'Fatal write error on socket transport') 944 if self._empty_waiter is not None: 945 self._empty_waiter.set_exception(exc) 946 else: 947 if n: 948 del self._buffer[:n] 949 self._maybe_resume_protocol() # May append to buffer. 950 if not self._buffer: 951 self._loop._remove_writer(self._sock_fd) 952 if self._empty_waiter is not None: 953 self._empty_waiter.set_result(None) 954 if self._closing: 955 self._call_connection_lost(None) 956 elif self._eof: 957 self._sock.shutdown(socket.SHUT_WR) 958 959 def write_eof(self): 960 if self._closing or self._eof: 961 return 962 self._eof = True 963 if not self._buffer: 964 self._sock.shutdown(socket.SHUT_WR) 965 966 def can_write_eof(self): 967 return True 968 969 def _call_connection_lost(self, exc): 970 super()._call_connection_lost(exc) 971 if self._empty_waiter is not None: 972 self._empty_waiter.set_exception( 973 ConnectionError("Connection is closed by peer")) 974 975 def _make_empty_waiter(self): 976 if self._empty_waiter is not None: 977 raise RuntimeError("Empty waiter is already set") 978 self._empty_waiter = self._loop.create_future() 979 if not self._buffer: 980 self._empty_waiter.set_result(None) 981 return self._empty_waiter 982 983 def _reset_empty_waiter(self): 984 self._empty_waiter = None 985 986 987class _SelectorDatagramTransport(_SelectorTransport): 988 989 _buffer_factory = collections.deque 990 991 def __init__(self, loop, sock, protocol, address=None, 992 waiter=None, extra=None): 993 super().__init__(loop, sock, protocol, extra) 994 self._address = address 995 self._loop.call_soon(self._protocol.connection_made, self) 996 # only start reading when connection_made() has been called 997 self._loop.call_soon(self._add_reader, 998 self._sock_fd, self._read_ready) 999 if waiter is not None: 1000 # only wake up the waiter when connection_made() has been called 1001 self._loop.call_soon(futures._set_result_unless_cancelled, 1002 waiter, None) 1003 1004 def get_write_buffer_size(self): 1005 return sum(len(data) for data, _ in self._buffer) 1006 1007 def _read_ready(self): 1008 if self._conn_lost: 1009 return 1010 try: 1011 data, addr = self._sock.recvfrom(self.max_size) 1012 except (BlockingIOError, InterruptedError): 1013 pass 1014 except OSError as exc: 1015 self._protocol.error_received(exc) 1016 except (SystemExit, KeyboardInterrupt): 1017 raise 1018 except BaseException as exc: 1019 self._fatal_error(exc, 'Fatal read error on datagram transport') 1020 else: 1021 self._protocol.datagram_received(data, addr) 1022 1023 def sendto(self, data, addr=None): 1024 if not isinstance(data, (bytes, bytearray, memoryview)): 1025 raise TypeError(f'data argument must be a bytes-like object, ' 1026 f'not {type(data).__name__!r}') 1027 if not data: 1028 return 1029 1030 if self._address: 1031 if addr not in (None, self._address): 1032 raise ValueError( 1033 f'Invalid address: must be None or {self._address}') 1034 addr = self._address 1035 1036 if self._conn_lost and self._address: 1037 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1038 logger.warning('socket.send() raised exception.') 1039 self._conn_lost += 1 1040 return 1041 1042 if not self._buffer: 1043 # Attempt to send it right away first. 1044 try: 1045 if self._extra['peername']: 1046 self._sock.send(data) 1047 else: 1048 self._sock.sendto(data, addr) 1049 return 1050 except (BlockingIOError, InterruptedError): 1051 self._loop._add_writer(self._sock_fd, self._sendto_ready) 1052 except OSError as exc: 1053 self._protocol.error_received(exc) 1054 return 1055 except (SystemExit, KeyboardInterrupt): 1056 raise 1057 except BaseException as exc: 1058 self._fatal_error( 1059 exc, 'Fatal write error on datagram transport') 1060 return 1061 1062 # Ensure that what we buffer is immutable. 1063 self._buffer.append((bytes(data), addr)) 1064 self._maybe_pause_protocol() 1065 1066 def _sendto_ready(self): 1067 while self._buffer: 1068 data, addr = self._buffer.popleft() 1069 try: 1070 if self._extra['peername']: 1071 self._sock.send(data) 1072 else: 1073 self._sock.sendto(data, addr) 1074 except (BlockingIOError, InterruptedError): 1075 self._buffer.appendleft((data, addr)) # Try again later. 1076 break 1077 except OSError as exc: 1078 self._protocol.error_received(exc) 1079 return 1080 except (SystemExit, KeyboardInterrupt): 1081 raise 1082 except BaseException as exc: 1083 self._fatal_error( 1084 exc, 'Fatal write error on datagram transport') 1085 return 1086 1087 self._maybe_resume_protocol() # May append to buffer. 1088 if not self._buffer: 1089 self._loop._remove_writer(self._sock_fd) 1090 if self._closing: 1091 self._call_connection_lost(None) 1092