xref: /qemu/python/qemu/machine/console_socket.py (revision 5db05230)
1"""
2QEMU Console Socket Module:
3
4This python module implements a ConsoleSocket object,
5which can drain a socket and optionally dump the bytes to file.
6"""
7# Copyright 2020 Linaro
8#
9# Authors:
10#  Robert Foley <robert.foley@linaro.org>
11#
12# This code is licensed under the GPL version 2 or later.  See
13# the COPYING file in the top-level directory.
14#
15
16from collections import deque
17import socket
18import threading
19import time
20from typing import Deque, Optional
21
22
class ConsoleSocket(socket.socket):
    """
    ConsoleSocket represents a socket attached to a char device.

    In drain mode a daemon thread continuously reads the socket into an
    in-memory buffer (and optionally a log file); recv() then serves
    bytes from that buffer instead of reading the socket directly.

    :param address: An AF_UNIX path or address.
    :param sock_fd: Optionally, an existing socket file descriptor.
                    One of address or sock_fd must be specified.
    :param file: Optionally, a filename to log to.
    :param drain: Optionally, drains the socket and places the bytes
                  into an in memory buffer for later processing.

    :raise ValueError: If neither, or both, of address and sock_fd
                       are specified.
    """
    # Upper bound on the number of bytes the drain thread pulls from
    # the socket per recv() call.
    _DRAIN_CHUNK_SIZE = 4096

    def __init__(self,
                 address: Optional[str] = None,
                 sock_fd: Optional[int] = None,
                 file: Optional[str] = None,
                 drain: bool = False):
        if address is None and sock_fd is None:
            raise ValueError("one of 'address' or 'sock_fd' must be specified")
        if address is not None and sock_fd is not None:
            raise ValueError("can't specify both 'address' and 'sock_fd'")

        # Establish all instance state up front so that close() and
        # __repr__() are usable even if a later step raises.
        self._recv_timeout_sec = 300.0
        self._sleep_time = 0.5
        self._buffer: Deque[int] = deque()
        self._logfile = None
        self._open = False
        self._drain_thread: Optional[threading.Thread] = None

        if address is not None:
            socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            assert sock_fd is not None
            socket.socket.__init__(self, fileno=sock_fd)

        try:
            if address is not None:
                self.connect(address)
            if file:
                # pylint: disable=consider-using-with
                self._logfile = open(file, "bw")
        except Exception:
            # Do not leave the socket open until garbage collection
            # when connecting or opening the log file fails.
            socket.socket.close(self)
            raise

        self._open = True
        if drain:
            self._drain_thread = self._thread_start()

    def __repr__(self) -> str:
        """Return the base socket repr extended with our own state."""
        base = super().__repr__().rstrip(">")
        return (f"{base},  logfile={self._logfile}, "
                f"drain_thread={self._drain_thread}>")

    def _drain_fn(self) -> None:
        """Drains the socket and runs while the socket is open.

        The loop also ends when the peer closes the connection:
        once recv() reports end-of-file it returns immediately on
        every call, so continuing would only spin the CPU.
        """
        while self._open:
            try:
                if not self._drain_socket():
                    break
            except socket.timeout:
                # The socket is expected to timeout since we set a
                # short timeout to allow the thread to exit when
                # self._open is set to False.
                time.sleep(self._sleep_time)

    def _thread_start(self) -> threading.Thread:
        """Kick off a thread to drain the socket.

        :return: The started daemon thread.
        """
        # Configure socket to not block and timeout.
        # This allows our drain thread to not block
        # on receive and exit smoothly.
        socket.socket.setblocking(self, False)
        socket.socket.settimeout(self, 1)
        drain_thread = threading.Thread(target=self._drain_fn)
        drain_thread.daemon = True
        drain_thread.start()
        return drain_thread

    def close(self) -> None:
        """Close the base object and wait for the thread to terminate.

        Calling close() more than once is harmless; only the first
        call does any work.
        """
        if not self._open:
            return
        # Clearing the flag is what tells the drain thread to stop.
        self._open = False
        try:
            if self._drain_thread is not None:
                thread, self._drain_thread = self._drain_thread, None
                thread.join()
            socket.socket.close(self)
        finally:
            # Release the log file even if joining or closing failed.
            if self._logfile:
                self._logfile.close()
                self._logfile = None

    def _drain_socket(self) -> bool:
        """Process arriving bytes into the in-memory _buffer.

        :return: True if data was received, False if the peer has
                 closed the connection (recv() returned no data).
        :raise socket.timeout: If nothing arrived within the socket
                               timeout.
        """
        data = socket.socket.recv(self, self._DRAIN_CHUNK_SIZE)
        if not data:
            return False
        if self._logfile:
            self._logfile.write(data)
            self._logfile.flush()
        self._buffer.extend(data)
        return True

    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
        """Return chars from in memory buffer.
           Maintains the same API as socket.socket.recv.

        When not draining, this passes straight through to the socket.
        When draining, it polls the buffer until bufsize bytes are
        available and returns exactly that many.

        :param bufsize: Number of bytes to return.
        :param flags: Must be 0 when draining.
        :raise socket.timeout: When draining and the requested bytes
                               did not arrive within the recv timeout.
        """
        if self._drain_thread is None:
            # Not buffering the socket, pass thru to socket.
            return socket.socket.recv(self, bufsize, flags)
        assert not flags, "Cannot pass flags to recv() in drained mode"
        # A monotonic clock keeps the timeout immune to wall-clock
        # adjustments made while we wait.
        start_time = time.monotonic()
        while len(self._buffer) < bufsize:
            time.sleep(self._sleep_time)
            elapsed_sec = time.monotonic() - start_time
            if elapsed_sec > self._recv_timeout_sec:
                raise socket.timeout
        return bytes(self._buffer.popleft() for _ in range(bufsize))

    def setblocking(self, value: bool) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control socket blocking.
        """
        if self._drain_thread is None:
            socket.socket.setblocking(self, value)

    def settimeout(self, value: Optional[float]) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control the timeout.

        A non-None value is also remembered as the timeout used by
        recv() in drain mode.
        """
        if value is not None:
            self._recv_timeout_sec = value
        if self._drain_thread is None:
            socket.socket.settimeout(self, value)
143