xref: /qemu/python/qemu/qmp/__init__.py (revision eeae5466)
1"""
2QEMU Monitor Protocol (QMP) development library & tooling.
3
This package provides a fairly low-level class for communicating with QMP
servers, as implemented by QEMU, the QEMU Guest Agent, and the
QEMU Storage Daemon. This library is not intended for production use.
7
8`QEMUMonitorProtocol` is the primary class of interest, and all errors
9raised derive from `QMPError`.
10"""
11
12# Copyright (C) 2009, 2010 Red Hat Inc.
13#
14# Authors:
15#  Luiz Capitulino <lcapitulino@redhat.com>
16#
17# This work is licensed under the terms of the GNU GPL, version 2.  See
18# the COPYING file in the top-level directory.
19
20import errno
21import json
22import logging
23import socket
24from types import TracebackType
25from typing import (
26    Any,
27    Dict,
28    List,
29    Optional,
30    TextIO,
31    Tuple,
32    Type,
33    Union,
34    cast,
35)
36
37
# QMPMessage is a QMP message of any kind,
# e.g. {'execute': 'query-status'} or {'return': {}}.
#
# QMPReturnValue is the inner value of a successful reply only:
# for the QMPMessage {'return': {}}, the QMPReturnValue is {}.
QMPMessage = Dict[str, Any]
QMPReturnValue = Dict[str, Any]

# Socket address types accepted by QEMUMonitorProtocol.
# An internet address is a (host, port) pair. The address is handed
# unchanged to socket.bind()/socket.connect(), which require the port of
# an AF_INET address to be an integer, hence Tuple[str, int].
InternetAddrT = Tuple[str, int]
# A UNIX address is the filesystem path of the socket.
UnixAddrT = str
SocketAddrT = Union[InternetAddrT, UnixAddrT]
50
51
class QMPError(Exception):
    """Base class of every exception raised by this QMP library."""
56
57
class QMPConnectError(QMPError):
    """Raised when the QMP connection fails, e.g. no greeting or no reply."""
62
63
class QMPCapabilitiesError(QMPError):
    """Raised when QMP capabilities negotiation fails."""
68
69
class QMPTimeoutError(QMPError):
    """Raised when a timed wait on the QMP connection expires."""
74
75
class QMPProtocolError(QMPError):
    """Raised on a QMP protocol violation, i.e. an unexpected response."""
80
81
class QMPResponseError(QMPError):
    """
    Represents erroneous QMP monitor reply

    The exception message is the reply's error description when one can
    be extracted; otherwise it is the reply itself. The complete reply is
    always kept in the `reply` attribute.
    """
    def __init__(self, reply: QMPMessage):
        """
        @param reply: the QMP reply that reported the error
        """
        try:
            desc = reply['error']['desc']
        except (KeyError, TypeError):
            # KeyError: the 'error' or 'desc' member is missing.
            # TypeError: 'error' is present but cannot be indexed by a
            # string key (e.g. it is a string, a list or None).
            # In both cases fall back to the whole reply rather than
            # failing while the exception itself is being constructed.
            desc = reply
        super().__init__(desc)
        self.reply = reply
93
94
class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
    allow to handle commands and events.

    An instance either connects to a QMP server (connect()) or, in server
    mode, listens on the address and waits for a peer (accept()). Events
    received while reading replies are cached and can be retrieved with
    pull_event() / get_events(). The object is a context manager; leaving
    the `with` block calls close().
    """

    #: Logger object for debugging messages
    logger = logging.getLogger('QMP')

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):
        """
        Create a QEMUMonitorProtocol class.

        @param address: QEMU address, can be either a unix socket path (string)
                        or a tuple in the form ( address, port ) for a TCP
                        connection
        @param server: server mode listens on the socket (bool)
        @param nickname: optional name; when given, messages are logged
                         through the child logger 'QMP.<nickname>'
        @raise OSError on socket connection errors
        @note No connection is established, this is done by the connect() or
              accept() methods
        """
        # Events received from the peer and not yet consumed by the caller.
        self.__events: List[QMPMessage] = []
        self.__address = address
        self.__sock = self.__get_sock()
        # Text-mode reader over the socket; created by connect()/accept().
        self.__sockfile: Optional[TextIO] = None
        self._nickname = nickname
        if self._nickname:
            # Shadow the class-level logger with a per-instance child.
            self.logger = logging.getLogger('QMP').getChild(self._nickname)
        if server:
            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.__sock.bind(self.__address)
            self.__sock.listen(1)

    def __get_sock(self) -> socket.socket:
        """
        Create an unconnected stream socket matching the address type.

        @return an AF_INET socket for tuple addresses, AF_UNIX otherwise
        """
        if isinstance(self.__address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def __negotiate_capabilities(self) -> QMPMessage:
        """
        Read the server greeting and send 'qmp_capabilities'.

        @return QMP greeting dict
        @raise QMPConnectError if the greeting is not received
        @raise QMPCapabilitiesError if fails to negotiate capabilities
        """
        greeting = self.__json_read()
        if greeting is None or "QMP" not in greeting:
            raise QMPConnectError
        # Greeting seems ok, negotiate capabilities
        resp = self.cmd('qmp_capabilities')
        if resp and "return" in resp:
            return greeting
        raise QMPCapabilitiesError

    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
        """
        Read messages, one per line, until one can be returned.

        Every event encountered is logged and appended to the event cache.

        @param only_event: if True, return the first event read; if False,
                           events are cached and skipped, and the first
                           non-event message is returned.
        @return the message read, or None if the stream yields no data
                (readline() returned an empty string).
        """
        assert self.__sockfile is not None
        while True:
            data = self.__sockfile.readline()
            if not data:
                return None
            # By definition, any JSON received from QMP is a QMPMessage,
            # and we are asserting only at static analysis time that it
            # has a particular shape.
            resp: QMPMessage = json.loads(data)
            if 'event' in resp:
                self.logger.debug("<<< %s", resp)
                self.__events.append(resp)
                if not only_event:
                    continue
            return resp

    def __get_events(self, wait: Union[bool, float] = False) -> None:
        """
        Check for new events in the stream and cache them in __events.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.
        """

        # Current timeout and blocking status
        current_timeout = self.__sock.gettimeout()

        # Check for new events regardless and pull them into the cache:
        self.__sock.settimeout(0)  # i.e. setblocking(False)
        try:
            self.__json_read()
        except OSError as err:
            # EAGAIN: No data available; not critical
            if err.errno != errno.EAGAIN:
                raise
        finally:
            # Always restore the caller's timeout, even on error.
            self.__sock.settimeout(current_timeout)

        # Wait for new events, if needed.
        # if wait is 0.0, this means "no wait" and is also implicitly false.
        if not self.__events and wait:
            if isinstance(wait, float):
                self.__sock.settimeout(wait)
            try:
                ret = self.__json_read(only_event=True)
            except socket.timeout as err:
                raise QMPTimeoutError("Timeout waiting for event") from err
            except Exception as err:
                msg = "Error while reading from socket"
                raise QMPConnectError(msg) from err
            finally:
                self.__sock.settimeout(current_timeout)

            # None means the stream produced no data at all.
            if ret is None:
                raise QMPConnectError("Error while reading from socket")

    def __enter__(self) -> 'QEMUMonitorProtocol':
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 # pylint: disable=duplicate-code
                 # see https://github.com/PyCQA/pylint/issues/3619
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        @param negotiate: if False, only connect and skip the negotiation
        @return QMP greeting dict, or None if negotiate is false
        @raise OSError on socket connection errors
        @raise QMPConnectError if the greeting is not received
        @raise QMPCapabilitiesError if fails to negotiate capabilities
        """
        self.__sock.connect(self.__address)
        self.__sockfile = self.__sock.makefile(mode='r')
        if negotiate:
            return self.__negotiate_capabilities()
        return None

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        @param timeout: timeout in seconds (nonnegative float number, or
                        None). The value passed will set the behavior of the
                        underneath QMP socket as described in [1].
                        Default value is set to 15.0.
        @return QMP greeting dict
        @raise OSError on socket connection errors
        @raise QMPConnectError if the greeting is not received
        @raise QMPCapabilitiesError if fails to negotiate capabilities

        [1]
        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
        """
        listener = self.__sock
        listener.settimeout(timeout)
        # If accept() raises (e.g. on timeout), the listening socket is left
        # in place so that the call can be retried.
        self.__sock, _ = listener.accept()
        # From here on only the accepted connection is referenced by this
        # object, so close the listening socket instead of leaking its
        # file descriptor.
        listener.close()
        self.__sockfile = self.__sock.makefile(mode='r')
        return self.__negotiate_capabilities()

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        @param qmp_cmd: QMP command to be sent as a Python dict
        @return QMP response as a Python dict
        @raise QMPConnectError if no reply could be read
        """
        self.logger.debug(">>> %s", qmp_cmd)
        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
        resp = self.__json_read()
        if resp is None:
            raise QMPConnectError("Unexpected empty reply from server")
        self.logger.debug("<<< %s", resp)
        return resp

    def cmd(self, name: str,
            args: Optional[Dict[str, Any]] = None,
            cmd_id: Optional[Any] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        @param name: command name (string)
        @param args: command arguments (dict); omitted from the message
                     when None or empty
        @param cmd_id: command id (dict, list, string or int); omitted from
                       the message only when None
        @return QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        # Compare against None explicitly: falsy ids such as 0 or "" are
        # legitimate values and must still be transmitted.
        if cmd_id is not None:
            qmp_cmd['id'] = cmd_id
        return self.cmd_obj(qmp_cmd)

    def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        @param cmd: command name (string)
        @param kwds: command arguments, passed as keyword arguments
        @return the value of the 'return' member of the reply
        @raise QMPResponseError if the reply contains an 'error' member
        @raise QMPProtocolError if the reply has neither 'error' nor 'return'
        """
        ret = self.cmd(cmd, kwds)
        if 'error' in ret:
            raise QMPResponseError(ret)
        if 'return' not in ret:
            raise QMPProtocolError(
                "'return' key not found in QMP response '{}'".format(str(ret))
            )
        return cast(QMPReturnValue, ret['return'])

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.

        @return The first available QMP event, or None.
        """
        self.__get_events(wait)

        if self.__events:
            # Oldest event first; it is removed from the cache.
            return self.__events.pop(0)
        return None

    def get_events(self,
                   wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of available QMP events.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.

        @return The list of available QMP events.
        @note The returned list is the internal event cache itself, not a
              copy; events stay cached until pull_event() or clear_events().
        """
        self.__get_events(wait)
        return self.__events

    def clear_events(self) -> None:
        """
        Clear current list of pending events.
        """
        self.__events = []

    def close(self) -> None:
        """
        Close the socket and socket file.
        """
        if self.__sock:
            self.__sock.close()
        if self.__sockfile:
            self.__sockfile.close()

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the socket timeout.

        @param timeout (float): timeout in seconds (non-zero), or None.
        @note This is a wrap around socket.settimeout

        @raise ValueError: if timeout was set to 0.
        """
        if timeout == 0:
            msg = "timeout cannot be 0; this engages non-blocking mode."
            msg += " Use 'None' instead to disable timeouts."
            raise ValueError(msg)
        self.__sock.settimeout(timeout)

    def get_sock_fd(self) -> int:
        """
        Get the socket file descriptor.

        @return The file descriptor number.
        """
        return self.__sock.fileno()

    def is_scm_available(self) -> bool:
        """
        Check if the socket allows for SCM_RIGHTS.

        @return True if SCM_RIGHTS is available, otherwise False.
        """
        # Availability is decided solely by the socket family being AF_UNIX.
        return self.__sock.family == socket.AF_UNIX
386