xref: /qemu/python/qemu/qmp/__init__.py (revision c5955f4f)
1"""
2QEMU Monitor Protocol (QMP) development library & tooling.
3
4This package provides a fairly low-level class for communicating to QMP
5protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
6QEMU Storage Daemon. This library is not intended for production use.
7
8`QEMUMonitorProtocol` is the primary class of interest, and all errors
9raised derive from `QMPError`.
10"""
11
12# Copyright (C) 2009, 2010 Red Hat Inc.
13#
14# Authors:
15#  Luiz Capitulino <lcapitulino@redhat.com>
16#
17# This work is licensed under the terms of the GNU GPL, version 2.  See
18# the COPYING file in the top-level directory.
19
20import errno
21import json
22import logging
23import socket
24import struct
25from types import TracebackType
26from typing import (
27    Any,
28    Dict,
29    List,
30    Optional,
31    TextIO,
32    Tuple,
33    Type,
34    TypeVar,
35    Union,
36    cast,
37)
38
39
40#: QMPMessage is an entire QMP message of any kind.
41QMPMessage = Dict[str, Any]
42
43#: QMPReturnValue is the 'return' value of a command.
44QMPReturnValue = object
45
46#: QMPObject is any object in a QMP message.
47QMPObject = Dict[str, object]
48
49# QMPMessage can be outgoing commands or incoming events/returns.
50# QMPReturnValue is usually a dict/json object, but due to QAPI's
51# 'returns-whitelist', it can actually be anything.
52#
53# {'return': {}} is a QMPMessage,
54# {} is the QMPReturnValue.
55
56
57InternetAddrT = Tuple[str, int]
58UnixAddrT = str
59SocketAddrT = Union[InternetAddrT, UnixAddrT]
60
61
62class QMPError(Exception):
63    """
64    QMP base exception
65    """
66
67
68class QMPConnectError(QMPError):
69    """
70    QMP connection exception
71    """
72
73
74class QMPCapabilitiesError(QMPError):
75    """
76    QMP negotiate capabilities exception
77    """
78
79
80class QMPTimeoutError(QMPError):
81    """
82    QMP timeout exception
83    """
84
85
86class QMPProtocolError(QMPError):
87    """
88    QMP protocol error; unexpected response
89    """
90
91
92class QMPResponseError(QMPError):
93    """
94    Represents erroneous QMP monitor reply
95    """
96    def __init__(self, reply: QMPMessage):
97        try:
98            desc = reply['error']['desc']
99        except KeyError:
100            desc = reply
101        super().__init__(desc)
102        self.reply = reply
103
104
105class QMPBadPortError(QMPError):
106    """
107    Unable to parse socket address: Port was non-numerical.
108    """
109
110
111class QEMUMonitorProtocol:
112    """
113    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
114    allow to handle commands and events.
115    """
116
117    #: Logger object for debugging messages
118    logger = logging.getLogger('QMP')
119
120    def __init__(self, address: SocketAddrT,
121                 server: bool = False,
122                 nickname: Optional[str] = None):
123        """
124        Create a QEMUMonitorProtocol class.
125
126        @param address: QEMU address, can be either a unix socket path (string)
127                        or a tuple in the form ( address, port ) for a TCP
128                        connection
129        @param server: server mode listens on the socket (bool)
130        @raise OSError on socket connection errors
131        @note No connection is established, this is done by the connect() or
132              accept() methods
133        """
134        self.__events: List[QMPMessage] = []
135        self.__address = address
136        self.__sock = self.__get_sock()
137        self.__sockfile: Optional[TextIO] = None
138        self._nickname = nickname
139        if self._nickname:
140            self.logger = logging.getLogger('QMP').getChild(self._nickname)
141        if server:
142            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
143            self.__sock.bind(self.__address)
144            self.__sock.listen(1)
145
146    def __get_sock(self) -> socket.socket:
147        if isinstance(self.__address, tuple):
148            family = socket.AF_INET
149        else:
150            family = socket.AF_UNIX
151        return socket.socket(family, socket.SOCK_STREAM)
152
153    def __negotiate_capabilities(self) -> QMPMessage:
154        greeting = self.__json_read()
155        if greeting is None or "QMP" not in greeting:
156            raise QMPConnectError
157        # Greeting seems ok, negotiate capabilities
158        resp = self.cmd('qmp_capabilities')
159        if resp and "return" in resp:
160            return greeting
161        raise QMPCapabilitiesError
162
163    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
164        assert self.__sockfile is not None
165        while True:
166            data = self.__sockfile.readline()
167            if not data:
168                return None
169            # By definition, any JSON received from QMP is a QMPMessage,
170            # and we are asserting only at static analysis time that it
171            # has a particular shape.
172            resp: QMPMessage = json.loads(data)
173            if 'event' in resp:
174                self.logger.debug("<<< %s", resp)
175                self.__events.append(resp)
176                if not only_event:
177                    continue
178            return resp
179
180    def __get_events(self, wait: Union[bool, float] = False) -> None:
181        """
182        Check for new events in the stream and cache them in __events.
183
184        @param wait (bool): block until an event is available.
185        @param wait (float): If wait is a float, treat it as a timeout value.
186
187        @raise QMPTimeoutError: If a timeout float is provided and the timeout
188                                period elapses.
189        @raise QMPConnectError: If wait is True but no events could be
190                                retrieved or if some other error occurred.
191        """
192
193        # Current timeout and blocking status
194        current_timeout = self.__sock.gettimeout()
195
196        # Check for new events regardless and pull them into the cache:
197        self.__sock.settimeout(0)  # i.e. setblocking(False)
198        try:
199            self.__json_read()
200        except OSError as err:
201            # EAGAIN: No data available; not critical
202            if err.errno != errno.EAGAIN:
203                raise
204        finally:
205            self.__sock.settimeout(current_timeout)
206
207        # Wait for new events, if needed.
208        # if wait is 0.0, this means "no wait" and is also implicitly false.
209        if not self.__events and wait:
210            if isinstance(wait, float):
211                self.__sock.settimeout(wait)
212            try:
213                ret = self.__json_read(only_event=True)
214            except socket.timeout as err:
215                raise QMPTimeoutError("Timeout waiting for event") from err
216            except Exception as err:
217                msg = "Error while reading from socket"
218                raise QMPConnectError(msg) from err
219            finally:
220                self.__sock.settimeout(current_timeout)
221
222            if ret is None:
223                raise QMPConnectError("Error while reading from socket")
224
225    T = TypeVar('T')
226
227    def __enter__(self: T) -> T:
228        # Implement context manager enter function.
229        return self
230
231    def __exit__(self,
232                 # pylint: disable=duplicate-code
233                 # see https://github.com/PyCQA/pylint/issues/3619
234                 exc_type: Optional[Type[BaseException]],
235                 exc_val: Optional[BaseException],
236                 exc_tb: Optional[TracebackType]) -> None:
237        # Implement context manager exit function.
238        self.close()
239
240    @classmethod
241    def parse_address(cls, address: str) -> SocketAddrT:
242        """
243        Parse a string into a QMP address.
244
245        Figure out if the argument is in the port:host form.
246        If it's not, it's probably a file path.
247        """
248        components = address.split(':')
249        if len(components) == 2:
250            try:
251                port = int(components[1])
252            except ValueError:
253                msg = f"Bad port: '{components[1]}' in '{address}'."
254                raise QMPBadPortError(msg) from None
255            return (components[0], port)
256
257        # Treat as filepath.
258        return address
259
260    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
261        """
262        Connect to the QMP Monitor and perform capabilities negotiation.
263
264        @return QMP greeting dict, or None if negotiate is false
265        @raise OSError on socket connection errors
266        @raise QMPConnectError if the greeting is not received
267        @raise QMPCapabilitiesError if fails to negotiate capabilities
268        """
269        self.__sock.connect(self.__address)
270        self.__sockfile = self.__sock.makefile(mode='r')
271        if negotiate:
272            return self.__negotiate_capabilities()
273        return None
274
275    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
276        """
277        Await connection from QMP Monitor and perform capabilities negotiation.
278
279        @param timeout: timeout in seconds (nonnegative float number, or
280                        None). The value passed will set the behavior of the
281                        underneath QMP socket as described in [1].
282                        Default value is set to 15.0.
283
284        @return QMP greeting dict
285        @raise OSError on socket connection errors
286        @raise QMPConnectError if the greeting is not received
287        @raise QMPCapabilitiesError if fails to negotiate capabilities
288
289        [1]
290        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
291        """
292        self.__sock.settimeout(timeout)
293        self.__sock, _ = self.__sock.accept()
294        self.__sockfile = self.__sock.makefile(mode='r')
295        return self.__negotiate_capabilities()
296
297    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
298        """
299        Send a QMP command to the QMP Monitor.
300
301        @param qmp_cmd: QMP command to be sent as a Python dict
302        @return QMP response as a Python dict
303        """
304        self.logger.debug(">>> %s", qmp_cmd)
305        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
306        resp = self.__json_read()
307        if resp is None:
308            raise QMPConnectError("Unexpected empty reply from server")
309        self.logger.debug("<<< %s", resp)
310        return resp
311
312    def cmd(self, name: str,
313            args: Optional[Dict[str, object]] = None,
314            cmd_id: Optional[object] = None) -> QMPMessage:
315        """
316        Build a QMP command and send it to the QMP Monitor.
317
318        @param name: command name (string)
319        @param args: command arguments (dict)
320        @param cmd_id: command id (dict, list, string or int)
321        """
322        qmp_cmd: QMPMessage = {'execute': name}
323        if args:
324            qmp_cmd['arguments'] = args
325        if cmd_id:
326            qmp_cmd['id'] = cmd_id
327        return self.cmd_obj(qmp_cmd)
328
329    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
330        """
331        Build and send a QMP command to the monitor, report errors if any
332        """
333        ret = self.cmd(cmd, kwds)
334        if 'error' in ret:
335            raise QMPResponseError(ret)
336        if 'return' not in ret:
337            raise QMPProtocolError(
338                "'return' key not found in QMP response '{}'".format(str(ret))
339            )
340        return cast(QMPReturnValue, ret['return'])
341
342    def pull_event(self,
343                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
344        """
345        Pulls a single event.
346
347        @param wait (bool): block until an event is available.
348        @param wait (float): If wait is a float, treat it as a timeout value.
349
350        @raise QMPTimeoutError: If a timeout float is provided and the timeout
351                                period elapses.
352        @raise QMPConnectError: If wait is True but no events could be
353                                retrieved or if some other error occurred.
354
355        @return The first available QMP event, or None.
356        """
357        self.__get_events(wait)
358
359        if self.__events:
360            return self.__events.pop(0)
361        return None
362
363    def get_events(self, wait: bool = False) -> List[QMPMessage]:
364        """
365        Get a list of available QMP events and clear all pending events.
366
367        @param wait (bool): block until an event is available.
368        @param wait (float): If wait is a float, treat it as a timeout value.
369
370        @raise QMPTimeoutError: If a timeout float is provided and the timeout
371                                period elapses.
372        @raise QMPConnectError: If wait is True but no events could be
373                                retrieved or if some other error occurred.
374
375        @return The list of available QMP events.
376        """
377        self.__get_events(wait)
378        events = self.__events
379        self.__events = []
380        return events
381
382    def clear_events(self) -> None:
383        """
384        Clear current list of pending events.
385        """
386        self.__events = []
387
388    def close(self) -> None:
389        """
390        Close the socket and socket file.
391        """
392        if self.__sock:
393            self.__sock.close()
394        if self.__sockfile:
395            self.__sockfile.close()
396
397    def settimeout(self, timeout: Optional[float]) -> None:
398        """
399        Set the socket timeout.
400
401        @param timeout (float): timeout in seconds (non-zero), or None.
402        @note This is a wrap around socket.settimeout
403
404        @raise ValueError: if timeout was set to 0.
405        """
406        if timeout == 0:
407            msg = "timeout cannot be 0; this engages non-blocking mode."
408            msg += " Use 'None' instead to disable timeouts."
409            raise ValueError(msg)
410        self.__sock.settimeout(timeout)
411
412    def send_fd_scm(self, fd: int) -> None:
413        """
414        Send a file descriptor to the remote via SCM_RIGHTS.
415        """
416        if self.__sock.family != socket.AF_UNIX:
417            raise RuntimeError("Can't use SCM_RIGHTS on non-AF_UNIX socket.")
418
419        self.__sock.sendmsg(
420            [b' '],
421            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
422        )
423