xref: /qemu/python/qemu/machine/console_socket.py (revision b355f08a)
1"""
2QEMU Console Socket Module:
3
4This python module implements a ConsoleSocket object,
5which can drain a socket and optionally dump the bytes to file.
6"""
7# Copyright 2020 Linaro
8#
9# Authors:
10#  Robert Foley <robert.foley@linaro.org>
11#
12# This code is licensed under the GPL version 2 or later.  See
13# the COPYING file in the top-level directory.
14#
15
16from collections import deque
17import socket
18import threading
19import time
20from typing import Deque, Optional
21
22
class ConsoleSocket(socket.socket):
    """
    ConsoleSocket represents a socket attached to a char device.

    Optionally (if drain==True), drains the socket and places the bytes
    into an in memory buffer for later processing.

    Optionally a file path can be passed in and we will also
    dump the characters to this file for debugging purposes.
    """

    # Upper bound on the number of bytes pulled off the socket per drain
    # pass; reading in chunks avoids one syscall + log flush per byte.
    _DRAIN_CHUNK_SIZE = 4096

    def __init__(self, address: str, file: Optional[str] = None,
                 drain: bool = False):
        """
        Connect to the UNIX stream socket at `address`.

        :param address: Path of the UNIX socket to connect to.
        :param file: Optional path of a file that receives a binary copy
                     of every drained byte.
        :param drain: When True, start a background thread that drains
                      the socket into an in memory buffer.

        :raise OSError: If connecting or opening the log file fails; the
                        socket is closed before the error propagates.
        """
        self._recv_timeout_sec = 300.0
        self._sleep_time = 0.5
        self._buffer: Deque[int] = deque()
        # Set up all state before doing any I/O so that close() and
        # __repr__() work even if construction fails part way through.
        self._logfile = None
        self._drain_thread: Optional[threading.Thread] = None
        socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
        self._open = True
        try:
            self.connect(address)
            if file:
                # pylint: disable=consider-using-with
                self._logfile = open(file, "bw")
            if drain:
                self._drain_thread = self._thread_start()
        except Exception:
            # Do not leak the socket (or log file) on a failed setup.
            self.close()
            raise

    def __repr__(self) -> str:
        """Return the base socket repr extended with our own state."""
        tmp = super().__repr__()
        tmp = tmp.rstrip(">")
        return f"{tmp},  logfile={self._logfile}, " \
               f"drain_thread={self._drain_thread}>"

    def _drain_fn(self) -> None:
        """
        Drains the socket and runs while the socket is open.

        The loop also ends when the peer closes the connection: a stream
        socket at end-of-stream returns b"" from every recv() without
        blocking, so continuing would only spin the CPU.
        """
        while self._open:
            try:
                if not self._drain_socket():
                    # End of stream, no more data can ever arrive.
                    break
            except socket.timeout:
                # The socket is expected to timeout since we set a
                # short timeout to allow the thread to exit when
                # self._open is set to False.
                time.sleep(self._sleep_time)

    def _thread_start(self) -> threading.Thread:
        """Kick off a thread to drain the socket."""
        # Configure socket to not block and timeout.
        # This allows our drain thread to not block
        # on receive and exit smoothly.
        socket.socket.setblocking(self, False)
        socket.socket.settimeout(self, 1)
        drain_thread = threading.Thread(target=self._drain_fn)
        drain_thread.daemon = True
        drain_thread.start()
        return drain_thread

    def close(self) -> None:
        """Close the base object and wait for the thread to terminate"""
        if self._open:
            # Clearing the flag first lets the drain loop notice and exit.
            self._open = False
            if self._drain_thread is not None:
                thread, self._drain_thread = self._drain_thread, None
                thread.join()
            socket.socket.close(self)
            if self._logfile:
                self._logfile.close()
                self._logfile = None

    def _drain_socket(self) -> int:
        """
        Process arriving characters into in memory _buffer.

        :return: The number of bytes received; 0 means the peer closed
                 the connection.
        :raise socket.timeout: If nothing arrived within the socket
                               timeout.
        """
        data = socket.socket.recv(self, self._DRAIN_CHUNK_SIZE)
        if data:
            if self._logfile:
                self._logfile.write(data)
                self._logfile.flush()
            self._buffer.extend(data)
        return len(data)

    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
        """Return chars from in memory buffer.
           Maintains the same API as socket.socket.recv.

           When draining, this waits until `bufsize` bytes are buffered
           and raises socket.timeout if that takes longer than the
           configured receive timeout.
        """
        if self._drain_thread is None:
            # Not buffering the socket, pass thru to socket.
            return socket.socket.recv(self, bufsize, flags)
        assert not flags, "Cannot pass flags to recv() in drained mode"
        # Monotonic clock: immune to wall clock adjustments.
        start_time = time.monotonic()
        while len(self._buffer) < bufsize:
            time.sleep(self._sleep_time)
            elapsed_sec = time.monotonic() - start_time
            if elapsed_sec > self._recv_timeout_sec:
                raise socket.timeout
        return bytes(self._buffer.popleft() for _ in range(bufsize))

    def setblocking(self, value: bool) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control socket blocking.
        """
        if self._drain_thread is None:
            socket.socket.setblocking(self, value)

    def settimeout(self, value: Optional[float]) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control the timeout.

           A non-None value also becomes the timeout used by recv()
           when draining.
        """
        if value is not None:
            self._recv_timeout_sec = value
        if self._drain_thread is None:
            socket.socket.settimeout(self, value)
130