"""
IPC transport classes
"""


import errno
import logging
import socket
import time

import salt.ext.tornado
import salt.ext.tornado.concurrent
import salt.ext.tornado.gen
import salt.ext.tornado.ioloop
import salt.ext.tornado.netutil
import salt.transport.client
import salt.transport.frame
import salt.utils.asynchronous
import salt.utils.msgpack
from salt.ext.tornado.ioloop import IOLoop
from salt.ext.tornado.ioloop import TimeoutError as TornadoTimeoutError
from salt.ext.tornado.iostream import IOStream, StreamClosedError
from salt.ext.tornado.locks import Lock

log = logging.getLogger(__name__)


# 'tornado.concurrent.Future' doesn't support
# remove_done_callback() which we would have called
# in the timeout case. Due to this, we have this
# callback function outside of FutureWithTimeout.
def future_with_timeout_callback(future):
    """
    Done-callback attached to a wrapped future.

    Forwards the completion to the ``FutureWithTimeout`` currently tracked in
    ``future._future_with_timeout``; does nothing when that attribute is
    ``None`` (the wrapper already timed out).
    """
    if future._future_with_timeout is not None:
        future._future_with_timeout._done_callback(future)


class FutureWithTimeout(salt.ext.tornado.concurrent.Future):
    """
    A future that mirrors another future but fails with
    ``TornadoTimeoutError`` if the wrapped future does not complete in time.

    The wrapped future is left untouched on timeout so that it can be wrapped
    again by a later ``FutureWithTimeout``.
    """

    def __init__(self, io_loop, future, timeout):
        """
        :param IOLoop io_loop: The ioloop used to schedule the timeout
        :param Future future: The future to wait on
        :param float timeout: Seconds to wait; values below 0.1 are raised
                              to 0.1. ``None`` disables the timeout.
        """
        super().__init__()
        self.io_loop = io_loop
        self._future = future
        if timeout is not None:
            if timeout < 0.1:
                timeout = 0.1
            self._timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time() + timeout, self._timeout_callback
            )
        else:
            self._timeout_handle = None

        if hasattr(self._future, "_future_with_timeout"):
            # Reusing a future that has previously been used.
            # Due to this, no need to call add_done_callback()
            # because we did that before.
            self._future._future_with_timeout = self
            if self._future.done():
                future_with_timeout_callback(self._future)
        else:
            self._future._future_with_timeout = self
            self._future.add_done_callback(future_with_timeout_callback)

    def _timeout_callback(self):
        """
        Fail this future with ``TornadoTimeoutError`` and detach it from the
        wrapped future.
        """
        self._timeout_handle = None
        # 'tornado.concurrent.Future' doesn't support
        # remove_done_callback(). So we set an attribute
        # inside the future itself to track what happens
        # when it completes.
        self._future._future_with_timeout = None
        self.set_exception(TornadoTimeoutError())

    def _done_callback(self, future):
        """
        Cancel the pending timeout and copy the result (or exception) of
        ``future`` onto this future.
        """
        try:
            if self._timeout_handle is not None:
                self.io_loop.remove_timeout(self._timeout_handle)
                self._timeout_handle = None

            self.set_result(future.result())
        except Exception as exc:  # pylint: disable=broad-except
            self.set_exception(exc)


class IPCServer:
    """
    A Tornado IPC server very similar to Tornado's TCPServer class
    but using either UNIX domain sockets or TCP sockets
    """

    async_methods = [
        "handle_stream",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, socket_path, io_loop=None, payload_handler=None):
        """
        Create a new Tornado IPC server

        :param str/int socket_path: Path on the filesystem for the
                                    socket to bind to. This socket does
                                    not need to exist prior to calling
                                    this method, but parent directories
                                    should.
                                    It may also be of type 'int', in
                                    which case it is used as the port
                                    for a tcp localhost connection.
        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
        :param func payload_handler: A function to customize handling of
                                     incoming data. It is called with the
                                     message body and a reply coroutine.
        """
        self.socket_path = socket_path
        self._started = False
        self.payload_handler = payload_handler

        # Placeholders for attributes to be populated by method calls
        self.sock = None
        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
        self._closing = False

    def start(self):
        """
        Perform the work necessary to start up a Tornado IPC server

        Blocks until socket is established
        """
        # Start up the ioloop
        log.trace("IPCServer: binding to socket: %s", self.socket_path)
        if isinstance(self.socket_path, int):
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.setblocking(0)
            self.sock.bind(("127.0.0.1", self.socket_path))
            # Based on default used in tornado.netutil.bind_sockets()
            self.sock.listen(128)
        else:
            self.sock = salt.ext.tornado.netutil.bind_unix_socket(self.socket_path)

        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            salt.ext.tornado.netutil.add_accept_handler(
                self.sock,
                self.handle_connection,
            )
        self._started = True

    @salt.ext.tornado.gen.coroutine
    def handle_stream(self, stream):
        """
        Override this to handle the streams as they arrive

        Reads msgpack frames from the stream until it is closed and spawns
        ``self.payload_handler(body, reply)`` for each decoded message.

        :param IOStream stream: An IOStream for processing

        See https://tornado.readthedocs.io/en/latest/iostream.html#tornado.iostream.IOStream
        for additional details.
        """

        @salt.ext.tornado.gen.coroutine
        def _null(msg):
            raise salt.ext.tornado.gen.Return(None)

        def write_callback(stream, header):
            # Only messages carrying a message id ("mid") get a reply
            # callback that writes back to the stream; all others get a no-op.
            if header.get("mid"):

                @salt.ext.tornado.gen.coroutine
                def return_message(msg):
                    pack = salt.transport.frame.frame_msg_ipc(
                        msg,
                        header={"mid": header["mid"]},
                        raw_body=True,
                    )
                    yield stream.write(pack)

                return return_message
            else:
                return _null

        # msgpack deprecated `encoding` starting with version 0.5.2
        if salt.utils.msgpack.version >= (0, 5, 2):
            # Under Py2 we still want raw to be set to True
            msgpack_kwargs = {"raw": False}
        else:
            msgpack_kwargs = {"encoding": "utf-8"}
        unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
        while not stream.closed():
            try:
                wire_bytes = yield stream.read_bytes(4096, partial=True)
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    body = framed_msg["body"]
                    self.io_loop.spawn_callback(
                        self.payload_handler,
                        body,
                        write_callback(stream, framed_msg["head"]),
                    )
            except StreamClosedError:
                log.trace("Client disconnected from IPC %s", self.socket_path)
                break
            except OSError as exc:
                # On occasion an exception will occur with
                # an error code of 0, it's a spurious exception.
                if exc.errno == 0:
                    log.trace(
                        "Exception occurred with error number 0, "
                        "spurious exception: %s",
                        exc,
                    )
                else:
                    log.error("Exception occurred while handling stream: %s", exc)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("Exception occurred while handling stream: %s", exc)

    def handle_connection(self, connection, address):
        """
        Accept handler: wrap the accepted socket in an IOStream and spawn
        ``handle_stream`` for it. Errors are logged, not raised.
        """
        log.trace("IPCServer: Handling connection to address: %s", address)
        try:
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                stream = IOStream(
                    connection,
                )
            self.io_loop.spawn_callback(self.handle_stream, stream)
        except Exception as exc:  # pylint: disable=broad-except
            log.error("IPC streaming error: %s", exc)

    def close(self):
        """
        Routines to handle any cleanup before the instance shuts down.
        Sockets and filehandles should be closed explicitly, to prevent
        leaks.
        """
        if self._closing:
            return
        self._closing = True
        if hasattr(self.sock, "close"):
            self.sock.close()

    # pylint: disable=W1701
    def __del__(self):
        try:
            self.close()
        except TypeError:
            # This is raised when Python's GC has collected objects which
            # would be needed when calling self.close()
            pass

    # pylint: enable=W1701

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class IPCClient:
    """
    A Tornado IPC client very similar to Tornado's TCPClient class
    but using either UNIX domain sockets or TCP sockets

    This was written because Tornado does not have its own IPC
    server/client implementation.

    :param IOLoop io_loop: A Tornado ioloop to handle scheduling
    :param str/int socket_path: A path on the filesystem where a socket
                                belonging to a running IPCServer can be
                                found.
                                It may also be of type 'int', in which
                                case it is used as the port for a tcp
                                localhost connection.
    """

    def __init__(self, socket_path, io_loop=None):
        """
        Create a new IPC client

        IPC clients cannot bind to ports, but must connect to
        existing IPC servers. Clients can then send messages
        to the server.

        """
        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
        self.socket_path = socket_path
        self._closing = False
        self.stream = None
        # msgpack deprecated `encoding` starting with version 0.5.2
        if salt.utils.msgpack.version >= (0, 5, 2):
            # Under Py2 we still want raw to be set to True
            msgpack_kwargs = {"raw": False}
        else:
            msgpack_kwargs = {"encoding": "utf-8"}
        self.unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
        self._connecting_future = None

    def connected(self):
        """
        Return True when a stream exists and has not been closed.
        """
        return self.stream is not None and not self.stream.closed()

    def connect(self, callback=None, timeout=None):
        """
        Connect to the IPC socket

        :param func callback: Optional function scheduled on the ioloop with
                              the connection result once connecting finishes
        :param int timeout: Seconds to keep retrying the connection; with
                            ``None`` the first failure is reported
        :return: A future resolved with True on success, or carrying the
                 connection exception on failure. A connection attempt that
                 is still in progress is shared between callers.
        """
        if self._connecting_future is not None and not self._connecting_future.done():
            future = self._connecting_future
        else:
            if self._connecting_future is not None:
                # read previous future result to prevent the "unhandled future exception" error
                self._connecting_future.exception()  # pylint: disable=E0203
            future = salt.ext.tornado.concurrent.Future()
            self._connecting_future = future
            self._connect(timeout)

        if callback is not None:

            def handle_future(future):
                response = future.result()
                self.io_loop.add_callback(callback, response)

            future.add_done_callback(handle_future)

        return future

    @salt.ext.tornado.gen.coroutine
    def _connect(self, timeout=None):
        """
        Connect to a running IPCServer

        Retries once per second until ``timeout`` seconds have elapsed; with
        ``timeout=None`` the first failure is final. The outcome is reported
        through the future that ``connect()`` stored in
        ``self._connecting_future``.
        """
        # Bind the future before the first yield: close() resets
        # self._connecting_future to None, and dereferencing the attribute
        # after a yield would then raise AttributeError.
        connecting_future = self._connecting_future

        if isinstance(self.socket_path, int):
            sock_type = socket.AF_INET
            sock_addr = ("127.0.0.1", self.socket_path)
        else:
            sock_type = socket.AF_UNIX
            sock_addr = self.socket_path

        self.stream = None
        if timeout is not None:
            timeout_at = time.time() + timeout

        while True:
            if self._closing:
                break

            if self.stream is None:
                with salt.utils.asynchronous.current_ioloop(self.io_loop):
                    self.stream = IOStream(socket.socket(sock_type, socket.SOCK_STREAM))
            try:
                log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
                yield self.stream.connect(sock_addr)
            except Exception as e:  # pylint: disable=broad-except
                if self.stream.closed():
                    self.stream = None

                if timeout is None or time.time() > timeout_at:
                    if self.stream is not None:
                        self.stream.close()
                        self.stream = None
                    if connecting_future is None:
                        # Called directly rather than through connect():
                        # there is no future to carry the error, so raise it.
                        raise
                    connecting_future.set_exception(e)
                    break

                yield salt.ext.tornado.gen.sleep(1)
            else:
                if connecting_future is not None:
                    connecting_future.set_result(True)
                break

    def close(self):
        """
        Routines to handle any cleanup before the instance shuts down.
        Sockets and filehandles should be closed explicitly, to prevent
        leaks.
        """
        if self._closing:
            return

        self._closing = True
        self._connecting_future = None

        log.debug("Closing %s instance", self.__class__.__name__)

        if self.stream is not None and not self.stream.closed():
            try:
                self.stream.close()
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    # If it's not a bad file descriptor error, raise
                    raise

    # pylint: disable=W1701
    def __del__(self):
        try:
            self.close()
        except TypeError:
            # This is raised when Python's GC has collected objects which
            # would be needed when calling self.close()
            pass

    # pylint: enable=W1701

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class IPCMessageClient(IPCClient):
    """
    Salt IPC message client

    Create an IPC client to send messages to an IPC server

    An example of a very simple IPCMessageClient connecting to an IPCServer. This
    example assumes an already running IPCMessage server.

    IMPORTANT: The below example also assumes a running IOLoop process.

    # Import Tornado libs
    import salt.ext.tornado.ioloop

    # Import Salt libs
    import salt.config
    import salt.transport.ipc

    io_loop = salt.ext.tornado.ioloop.IOLoop.current()

    ipc_server_socket_path = '/var/run/ipc_server.ipc'

    ipc_client = salt.transport.ipc.IPCMessageClient(ipc_server_socket_path, io_loop=io_loop)

    # Connect to the server
    ipc_client.connect()

    # Send some data
    ipc_client.send('Hello world')
    """

    async_methods = [
        "send",
        "connect",
        "_connect",
    ]
    close_methods = [
        "close",
    ]

    # FIXME timeout unimplemented
    # FIXME tries unimplemented
    @salt.ext.tornado.gen.coroutine
    def send(self, msg, timeout=None, tries=None):
        """
        Send a message to an IPC socket

        If the socket is not currently connected, a connection will be established.

        :param dict msg: The message to be sent
        :param int timeout: Timeout when sending message (Currently unimplemented)
        :param int tries: Number of send attempts (Currently unimplemented)
        """
        if not self.connected():
            yield self.connect()
        pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
        yield self.stream.write(pack)


class IPCMessageServer(IPCServer):
    """
    Salt IPC message server

    Creates a message server which can create and bind to a socket on a given
    path and then respond to messages asynchronously.

    An example of a very simple IPCServer which prints received messages to
    a console:

        # Import Tornado libs
        import salt.ext.tornado.ioloop

        # Import Salt libs
        import salt.transport.ipc

        # This callback is run whenever a message is received. It is
        # called with the message body and a coroutine for sending a reply.
        def print_to_console(payload, reply_func):
            print(payload)

        io_loop = salt.ext.tornado.ioloop.IOLoop.current()
        ipc_server_socket_path = '/var/run/ipc_server.ipc'
        ipc_server = salt.transport.ipc.IPCMessageServer(ipc_server_socket_path, io_loop=io_loop,
                                                         payload_handler=print_to_console)
        # Bind to the socket and prepare to run
        ipc_server.start()

        # Start the server
        io_loop.start()

    See IPCMessageClient() for an example of sending messages to an IPCMessageServer instance
    """


class IPCMessagePublisher:
    """
    A Tornado IPC Publisher similar to Tornado's TCPServer class
    but using either UNIX domain sockets or TCP sockets
    """

    def __init__(self, opts, socket_path, io_loop=None):
        """
        Create a new Tornado IPC server
        :param dict opts: Salt options
        :param str/int socket_path: Path on the filesystem for the
                                    socket to bind to. This socket does
                                    not need to exist prior to calling
                                    this method, but parent directories
                                    should.
                                    It may also be of type 'int', in
                                    which case it is used as the port
                                    for a tcp localhost connection.
        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
        """
        self.opts = opts
        self.socket_path = socket_path
        self._started = False

        # Placeholders for attributes to be populated by method calls
        self.sock = None
        self.io_loop = io_loop or IOLoop.current()
        self._closing = False
        self.streams = set()

    def start(self):
        """
        Perform the work necessary to start up a Tornado IPC server

        Blocks until socket is established
        """
        # Start up the ioloop
        log.trace("IPCMessagePublisher: binding to socket: %s", self.socket_path)
        if isinstance(self.socket_path, int):
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.setblocking(0)
            self.sock.bind(("127.0.0.1", self.socket_path))
            # Based on default used in salt.ext.tornado.netutil.bind_sockets()
            self.sock.listen(128)
        else:
            self.sock = salt.ext.tornado.netutil.bind_unix_socket(self.socket_path)

        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            salt.ext.tornado.netutil.add_accept_handler(
                self.sock,
                self.handle_connection,
            )
        self._started = True

    @salt.ext.tornado.gen.coroutine
    def _write(self, stream, pack):
        """
        Write an already framed message to one subscriber stream.

        A stream that fails is dropped from ``self.streams``; on errors other
        than ``StreamClosedError`` it is also closed.
        """
        try:
            yield stream.write(pack)
        except StreamClosedError:
            log.trace("Client disconnected from IPC %s", self.socket_path)
            self.streams.discard(stream)
        except Exception as exc:  # pylint: disable=broad-except
            log.error("Exception occurred while handling stream: %s", exc)
            if not stream.closed():
                stream.close()
            self.streams.discard(stream)

    def publish(self, msg):
        """
        Send message to all connected sockets
        """
        if not self.streams:
            return

        pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
        for stream in self.streams:
            self.io_loop.spawn_callback(self._write, stream, pack)

    def handle_connection(self, connection, address):
        """
        Accept handler: wrap the accepted socket in an IOStream and register
        it as a subscriber. Errors are logged, not raised.
        """
        log.trace("IPCServer: Handling connection to address: %s", address)
        try:
            kwargs = {}
            if self.opts["ipc_write_buffer"] > 0:
                kwargs["max_write_buffer_size"] = self.opts["ipc_write_buffer"]
                log.trace(
                    "Setting IPC connection write buffer: %s",
                    (self.opts["ipc_write_buffer"]),
                )
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                stream = IOStream(connection, **kwargs)
            self.streams.add(stream)

            def discard_after_closed():
                self.streams.discard(stream)

            stream.set_close_callback(discard_after_closed)
        except Exception as exc:  # pylint: disable=broad-except
            log.error("IPC streaming error: %s", exc)

    def close(self):
        """
        Routines to handle any cleanup before the instance shuts down.
        Sockets and filehandles should be closed explicitly, to prevent
        leaks.
        """
        if self._closing:
            return
        self._closing = True
        # Iterate over a snapshot: each stream's close callback discards the
        # stream from self.streams, which must not happen mid-iteration.
        for stream in list(self.streams):
            stream.close()
        self.streams.clear()
        if hasattr(self.sock, "close"):
            self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class IPCMessageSubscriber(IPCClient):
    """
    Salt IPC message subscriber

    Create an IPC client to receive messages from IPC publisher

    An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
    This example assumes an already running IPCMessagePublisher.

    IMPORTANT: The below example also assumes the IOLoop is NOT running.

    # Import Tornado libs
    import salt.ext.tornado.ioloop

    # Import Salt libs
    import salt.config
    import salt.transport.ipc

    # Create a new IO Loop.
    # We know that this new IO Loop is not currently running.
    io_loop = salt.ext.tornado.ioloop.IOLoop()

    ipc_publisher_socket_path = '/var/run/ipc_publisher.ipc'

    ipc_subscriber = salt.transport.ipc.IPCMessageSubscriber(ipc_publisher_socket_path, io_loop=io_loop)

    # Connect to the server
    # Use the associated IO Loop that isn't running.
    io_loop.run_sync(ipc_subscriber.connect)

    # Wait for some data
    package = ipc_subscriber.read_sync()
    """

    async_methods = [
        "read",
        "connect",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, socket_path, io_loop=None):
        super().__init__(socket_path, io_loop=io_loop)
        # Pending stream read; kept across a timed-out _read() so it can be reused
        self._read_stream_future = None
        # Messages decoded by a sync read beyond the one that was returned
        self._saved_data = []
        # Guards against overlapping _read() runs
        self._read_in_progress = Lock()

    @salt.ext.tornado.gen.coroutine
    def _read(self, timeout, callback=None):
        """
        Read from the stream and decode messages.

        With a ``callback`` every decoded message body is spawned onto the
        ioloop and reading continues until the stream closes or fails.
        Without one, the first decoded body is returned and any further
        bodies from the same chunk are queued in ``self._saved_data``.

        :param timeout: Seconds to wait for the first chunk of data, or None
        :param func callback: Optional function invoked with each message body
        :return: A message body, or None on timeout, on a closed stream, or
                 when another read is already in progress.
        """
        try:
            yield self._read_in_progress.acquire(timeout=0.00000001)
        except salt.ext.tornado.gen.TimeoutError:
            raise salt.ext.tornado.gen.Return(None)

        exc_to_raise = None
        ret = None
        try:
            while True:
                if self._read_stream_future is None:
                    self._read_stream_future = self.stream.read_bytes(
                        4096, partial=True
                    )

                if timeout is None:
                    wire_bytes = yield self._read_stream_future
                else:
                    wire_bytes = yield FutureWithTimeout(
                        self.io_loop, self._read_stream_future, timeout
                    )
                self._read_stream_future = None

                # Remove the timeout once we get some data or an exception
                # occurs. We will assume that the rest of the data is already
                # there or is coming soon if an exception doesn't occur.
                timeout = None

                self.unpacker.feed(wire_bytes)
                first_sync_msg = True
                for framed_msg in self.unpacker:
                    if callback:
                        self.io_loop.spawn_callback(callback, framed_msg["body"])
                    elif first_sync_msg:
                        ret = framed_msg["body"]
                        first_sync_msg = False
                    else:
                        self._saved_data.append(framed_msg["body"])
                if not first_sync_msg:
                    # We read at least one piece of data and we're on sync run
                    break
        except TornadoTimeoutError:
            # In the timeout case, just return None.
            # Keep 'self._read_stream_future' alive.
            ret = None
        except StreamClosedError:
            log.trace("Subscriber disconnected from IPC %s", self.socket_path)
            self._read_stream_future = None
        except Exception as exc:  # pylint: disable=broad-except
            log.error("Exception occurred in Subscriber while handling stream: %s", exc)
            self._read_stream_future = None
            exc_to_raise = exc
        finally:
            # Always give the lock back, whichever way the read ended,
            # otherwise every later _read() would return None immediately.
            self._read_in_progress.release()

        if exc_to_raise is not None:
            raise exc_to_raise  # pylint: disable=E0702
        raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def read(self, timeout):
        """
        Asynchronously read a single message.

        A message queued by an earlier read is returned first. Otherwise the
        subscriber connects (retrying until connected) and reads from the
        stream.

        :param int timeout: Timeout when receiving message
        :return: message data if successful. None if timed out.
        """
        if self._saved_data:
            res = self._saved_data.pop(0)
            raise salt.ext.tornado.gen.Return(res)
        while not self.connected():
            try:
                yield self.connect(timeout=5)
            except StreamClosedError:
                log.trace(
                    "Subscriber closed stream on IPC %s before connect",
                    self.socket_path,
                )
                yield salt.ext.tornado.gen.sleep(1)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("Exception occurred while Subscriber connecting: %s", exc)
                yield salt.ext.tornado.gen.sleep(1)
        res = yield self._read(timeout)
        raise salt.ext.tornado.gen.Return(res)

    def read_sync(self, timeout=None):
        """
        Read a message from an IPC socket

        The socket must already be connected.
        The associated IO Loop must NOT be running.
        :param int timeout: Timeout when receiving message
        :return: message data if successful. None if timed out. Will raise an
                 exception for all other error conditions.
        """
        if self._saved_data:
            return self._saved_data.pop(0)
        return self.io_loop.run_sync(lambda: self._read(timeout))

    @salt.ext.tornado.gen.coroutine
    def read_async(self, callback):
        """
        Asynchronously read messages and invoke a callback when they are ready.

        :param callback: A callback with the received data
        """
        while not self.connected():
            try:
                yield self.connect(timeout=5)
            except StreamClosedError:
                log.trace(
                    "Subscriber closed stream on IPC %s before connect",
                    self.socket_path,
                )
                yield salt.ext.tornado.gen.sleep(1)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("Exception occurred while Subscriber connecting: %s", exc)
                yield salt.ext.tornado.gen.sleep(1)
        yield self._read(None, callback)

    def close(self):
        """
        Routines to handle any cleanup before the instance shuts down.
        Sockets and filehandles should be closed explicitly, to prevent
        leaks.
        """
        if self._closing:
            return
        super().close()
        # This will prevent this message from showing up:
        # '[ERROR   ] Future exception was never retrieved:
        # StreamClosedError'
        if self._read_stream_future is not None and self._read_stream_future.done():
            exc = self._read_stream_future.exception()
            if exc and not isinstance(exc, StreamClosedError):
                log.error("Read future returned exception %r", exc)