1"""
2QEMU qtest library
3
qtest offers the QEMUQtestProtocol and QEMUQtestMachine classes, which
5offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
6subclass of QEMUMachine, respectively.
7"""
8
9# Copyright (C) 2015 Red Hat Inc.
10#
11# Authors:
12#  Fam Zheng <famz@redhat.com>
13#
14# This work is licensed under the terms of the GNU GPL, version 2.  See
15# the COPYING file in the top-level directory.
16#
17# Based on qmp.py.
18#
19
20import os
21import socket
22from typing import (
23    List,
24    Optional,
25    Sequence,
26    TextIO,
27)
28
29from .machine import QEMUMachine
30from .qmp import SocketAddrT
31
32
class QEMUQtestProtocol:
    """
    QEMUQtestProtocol implements a connection to a qtest socket.

    :param address: QEMU address, can be either a unix socket path (string)
                    or a tuple in the form ( address, port ) for a TCP
                    connection
    :param server: server mode, listens on the socket (bool)
    :raise socket.error: on socket connection errors

    .. note::
       No connection is established by __init__(), this is done
       by the connect() or accept() methods.
    """
    def __init__(self, address: SocketAddrT,
                 server: bool = False):
        self._address = address
        self._sock = self._get_sock()
        # Text-mode reader over the connected socket; created by
        # connect() or accept(), None until then and after close().
        self._sockfile: Optional[TextIO] = None
        if server:
            try:
                self._sock.bind(self._address)
                self._sock.listen(1)
            except OSError:
                # The caller never receives this object when the constructor
                # raises, so release the descriptor here.
                self._sock.close()
                raise

    def _get_sock(self) -> socket.socket:
        """
        Create an unconnected stream socket matching the address type.

        A tuple address selects AF_INET, anything else selects AF_UNIX.
        """
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self) -> None:
        """
        Connect to the qtest socket.

        :raise socket.error: on socket connection errors
        """
        self._sock.connect(self._address)
        self._sockfile = self._sock.makefile(mode='r')

    def accept(self) -> None:
        """
        Await connection from QEMU.

        On success the listening socket is closed and replaced by the
        connected socket.  On failure (e.g. a timeout) the listening
        socket is left untouched so the call can be retried.

        :raise socket.error: on socket connection errors
        """
        listener = self._sock
        conn, _ = listener.accept()
        # The listener is replaced below and would otherwise be left
        # open with nothing referring to it any more.
        listener.close()
        self._sock = conn
        self._sockfile = conn.makefile(mode='r')

    def cmd(self, qtest_cmd: str) -> str:
        """
        Send a qtest command on the wire.

        :param qtest_cmd: qtest command text to be sent
        :return: one line read back from the socket, including its
                 trailing newline if one was received
        """
        assert self._sockfile is not None
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
        return self._sockfile.readline()

    def close(self) -> None:
        """
        Close this socket and the text reader attached to it, if any.
        """
        if self._sockfile is not None:
            self._sockfile.close()
            self._sockfile = None
        self._sock.close()

    def settimeout(self, timeout: Optional[float]) -> None:
        """Set a timeout, in seconds."""
        self._sock.settimeout(timeout)
104
105
class QEMUQtestMachine(QEMUMachine):
    """
    A QEMU VM, with a qtest socket available.

    :param binary: forwarded unchanged to QEMUMachine
    :param args: forwarded unchanged to QEMUMachine
    :param name: VM name; defaults to "qemu-<pid of this process>"
    :param test_dir: forwarded to QEMUMachine; also the default sock_dir
    :param socket_scm_helper: forwarded unchanged to QEMUMachine
    :param sock_dir: directory in which the qtest unix socket is created;
                     defaults to test_dir
    """

    def __init__(self,
                 binary: str,
                 args: Sequence[str] = (),
                 name: Optional[str] = None,
                 test_dir: str = "/var/tmp",
                 socket_scm_helper: Optional[str] = None,
                 sock_dir: Optional[str] = None):
        if name is None:
            name = f"qemu-{os.getpid()}"
        if sock_dir is None:
            sock_dir = test_dir
        super().__init__(binary, args, name=name, test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Server side of the qtest connection; created in _pre_launch()
        # and released again in _post_shutdown().
        self._qtest: Optional[QEMUQtestProtocol] = None
        self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")

    @property
    def _base_args(self) -> List[str]:
        """Base class arguments plus the qtest socket and accelerator."""
        args = super()._base_args
        args.extend([
            '-qtest', f"unix:path={self._qtest_path}",
            '-accel', 'qtest'
        ])
        return args

    def _pre_launch(self) -> None:
        """Start listening on the qtest socket path."""
        super()._pre_launch()
        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)

    def _post_launch(self) -> None:
        """Accept the qtest connection on the listening socket."""
        assert self._qtest is not None
        super()._post_launch()
        self._qtest.accept()

    def _post_shutdown(self) -> None:
        """Close the qtest connection and remove its socket file."""
        # Close our end explicitly; otherwise the socket and its file
        # wrapper stay open after shutdown.
        if self._qtest is not None:
            self._qtest.close()
            self._qtest = None
        super()._post_shutdown()
        self._remove_if_exists(self._qtest_path)

    def qtest(self, cmd: str) -> str:
        """
        Send a qtest command to the guest.

        :param cmd: qtest command to send
        :return: qtest server response
        :raise RuntimeError: if no qtest socket is currently available
        """
        if self._qtest is None:
            raise RuntimeError("qtest socket not available")
        return self._qtest.cmd(cmd)
160