1"""
2QEMU Monitor Protocol (QMP) development library & tooling.
3
4This package provides a fairly low-level class for communicating to QMP
5protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
6QEMU Storage Daemon. This library is not intended for production use.
7
8`QEMUMonitorProtocol` is the primary class of interest, and all errors
9raised derive from `QMPError`.
10"""
11
12# Copyright (C) 2009, 2010 Red Hat Inc.
13#
14# Authors:
15#  Luiz Capitulino <lcapitulino@redhat.com>
16#
17# This work is licensed under the terms of the GNU GPL, version 2.  See
18# the COPYING file in the top-level directory.
19
20import errno
21import json
22import logging
23import socket
24from types import TracebackType
25from typing import (
26    Any,
27    Dict,
28    List,
29    Optional,
30    TextIO,
31    Tuple,
32    Type,
33    TypeVar,
34    Union,
35    cast,
36)
37
38
39#: QMPMessage is an entire QMP message of any kind.
40QMPMessage = Dict[str, Any]
41
42#: QMPReturnValue is the 'return' value of a command.
43QMPReturnValue = object
44
45#: QMPObject is any object in a QMP message.
46QMPObject = Dict[str, object]
47
48# QMPMessage can be outgoing commands or incoming events/returns.
49# QMPReturnValue is usually a dict/json object, but due to QAPI's
50# 'returns-whitelist', it can actually be anything.
51#
52# {'return': {}} is a QMPMessage,
53# {} is the QMPReturnValue.
54
55
56InternetAddrT = Tuple[str, int]
57UnixAddrT = str
58SocketAddrT = Union[InternetAddrT, UnixAddrT]
59
60
61class QMPError(Exception):
62    """
63    QMP base exception
64    """
65
66
67class QMPConnectError(QMPError):
68    """
69    QMP connection exception
70    """
71
72
73class QMPCapabilitiesError(QMPError):
74    """
75    QMP negotiate capabilities exception
76    """
77
78
79class QMPTimeoutError(QMPError):
80    """
81    QMP timeout exception
82    """
83
84
85class QMPProtocolError(QMPError):
86    """
87    QMP protocol error; unexpected response
88    """
89
90
91class QMPResponseError(QMPError):
92    """
93    Represents erroneous QMP monitor reply
94    """
95    def __init__(self, reply: QMPMessage):
96        try:
97            desc = reply['error']['desc']
98        except KeyError:
99            desc = reply
100        super().__init__(desc)
101        self.reply = reply
102
103
104class QMPBadPortError(QMPError):
105    """
106    Unable to parse socket address: Port was non-numerical.
107    """
108
109
110class QEMUMonitorProtocol:
111    """
112    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
113    allow to handle commands and events.
114    """
115
116    #: Logger object for debugging messages
117    logger = logging.getLogger('QMP')
118
119    def __init__(self, address: SocketAddrT,
120                 server: bool = False,
121                 nickname: Optional[str] = None):
122        """
123        Create a QEMUMonitorProtocol class.
124
125        @param address: QEMU address, can be either a unix socket path (string)
126                        or a tuple in the form ( address, port ) for a TCP
127                        connection
128        @param server: server mode listens on the socket (bool)
129        @raise OSError on socket connection errors
130        @note No connection is established, this is done by the connect() or
131              accept() methods
132        """
133        self.__events: List[QMPMessage] = []
134        self.__address = address
135        self.__sock = self.__get_sock()
136        self.__sockfile: Optional[TextIO] = None
137        self._nickname = nickname
138        if self._nickname:
139            self.logger = logging.getLogger('QMP').getChild(self._nickname)
140        if server:
141            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
142            self.__sock.bind(self.__address)
143            self.__sock.listen(1)
144
145    def __get_sock(self) -> socket.socket:
146        if isinstance(self.__address, tuple):
147            family = socket.AF_INET
148        else:
149            family = socket.AF_UNIX
150        return socket.socket(family, socket.SOCK_STREAM)
151
152    def __negotiate_capabilities(self) -> QMPMessage:
153        greeting = self.__json_read()
154        if greeting is None or "QMP" not in greeting:
155            raise QMPConnectError
156        # Greeting seems ok, negotiate capabilities
157        resp = self.cmd('qmp_capabilities')
158        if resp and "return" in resp:
159            return greeting
160        raise QMPCapabilitiesError
161
162    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
163        assert self.__sockfile is not None
164        while True:
165            data = self.__sockfile.readline()
166            if not data:
167                return None
168            # By definition, any JSON received from QMP is a QMPMessage,
169            # and we are asserting only at static analysis time that it
170            # has a particular shape.
171            resp: QMPMessage = json.loads(data)
172            if 'event' in resp:
173                self.logger.debug("<<< %s", resp)
174                self.__events.append(resp)
175                if not only_event:
176                    continue
177            return resp
178
179    def __get_events(self, wait: Union[bool, float] = False) -> None:
180        """
181        Check for new events in the stream and cache them in __events.
182
183        @param wait (bool): block until an event is available.
184        @param wait (float): If wait is a float, treat it as a timeout value.
185
186        @raise QMPTimeoutError: If a timeout float is provided and the timeout
187                                period elapses.
188        @raise QMPConnectError: If wait is True but no events could be
189                                retrieved or if some other error occurred.
190        """
191
192        # Current timeout and blocking status
193        current_timeout = self.__sock.gettimeout()
194
195        # Check for new events regardless and pull them into the cache:
196        self.__sock.settimeout(0)  # i.e. setblocking(False)
197        try:
198            self.__json_read()
199        except OSError as err:
200            # EAGAIN: No data available; not critical
201            if err.errno != errno.EAGAIN:
202                raise
203        finally:
204            self.__sock.settimeout(current_timeout)
205
206        # Wait for new events, if needed.
207        # if wait is 0.0, this means "no wait" and is also implicitly false.
208        if not self.__events and wait:
209            if isinstance(wait, float):
210                self.__sock.settimeout(wait)
211            try:
212                ret = self.__json_read(only_event=True)
213            except socket.timeout as err:
214                raise QMPTimeoutError("Timeout waiting for event") from err
215            except Exception as err:
216                msg = "Error while reading from socket"
217                raise QMPConnectError(msg) from err
218            finally:
219                self.__sock.settimeout(current_timeout)
220
221            if ret is None:
222                raise QMPConnectError("Error while reading from socket")
223
224    T = TypeVar('T')
225
226    def __enter__(self: T) -> T:
227        # Implement context manager enter function.
228        return self
229
230    def __exit__(self,
231                 # pylint: disable=duplicate-code
232                 # see https://github.com/PyCQA/pylint/issues/3619
233                 exc_type: Optional[Type[BaseException]],
234                 exc_val: Optional[BaseException],
235                 exc_tb: Optional[TracebackType]) -> None:
236        # Implement context manager exit function.
237        self.close()
238
239    @classmethod
240    def parse_address(cls, address: str) -> SocketAddrT:
241        """
242        Parse a string into a QMP address.
243
244        Figure out if the argument is in the port:host form.
245        If it's not, it's probably a file path.
246        """
247        components = address.split(':')
248        if len(components) == 2:
249            try:
250                port = int(components[1])
251            except ValueError:
252                msg = f"Bad port: '{components[1]}' in '{address}'."
253                raise QMPBadPortError(msg) from None
254            return (components[0], port)
255
256        # Treat as filepath.
257        return address
258
259    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
260        """
261        Connect to the QMP Monitor and perform capabilities negotiation.
262
263        @return QMP greeting dict, or None if negotiate is false
264        @raise OSError on socket connection errors
265        @raise QMPConnectError if the greeting is not received
266        @raise QMPCapabilitiesError if fails to negotiate capabilities
267        """
268        self.__sock.connect(self.__address)
269        self.__sockfile = self.__sock.makefile(mode='r')
270        if negotiate:
271            return self.__negotiate_capabilities()
272        return None
273
274    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
275        """
276        Await connection from QMP Monitor and perform capabilities negotiation.
277
278        @param timeout: timeout in seconds (nonnegative float number, or
279                        None). The value passed will set the behavior of the
280                        underneath QMP socket as described in [1].
281                        Default value is set to 15.0.
282
283        @return QMP greeting dict
284        @raise OSError on socket connection errors
285        @raise QMPConnectError if the greeting is not received
286        @raise QMPCapabilitiesError if fails to negotiate capabilities
287
288        [1]
289        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
290        """
291        self.__sock.settimeout(timeout)
292        self.__sock, _ = self.__sock.accept()
293        self.__sockfile = self.__sock.makefile(mode='r')
294        return self.__negotiate_capabilities()
295
296    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
297        """
298        Send a QMP command to the QMP Monitor.
299
300        @param qmp_cmd: QMP command to be sent as a Python dict
301        @return QMP response as a Python dict
302        """
303        self.logger.debug(">>> %s", qmp_cmd)
304        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
305        resp = self.__json_read()
306        if resp is None:
307            raise QMPConnectError("Unexpected empty reply from server")
308        self.logger.debug("<<< %s", resp)
309        return resp
310
311    def cmd(self, name: str,
312            args: Optional[Dict[str, object]] = None,
313            cmd_id: Optional[object] = None) -> QMPMessage:
314        """
315        Build a QMP command and send it to the QMP Monitor.
316
317        @param name: command name (string)
318        @param args: command arguments (dict)
319        @param cmd_id: command id (dict, list, string or int)
320        """
321        qmp_cmd: QMPMessage = {'execute': name}
322        if args:
323            qmp_cmd['arguments'] = args
324        if cmd_id:
325            qmp_cmd['id'] = cmd_id
326        return self.cmd_obj(qmp_cmd)
327
328    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
329        """
330        Build and send a QMP command to the monitor, report errors if any
331        """
332        ret = self.cmd(cmd, kwds)
333        if 'error' in ret:
334            raise QMPResponseError(ret)
335        if 'return' not in ret:
336            raise QMPProtocolError(
337                "'return' key not found in QMP response '{}'".format(str(ret))
338            )
339        return cast(QMPReturnValue, ret['return'])
340
341    def pull_event(self,
342                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
343        """
344        Pulls a single event.
345
346        @param wait (bool): block until an event is available.
347        @param wait (float): If wait is a float, treat it as a timeout value.
348
349        @raise QMPTimeoutError: If a timeout float is provided and the timeout
350                                period elapses.
351        @raise QMPConnectError: If wait is True but no events could be
352                                retrieved or if some other error occurred.
353
354        @return The first available QMP event, or None.
355        """
356        self.__get_events(wait)
357
358        if self.__events:
359            return self.__events.pop(0)
360        return None
361
362    def get_events(self, wait: bool = False) -> List[QMPMessage]:
363        """
364        Get a list of available QMP events.
365
366        @param wait (bool): block until an event is available.
367        @param wait (float): If wait is a float, treat it as a timeout value.
368
369        @raise QMPTimeoutError: If a timeout float is provided and the timeout
370                                period elapses.
371        @raise QMPConnectError: If wait is True but no events could be
372                                retrieved or if some other error occurred.
373
374        @return The list of available QMP events.
375        """
376        self.__get_events(wait)
377        return self.__events
378
379    def clear_events(self) -> None:
380        """
381        Clear current list of pending events.
382        """
383        self.__events = []
384
385    def close(self) -> None:
386        """
387        Close the socket and socket file.
388        """
389        if self.__sock:
390            self.__sock.close()
391        if self.__sockfile:
392            self.__sockfile.close()
393
394    def settimeout(self, timeout: Optional[float]) -> None:
395        """
396        Set the socket timeout.
397
398        @param timeout (float): timeout in seconds (non-zero), or None.
399        @note This is a wrap around socket.settimeout
400
401        @raise ValueError: if timeout was set to 0.
402        """
403        if timeout == 0:
404            msg = "timeout cannot be 0; this engages non-blocking mode."
405            msg += " Use 'None' instead to disable timeouts."
406            raise ValueError(msg)
407        self.__sock.settimeout(timeout)
408
409    def get_sock_fd(self) -> int:
410        """
411        Get the socket file descriptor.
412
413        @return The file descriptor number.
414        """
415        return self.__sock.fileno()
416
417    def is_scm_available(self) -> bool:
418        """
419        Check if the socket allows for SCM_RIGHTS.
420
421        @return True if SCM_RIGHTS is available, otherwise False.
422        """
423        return self.__sock.family == socket.AF_UNIX
424