1"""
2IPC transport classes
3"""
4
5
6import errno
7import logging
8import socket
9import time
10
11import salt.ext.tornado
12import salt.ext.tornado.concurrent
13import salt.ext.tornado.gen
14import salt.ext.tornado.ioloop
15import salt.ext.tornado.netutil
16import salt.transport.client
17import salt.transport.frame
18import salt.utils.msgpack
19from salt.ext.tornado.ioloop import IOLoop
20from salt.ext.tornado.ioloop import TimeoutError as TornadoTimeoutError
21from salt.ext.tornado.iostream import IOStream, StreamClosedError
22from salt.ext.tornado.locks import Lock
23
24log = logging.getLogger(__name__)
25
26
27# 'tornado.concurrent.Future' doesn't support
28# remove_done_callback() which we would have called
29# in the timeout case. Due to this, we have this
30# callback function outside of FutureWithTimeout.
31def future_with_timeout_callback(future):
32    if future._future_with_timeout is not None:
33        future._future_with_timeout._done_callback(future)
34
35
36class FutureWithTimeout(salt.ext.tornado.concurrent.Future):
37    def __init__(self, io_loop, future, timeout):
38        super().__init__()
39        self.io_loop = io_loop
40        self._future = future
41        if timeout is not None:
42            if timeout < 0.1:
43                timeout = 0.1
44            self._timeout_handle = self.io_loop.add_timeout(
45                self.io_loop.time() + timeout, self._timeout_callback
46            )
47        else:
48            self._timeout_handle = None
49
50        if hasattr(self._future, "_future_with_timeout"):
51            # Reusing a future that has previously been used.
52            # Due to this, no need to call add_done_callback()
53            # because we did that before.
54            self._future._future_with_timeout = self
55            if self._future.done():
56                future_with_timeout_callback(self._future)
57        else:
58            self._future._future_with_timeout = self
59            self._future.add_done_callback(future_with_timeout_callback)
60
61    def _timeout_callback(self):
62        self._timeout_handle = None
63        # 'tornado.concurrent.Future' doesn't support
64        # remove_done_callback(). So we set an attribute
65        # inside the future itself to track what happens
66        # when it completes.
67        self._future._future_with_timeout = None
68        self.set_exception(TornadoTimeoutError())
69
70    def _done_callback(self, future):
71        try:
72            if self._timeout_handle is not None:
73                self.io_loop.remove_timeout(self._timeout_handle)
74                self._timeout_handle = None
75
76            self.set_result(future.result())
77        except Exception as exc:  # pylint: disable=broad-except
78            self.set_exception(exc)
79
80
81class IPCServer:
82    """
83    A Tornado IPC server very similar to Tornado's TCPServer class
84    but using either UNIX domain sockets or TCP sockets
85    """
86
87    async_methods = [
88        "handle_stream",
89    ]
90    close_methods = [
91        "close",
92    ]
93
94    def __init__(self, socket_path, io_loop=None, payload_handler=None):
95        """
96        Create a new Tornado IPC server
97
98        :param str/int socket_path: Path on the filesystem for the
99                                    socket to bind to. This socket does
100                                    not need to exist prior to calling
101                                    this method, but parent directories
102                                    should.
103                                    It may also be of type 'int', in
104                                    which case it is used as the port
105                                    for a tcp localhost connection.
106        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
107        :param func payload_handler: A function to customize handling of
108                                     incoming data.
109        """
110        self.socket_path = socket_path
111        self._started = False
112        self.payload_handler = payload_handler
113
114        # Placeholders for attributes to be populated by method calls
115        self.sock = None
116        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
117        self._closing = False
118
119    def start(self):
120        """
121        Perform the work necessary to start up a Tornado IPC server
122
123        Blocks until socket is established
124        """
125        # Start up the ioloop
126        log.trace("IPCServer: binding to socket: %s", self.socket_path)
127        if isinstance(self.socket_path, int):
128            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
130            self.sock.setblocking(0)
131            self.sock.bind(("127.0.0.1", self.socket_path))
132            # Based on default used in tornado.netutil.bind_sockets()
133            self.sock.listen(128)
134        else:
135            self.sock = salt.ext.tornado.netutil.bind_unix_socket(self.socket_path)
136
137        with salt.utils.asynchronous.current_ioloop(self.io_loop):
138            salt.ext.tornado.netutil.add_accept_handler(
139                self.sock,
140                self.handle_connection,
141            )
142        self._started = True
143
144    @salt.ext.tornado.gen.coroutine
145    def handle_stream(self, stream):
146        """
147        Override this to handle the streams as they arrive
148
149        :param IOStream stream: An IOStream for processing
150
151        See https://tornado.readthedocs.io/en/latest/iostream.html#tornado.iostream.IOStream
152        for additional details.
153        """
154
155        @salt.ext.tornado.gen.coroutine
156        def _null(msg):
157            raise salt.ext.tornado.gen.Return(None)
158
159        def write_callback(stream, header):
160            if header.get("mid"):
161
162                @salt.ext.tornado.gen.coroutine
163                def return_message(msg):
164                    pack = salt.transport.frame.frame_msg_ipc(
165                        msg,
166                        header={"mid": header["mid"]},
167                        raw_body=True,
168                    )
169                    yield stream.write(pack)
170
171                return return_message
172            else:
173                return _null
174
175        # msgpack deprecated `encoding` starting with version 0.5.2
176        if salt.utils.msgpack.version >= (0, 5, 2):
177            # Under Py2 we still want raw to be set to True
178            msgpack_kwargs = {"raw": False}
179        else:
180            msgpack_kwargs = {"encoding": "utf-8"}
181        unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
182        while not stream.closed():
183            try:
184                wire_bytes = yield stream.read_bytes(4096, partial=True)
185                unpacker.feed(wire_bytes)
186                for framed_msg in unpacker:
187                    body = framed_msg["body"]
188                    self.io_loop.spawn_callback(
189                        self.payload_handler,
190                        body,
191                        write_callback(stream, framed_msg["head"]),
192                    )
193            except StreamClosedError:
194                log.trace("Client disconnected from IPC %s", self.socket_path)
195                break
196            except OSError as exc:
197                # On occasion an exception will occur with
198                # an error code of 0, it's a spurious exception.
199                if exc.errno == 0:
200                    log.trace(
201                        "Exception occurred with error number 0, "
202                        "spurious exception: %s",
203                        exc,
204                    )
205                else:
206                    log.error("Exception occurred while handling stream: %s", exc)
207            except Exception as exc:  # pylint: disable=broad-except
208                log.error("Exception occurred while handling stream: %s", exc)
209
210    def handle_connection(self, connection, address):
211        log.trace("IPCServer: Handling connection to address: %s", address)
212        try:
213            with salt.utils.asynchronous.current_ioloop(self.io_loop):
214                stream = IOStream(
215                    connection,
216                )
217            self.io_loop.spawn_callback(self.handle_stream, stream)
218        except Exception as exc:  # pylint: disable=broad-except
219            log.error("IPC streaming error: %s", exc)
220
221    def close(self):
222        """
223        Routines to handle any cleanup before the instance shuts down.
224        Sockets and filehandles should be closed explicitly, to prevent
225        leaks.
226        """
227        if self._closing:
228            return
229        self._closing = True
230        if hasattr(self.sock, "close"):
231            self.sock.close()
232
233    # pylint: disable=W1701
234    def __del__(self):
235        try:
236            self.close()
237        except TypeError:
238            # This is raised when Python's GC has collected objects which
239            # would be needed when calling self.close()
240            pass
241
242    # pylint: enable=W1701
243
244    def __enter__(self):
245        return self
246
247    def __exit__(self, *args):
248        self.close()
249
250
251class IPCClient:
252    """
253    A Tornado IPC client very similar to Tornado's TCPClient class
254    but using either UNIX domain sockets or TCP sockets
255
256    This was written because Tornado does not have its own IPC
257    server/client implementation.
258
259    :param IOLoop io_loop: A Tornado ioloop to handle scheduling
260    :param str/int socket_path: A path on the filesystem where a socket
261                                belonging to a running IPCServer can be
262                                found.
263                                It may also be of type 'int', in which
264                                case it is used as the port for a tcp
265                                localhost connection.
266    """
267
268    def __init__(self, socket_path, io_loop=None):
269        """
270        Create a new IPC client
271
272        IPC clients cannot bind to ports, but must connect to
273        existing IPC servers. Clients can then send messages
274        to the server.
275
276        """
277        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
278        self.socket_path = socket_path
279        self._closing = False
280        self.stream = None
281        # msgpack deprecated `encoding` starting with version 0.5.2
282        if salt.utils.msgpack.version >= (0, 5, 2):
283            # Under Py2 we still want raw to be set to True
284            msgpack_kwargs = {"raw": False}
285        else:
286            msgpack_kwargs = {"encoding": "utf-8"}
287        self.unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
288        self._connecting_future = None
289
290    def connected(self):
291        return self.stream is not None and not self.stream.closed()
292
293    def connect(self, callback=None, timeout=None):
294        """
295        Connect to the IPC socket
296        """
297        if self._connecting_future is not None and not self._connecting_future.done():
298            future = self._connecting_future
299        else:
300            if self._connecting_future is not None:
301                # read previous future result to prevent the "unhandled future exception" error
302                self._connecting_future.exception()  # pylint: disable=E0203
303            future = salt.ext.tornado.concurrent.Future()
304            self._connecting_future = future
305            self._connect(timeout)
306
307        if callback is not None:
308
309            def handle_future(future):
310                response = future.result()
311                self.io_loop.add_callback(callback, response)
312
313            future.add_done_callback(handle_future)
314
315        return future
316
317    @salt.ext.tornado.gen.coroutine
318    def _connect(self, timeout=None):
319        """
320        Connect to a running IPCServer
321        """
322        if isinstance(self.socket_path, int):
323            sock_type = socket.AF_INET
324            sock_addr = ("127.0.0.1", self.socket_path)
325        else:
326            sock_type = socket.AF_UNIX
327            sock_addr = self.socket_path
328
329        self.stream = None
330        if timeout is not None:
331            timeout_at = time.time() + timeout
332
333        while True:
334            if self._closing:
335                break
336
337            if self.stream is None:
338                with salt.utils.asynchronous.current_ioloop(self.io_loop):
339                    self.stream = IOStream(socket.socket(sock_type, socket.SOCK_STREAM))
340            try:
341                log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
342                yield self.stream.connect(sock_addr)
343                self._connecting_future.set_result(True)
344                break
345            except Exception as e:  # pylint: disable=broad-except
346                if self.stream.closed():
347                    self.stream = None
348
349                if timeout is None or time.time() > timeout_at:
350                    if self.stream is not None:
351                        self.stream.close()
352                        self.stream = None
353                    self._connecting_future.set_exception(e)
354                    break
355
356                yield salt.ext.tornado.gen.sleep(1)
357
358    def close(self):
359        """
360        Routines to handle any cleanup before the instance shuts down.
361        Sockets and filehandles should be closed explicitly, to prevent
362        leaks.
363        """
364        if self._closing:
365            return
366
367        self._closing = True
368        self._connecting_future = None
369
370        log.debug("Closing %s instance", self.__class__.__name__)
371
372        if self.stream is not None and not self.stream.closed():
373            try:
374                self.stream.close()
375            except OSError as exc:
376                if exc.errno != errno.EBADF:
377                    # If its not a bad file descriptor error, raise
378                    raise
379
380    # pylint: disable=W1701
381    def __del__(self):
382        try:
383            self.close()
384        except TypeError:
385            # This is raised when Python's GC has collected objects which
386            # would be needed when calling self.close()
387            pass
388
389    # pylint: enable=W1701
390
391    def __enter__(self):
392        return self
393
394    def __exit__(self, *args):
395        self.close()
396
397
398class IPCMessageClient(IPCClient):
399    """
400    Salt IPC message client
401
402    Create an IPC client to send messages to an IPC server
403
404    An example of a very simple IPCMessageClient connecting to an IPCServer. This
405    example assumes an already running IPCMessage server.
406
407    IMPORTANT: The below example also assumes a running IOLoop process.
408
409    # Import Tornado libs
410    import salt.ext.tornado.ioloop
411
412    # Import Salt libs
413    import salt.config
414    import salt.transport.ipc
415
416    io_loop = salt.ext.tornado.ioloop.IOLoop.current()
417
418    ipc_server_socket_path = '/var/run/ipc_server.ipc'
419
420    ipc_client = salt.transport.ipc.IPCMessageClient(ipc_server_socket_path, io_loop=io_loop)
421
422    # Connect to the server
423    ipc_client.connect()
424
425    # Send some data
426    ipc_client.send('Hello world')
427    """
428
429    async_methods = [
430        "send",
431        "connect",
432        "_connect",
433    ]
434    close_methods = [
435        "close",
436    ]
437
438    # FIXME timeout unimplemented
439    # FIXME tries unimplemented
440    @salt.ext.tornado.gen.coroutine
441    def send(self, msg, timeout=None, tries=None):
442        """
443        Send a message to an IPC socket
444
445        If the socket is not currently connected, a connection will be established.
446
447        :param dict msg: The message to be sent
448        :param int timeout: Timeout when sending message (Currently unimplemented)
449        """
450        if not self.connected():
451            yield self.connect()
452        pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
453        yield self.stream.write(pack)
454
455
456class IPCMessageServer(IPCServer):
457    """
458    Salt IPC message server
459
460    Creates a message server which can create and bind to a socket on a given
461    path and then respond to messages asynchronously.
462
463    An example of a very simple IPCServer which prints received messages to
464    a console:
465
466        # Import Tornado libs
467        import salt.ext.tornado.ioloop
468
469        # Import Salt libs
470        import salt.transport.ipc
471
472        io_loop = salt.ext.tornado.ioloop.IOLoop.current()
473        ipc_server_socket_path = '/var/run/ipc_server.ipc'
474        ipc_server = salt.transport.ipc.IPCMessageServer(ipc_server_socket_path, io_loop=io_loop,
475                                                         payload_handler=print_to_console)
476        # Bind to the socket and prepare to run
477        ipc_server.start()
478
479        # Start the server
480        io_loop.start()
481
482        # This callback is run whenever a message is received
483        def print_to_console(payload):
484            print(payload)
485
486    See IPCMessageClient() for an example of sending messages to an IPCMessageServer instance
487    """
488
489
490class IPCMessagePublisher:
491    """
492    A Tornado IPC Publisher similar to Tornado's TCPServer class
493    but using either UNIX domain sockets or TCP sockets
494    """
495
496    def __init__(self, opts, socket_path, io_loop=None):
497        """
498        Create a new Tornado IPC server
499        :param dict opts: Salt options
500        :param str/int socket_path: Path on the filesystem for the
501                                    socket to bind to. This socket does
502                                    not need to exist prior to calling
503                                    this method, but parent directories
504                                    should.
505                                    It may also be of type 'int', in
506                                    which case it is used as the port
507                                    for a tcp localhost connection.
508        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
509        """
510        self.opts = opts
511        self.socket_path = socket_path
512        self._started = False
513
514        # Placeholders for attributes to be populated by method calls
515        self.sock = None
516        self.io_loop = io_loop or IOLoop.current()
517        self._closing = False
518        self.streams = set()
519
520    def start(self):
521        """
522        Perform the work necessary to start up a Tornado IPC server
523
524        Blocks until socket is established
525        """
526        # Start up the ioloop
527        log.trace("IPCMessagePublisher: binding to socket: %s", self.socket_path)
528        if isinstance(self.socket_path, int):
529            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
530            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
531            self.sock.setblocking(0)
532            self.sock.bind(("127.0.0.1", self.socket_path))
533            # Based on default used in salt.ext.tornado.netutil.bind_sockets()
534            self.sock.listen(128)
535        else:
536            self.sock = salt.ext.tornado.netutil.bind_unix_socket(self.socket_path)
537
538        with salt.utils.asynchronous.current_ioloop(self.io_loop):
539            salt.ext.tornado.netutil.add_accept_handler(
540                self.sock,
541                self.handle_connection,
542            )
543        self._started = True
544
545    @salt.ext.tornado.gen.coroutine
546    def _write(self, stream, pack):
547        try:
548            yield stream.write(pack)
549        except StreamClosedError:
550            log.trace("Client disconnected from IPC %s", self.socket_path)
551            self.streams.discard(stream)
552        except Exception as exc:  # pylint: disable=broad-except
553            log.error("Exception occurred while handling stream: %s", exc)
554            if not stream.closed():
555                stream.close()
556            self.streams.discard(stream)
557
558    def publish(self, msg):
559        """
560        Send message to all connected sockets
561        """
562        if not self.streams:
563            return
564
565        pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
566        for stream in self.streams:
567            self.io_loop.spawn_callback(self._write, stream, pack)
568
569    def handle_connection(self, connection, address):
570        log.trace("IPCServer: Handling connection to address: %s", address)
571        try:
572            kwargs = {}
573            if self.opts["ipc_write_buffer"] > 0:
574                kwargs["max_write_buffer_size"] = self.opts["ipc_write_buffer"]
575                log.trace(
576                    "Setting IPC connection write buffer: %s",
577                    (self.opts["ipc_write_buffer"]),
578                )
579            with salt.utils.asynchronous.current_ioloop(self.io_loop):
580                stream = IOStream(connection, **kwargs)
581            self.streams.add(stream)
582
583            def discard_after_closed():
584                self.streams.discard(stream)
585
586            stream.set_close_callback(discard_after_closed)
587        except Exception as exc:  # pylint: disable=broad-except
588            log.error("IPC streaming error: %s", exc)
589
590    def close(self):
591        """
592        Routines to handle any cleanup before the instance shuts down.
593        Sockets and filehandles should be closed explicitly, to prevent
594        leaks.
595        """
596        if self._closing:
597            return
598        self._closing = True
599        for stream in self.streams:
600            stream.close()
601        self.streams.clear()
602        if hasattr(self.sock, "close"):
603            self.sock.close()
604
605    def __enter__(self):
606        return self
607
608    def __exit__(self, *args):
609        self.close()
610
611
612class IPCMessageSubscriber(IPCClient):
613    """
614    Salt IPC message subscriber
615
616    Create an IPC client to receive messages from IPC publisher
617
618    An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
619    This example assumes an already running IPCMessagePublisher.
620
621    IMPORTANT: The below example also assumes the IOLoop is NOT running.
622
623    # Import Tornado libs
624    import salt.ext.tornado.ioloop
625
626    # Import Salt libs
627    import salt.config
628    import salt.transport.ipc
629
630    # Create a new IO Loop.
631    # We know that this new IO Loop is not currently running.
632    io_loop = salt.ext.tornado.ioloop.IOLoop()
633
634    ipc_publisher_socket_path = '/var/run/ipc_publisher.ipc'
635
636    ipc_subscriber = salt.transport.ipc.IPCMessageSubscriber(ipc_server_socket_path, io_loop=io_loop)
637
638    # Connect to the server
639    # Use the associated IO Loop that isn't running.
640    io_loop.run_sync(ipc_subscriber.connect)
641
642    # Wait for some data
643    package = ipc_subscriber.read_sync()
644    """
645
646    async_methods = [
647        "read",
648        "connect",
649    ]
650    close_methods = [
651        "close",
652    ]
653
654    def __init__(self, socket_path, io_loop=None):
655        super().__init__(socket_path, io_loop=io_loop)
656        self._read_stream_future = None
657        self._saved_data = []
658        self._read_in_progress = Lock()
659
660    @salt.ext.tornado.gen.coroutine
661    def _read(self, timeout, callback=None):
662        try:
663            yield self._read_in_progress.acquire(timeout=0.00000001)
664        except salt.ext.tornado.gen.TimeoutError:
665            raise salt.ext.tornado.gen.Return(None)
666
667        exc_to_raise = None
668        ret = None
669        try:
670            while True:
671                if self._read_stream_future is None:
672                    self._read_stream_future = self.stream.read_bytes(
673                        4096, partial=True
674                    )
675
676                if timeout is None:
677                    wire_bytes = yield self._read_stream_future
678                else:
679                    wire_bytes = yield FutureWithTimeout(
680                        self.io_loop, self._read_stream_future, timeout
681                    )
682                self._read_stream_future = None
683
684                # Remove the timeout once we get some data or an exception
685                # occurs. We will assume that the rest of the data is already
686                # there or is coming soon if an exception doesn't occur.
687                timeout = None
688
689                self.unpacker.feed(wire_bytes)
690                first_sync_msg = True
691                for framed_msg in self.unpacker:
692                    if callback:
693                        self.io_loop.spawn_callback(callback, framed_msg["body"])
694                    elif first_sync_msg:
695                        ret = framed_msg["body"]
696                        first_sync_msg = False
697                    else:
698                        self._saved_data.append(framed_msg["body"])
699                if not first_sync_msg:
700                    # We read at least one piece of data and we're on sync run
701                    break
702        except TornadoTimeoutError:
703            # In the timeout case, just return None.
704            # Keep 'self._read_stream_future' alive.
705            ret = None
706        except StreamClosedError as exc:
707            log.trace("Subscriber disconnected from IPC %s", self.socket_path)
708            self._read_stream_future = None
709        except Exception as exc:  # pylint: disable=broad-except
710            log.error("Exception occurred in Subscriber while handling stream: %s", exc)
711            self._read_stream_future = None
712            exc_to_raise = exc
713
714        self._read_in_progress.release()
715
716        if exc_to_raise is not None:
717            raise exc_to_raise  # pylint: disable=E0702
718        raise salt.ext.tornado.gen.Return(ret)
719
720    @salt.ext.tornado.gen.coroutine
721    def read(self, timeout):
722        """
723        Asynchronously read messages and invoke a callback when they are ready.
724        :param callback: A callback with the received data
725        """
726        if self._saved_data:
727            res = self._saved_data.pop(0)
728            raise salt.ext.tornado.gen.Return(res)
729        while not self.connected():
730            try:
731                yield self.connect(timeout=5)
732            except StreamClosedError:
733                log.trace(
734                    "Subscriber closed stream on IPC %s before connect",
735                    self.socket_path,
736                )
737                yield salt.ext.tornado.gen.sleep(1)
738            except Exception as exc:  # pylint: disable=broad-except
739                log.error("Exception occurred while Subscriber connecting: %s", exc)
740                yield salt.ext.tornado.gen.sleep(1)
741        res = yield self._read(timeout)
742        raise salt.ext.tornado.gen.Return(res)
743
744    def read_sync(self, timeout=None):
745        """
746        Read a message from an IPC socket
747
748        The socket must already be connected.
749        The associated IO Loop must NOT be running.
750        :param int timeout: Timeout when receiving message
751        :return: message data if successful. None if timed out. Will raise an
752                 exception for all other error conditions.
753        """
754        if self._saved_data:
755            return self._saved_data.pop(0)
756        return self.io_loop.run_sync(lambda: self._read(timeout))
757
758    @salt.ext.tornado.gen.coroutine
759    def read_async(self, callback):
760        """
761        Asynchronously read messages and invoke a callback when they are ready.
762
763        :param callback: A callback with the received data
764        """
765        while not self.connected():
766            try:
767                yield self.connect(timeout=5)
768            except StreamClosedError:
769                log.trace(
770                    "Subscriber closed stream on IPC %s before connect",
771                    self.socket_path,
772                )
773                yield salt.ext.tornado.gen.sleep(1)
774            except Exception as exc:  # pylint: disable=broad-except
775                log.error("Exception occurred while Subscriber connecting: %s", exc)
776                yield salt.ext.tornado.gen.sleep(1)
777        yield self._read(None, callback)
778
779    def close(self):
780        """
781        Routines to handle any cleanup before the instance shuts down.
782        Sockets and filehandles should be closed explicitly, to prevent
783        leaks.
784        """
785        if self._closing:
786            return
787        super().close()
788        # This will prevent this message from showing up:
789        # '[ERROR   ] Future exception was never retrieved:
790        # StreamClosedError'
791        if self._read_stream_future is not None and self._read_stream_future.done():
792            exc = self._read_stream_future.exception()
793            if exc and not isinstance(exc, StreamClosedError):
794                log.error("Read future returned exception %r", exc)
795