xref: /qemu/python/qemu/qmp/qmp_client.py (revision d5657258)
1"""
2QMP Protocol Implementation
3
4This module provides the `QMPClient` class, which can be used to connect
5and send commands to a QMP server such as QEMU. The QMP class can be
6used to either connect to a listening server, or used to listen and
7accept an incoming connection from that server.
8"""
9
10import asyncio
11import logging
12import socket
13import struct
14from typing import (
15    Dict,
16    List,
17    Mapping,
18    Optional,
19    Union,
20    cast,
21)
22
23from .error import ProtocolError, QMPError
24from .events import Events
25from .message import Message
26from .models import ErrorResponse, Greeting
27from .protocol import AsyncProtocol, Runstate, require
28from .util import (
29    bottom_half,
30    exception_summary,
31    pretty_traceback,
32    upper_half,
33)
34
35
class _WrappedProtocolError(ProtocolError):
    """
    Abstract base for Protocol errors that carry an underlying Exception.

    The string form of this error is the error message, a colon, and the
    string form of the wrapped exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        # Keep the root cause around so callers can inspect it directly.
        self.exc = exc

    def __str__(self) -> str:
        return "{}: {!s}".format(self.error_message, self.exc)
49
50
class GreetingError(_WrappedProtocolError):
    """
    An exception occurred during the Greeting phase.

    Raised by `QMPClient._get_greeting()` when the message received while
    awaiting the server greeting cannot be understood as a `Greeting`.
    The exception that triggered the failure is available as ``exc``.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
58
59
class NegotiationError(_WrappedProtocolError):
    """
    An exception occurred during the Negotiation phase.

    Raised by `QMPClient._negotiate()` when capabilities negotiation hits
    a protocol-level problem or the server's reply is not acceptable.
    The exception that triggered the failure is available as ``exc``.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
67
68
class ExecuteError(QMPError):
    """
    Exception raised by `QMPClient.execute()` on RPC failure.

    The exception text is the description carried by the error response;
    the messages involved are kept as attributes for inspection.

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """
    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message):
        details = error_response.error
        super().__init__(details.desc)
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = details.class_
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
88
89
class ExecInterruptedError(QMPError):
    """
    Exception raised by `execute()` (et al) when an RPC is interrupted.

    This error is raised when an `execute()` statement could not be
    completed.  This can occur because the connection itself was
    terminated before a reply was received.

    On disconnect, `QMPClient._bh_disconnect()` places an instance of
    this error on the queue of every pending execution, and
    `QMPClient._reply()` raises it to the waiting caller.

    The true cause of the interruption will be available via `disconnect()`.
    """
100
101
class _MsgProtocolError(ProtocolError):
    """
    Abstract base for protocol errors that are tied to a `Message`.

    Used when a `Message` could be mechanically decoded, but its content
    turned out to be inappropriate or malformed. The offending message
    is appended to the string form of the error.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
    def __init__(self, error_message: str, msg: Message):
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        summary = super().__str__()
        return f"{summary}\n  Message was: {self.msg!s}\n"
123
124
class ServerParseError(_MsgProtocolError):
    """
    The Server sent a `Message` indicating parsing failure.

    i.e. A reply has arrived from the server, but it is missing the "ID"
    field, indicating a parsing error.

    Raised by `QMPClient._on_message()` for an error reply that carries
    no 'id' and cannot be routed to any pending execution.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
135
136
class BadReplyError(_MsgProtocolError):
    """
    An execution reply was successfully routed, but not understood.

    If a QMP message is received with an 'id' field to allow it to be
    routed, but is otherwise malformed, this exception will be raised.

    A reply message is malformed if it is missing either the 'return' or
    'error' keys, or if the 'error' value has missing keys or members of
    the wrong type.

    `QMPClient.execute_msg()` raises this when an error reply cannot be
    parsed into an `ErrorResponse`, or when a reply has neither an
    'error' nor a 'return' member.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """
    def __init__(self, error_message: str, msg: Message, sent: Message):
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
156
157
158class QMPClient(AsyncProtocol[Message], Events):
159    """
160    Implements a QMP client connection.
161
162    QMP can be used to establish a connection as either the transport
163    client or server, though this class always acts as the QMP client.
164
165    :param name: Optional nickname for the connection, used for logging.
166
167    Basic script-style usage looks like this::
168
169      qmp = QMPClient('my_virtual_machine_name')
170      await qmp.connect(('127.0.0.1', 1234))
171      ...
172      res = await qmp.execute('block-query')
173      ...
174      await qmp.disconnect()
175
176    Basic async client-style usage looks like this::
177
178      class Client:
179          def __init__(self, name: str):
180              self.qmp = QMPClient(name)
181
182          async def watch_events(self):
183              try:
184                  async for event in self.qmp.events:
185                      print(f"Event: {event['event']}")
186              except asyncio.CancelledError:
187                  return
188
189          async def run(self, address='/tmp/qemu.socket'):
190              await self.qmp.connect(address)
191              asyncio.create_task(self.watch_events())
192              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()
194
195    See `qmp.events` for more detail on event handling patterns.
196    """
197    #: Logger object used for debugging messages.
198    logger = logging.getLogger(__name__)
199
200    # Read buffer limit; 10MB like libvirt default
201    _limit = 10 * 1024 * 1024
202
203    # Type alias for pending execute() result items
204    _PendingT = Union[Message, ExecInterruptedError]
205
206    def __init__(self, name: Optional[str] = None) -> None:
207        super().__init__(name)
208        Events.__init__(self)
209
210        #: Whether or not to await a greeting after establishing a connection.
211        self.await_greeting: bool = True
212
213        #: Whether or not to perform capabilities negotiation upon connection.
214        #: Implies `await_greeting`.
215        self.negotiate: bool = True
216
217        # Cached Greeting, if one was awaited.
218        self._greeting: Optional[Greeting] = None
219
220        # Command ID counter
221        self._execute_id = 0
222
223        # Incoming RPC reply messages.
224        self._pending: Dict[
225            Union[str, None],
226            'asyncio.Queue[QMPClient._PendingT]'
227        ] = {}
228
229    @property
    def greeting(self) -> Optional[Greeting]:
        """
        The `Greeting` from the QMP server, if any.

        This is `None` until a greeting has been received; it is cleared
        again each time a new session is established.
        """
        return self._greeting
233
234    @upper_half
235    async def _establish_session(self) -> None:
236        """
237        Initiate the QMP session.
238
239        Wait for the QMP greeting and perform capabilities negotiation.
240
241        :raise GreetingError: When the greeting is not understood.
242        :raise NegotiationError: If the negotiation fails.
243        :raise EOFError: When the server unexpectedly hangs up.
244        :raise OSError: For underlying stream errors.
245        """
246        self._greeting = None
247        self._pending = {}
248
249        if self.await_greeting or self.negotiate:
250            self._greeting = await self._get_greeting()
251
252        if self.negotiate:
253            await self._negotiate()
254
255        # This will start the reader/writers:
256        await super()._establish_session()
257
258    @upper_half
259    async def _get_greeting(self) -> Greeting:
260        """
261        :raise GreetingError: When the greeting is not understood.
262        :raise EOFError: When the server unexpectedly hangs up.
263        :raise OSError: For underlying stream errors.
264
265        :return: the Greeting object given by the server.
266        """
267        self.logger.debug("Awaiting greeting ...")
268
269        try:
270            msg = await self._recv()
271            return Greeting(msg)
272        except (ProtocolError, KeyError, TypeError) as err:
273            emsg = "Did not understand Greeting"
274            self.logger.error("%s: %s", emsg, exception_summary(err))
275            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
276            raise GreetingError(emsg, err) from err
277        except BaseException as err:
278            # EOFError, OSError, or something unexpected.
279            emsg = "Failed to receive Greeting"
280            self.logger.error("%s: %s", emsg, exception_summary(err))
281            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
282            raise
283
284    @upper_half
285    async def _negotiate(self) -> None:
286        """
287        Perform QMP capabilities negotiation.
288
289        :raise NegotiationError: When negotiation fails.
290        :raise EOFError: When the server unexpectedly hangs up.
291        :raise OSError: For underlying stream errors.
292        """
293        self.logger.debug("Negotiating capabilities ...")
294
295        arguments: Dict[str, List[str]] = {}
296        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
297            arguments.setdefault('enable', []).append('oob')
298        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
299
300        # It's not safe to use execute() here, because the reader/writers
301        # aren't running. AsyncProtocol *requires* that a new session
302        # does not fail after the reader/writers are running!
303        try:
304            await self._send(msg)
305            reply = await self._recv()
306            assert 'return' in reply
307            assert 'error' not in reply
308        except (ProtocolError, AssertionError) as err:
309            emsg = "Negotiation failed"
310            self.logger.error("%s: %s", emsg, exception_summary(err))
311            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
312            raise NegotiationError(emsg, err) from err
313        except BaseException as err:
314            # EOFError, OSError, or something unexpected.
315            emsg = "Negotiation failed"
316            self.logger.error("%s: %s", emsg, exception_summary(err))
317            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
318            raise
319
320    @bottom_half
321    async def _bh_disconnect(self) -> None:
322        try:
323            await super()._bh_disconnect()
324        finally:
325            if self._pending:
326                self.logger.debug("Cancelling pending executions")
327            keys = self._pending.keys()
328            for key in keys:
329                self.logger.debug("Cancelling execution '%s'", key)
330                self._pending[key].put_nowait(
331                    ExecInterruptedError("Disconnected")
332                )
333
334            self.logger.debug("QMP Disconnected.")
335
336    @upper_half
    def _cleanup(self) -> None:
        """
        Run the parent class's cleanup, then sanity-check that no
        executions remain pending.
        """
        super()._cleanup()
        # Each execution removes its own entry from the pending table
        # once it stops waiting (see `_reply`), so nothing should be left.
        assert not self._pending
340
341    @bottom_half
342    async def _on_message(self, msg: Message) -> None:
343        """
344        Add an incoming message to the appropriate queue/handler.
345
346        :raise ServerParseError: When Message indicates server parse failure.
347        """
348        # Incoming messages are not fully parsed/validated here;
349        # do only light peeking to know how to route the messages.
350
351        if 'event' in msg:
352            await self._event_dispatch(msg)
353            return
354
355        # Below, we assume everything left is an execute/exec-oob response.
356
357        exec_id = cast(Optional[str], msg.get('id'))
358
359        if exec_id in self._pending:
360            await self._pending[exec_id].put(msg)
361            return
362
363        # We have a message we can't route back to a caller.
364
365        is_error = 'error' in msg
366        has_id = 'id' in msg
367
368        if is_error and not has_id:
369            # This is very likely a server parsing error.
370            # It doesn't inherently belong to any pending execution.
371            # Instead of performing clever recovery, just terminate.
372            # See "NOTE" in qmp-spec.rst, section "Error".
373            raise ServerParseError(
374                ("Server sent an error response without an ID, "
375                 "but there are no ID-less executions pending. "
376                 "Assuming this is a server parser failure."),
377                msg
378            )
379
380        # qmp-spec.rst, section "Commands Responses":
381        # 'Clients should drop all the responses
382        # that have an unknown "id" field.'
383        self.logger.log(
384            logging.ERROR if is_error else logging.WARNING,
385            "Unknown ID '%s', message dropped.",
386            exec_id,
387        )
388        self.logger.debug("Unroutable message: %s", str(msg))
389
390    @upper_half
391    @bottom_half
392    async def _do_recv(self) -> Message:
393        """
394        :raise OSError: When a stream error is encountered.
395        :raise EOFError: When the stream is at EOF.
396        :raise ProtocolError:
397            When the Message is not understood.
398            See also `Message._deserialize`.
399
400        :return: A single QMP `Message`.
401        """
402        msg_bytes = await self._readline()
403        msg = Message(msg_bytes, eager=True)
404        return msg
405
406    @upper_half
407    @bottom_half
408    def _do_send(self, msg: Message) -> None:
409        """
410        :raise ValueError: JSON serialization failure
411        :raise TypeError: JSON serialization failure
412        :raise OSError: When a stream error is encountered.
413        """
414        assert self._writer is not None
415        self._writer.write(bytes(msg))
416
417    @upper_half
418    def _get_exec_id(self) -> str:
419        exec_id = f"__qmp#{self._execute_id:05d}"
420        self._execute_id += 1
421        return exec_id
422
423    @upper_half
424    async def _issue(self, msg: Message) -> Union[None, str]:
425        """
426        Issue a QMP `Message` and do not wait for a reply.
427
428        :param msg: The QMP `Message` to send to the server.
429
430        :return: The ID of the `Message` sent.
431        """
432        msg_id: Optional[str] = None
433        if 'id' in msg:
434            assert isinstance(msg['id'], str)
435            msg_id = msg['id']
436
437        self._pending[msg_id] = asyncio.Queue(maxsize=1)
438        try:
439            await self._outgoing.put(msg)
440        except:
441            del self._pending[msg_id]
442            raise
443
444        return msg_id
445
446    @upper_half
447    async def _reply(self, msg_id: Union[str, None]) -> Message:
448        """
449        Await a reply to a previously issued QMP message.
450
451        :param msg_id: The ID of the previously issued message.
452
453        :return: The reply from the server.
454        :raise ExecInterruptedError:
455            When the reply could not be retrieved because the connection
456            was lost, or some other problem.
457        """
458        queue = self._pending[msg_id]
459
460        try:
461            result = await queue.get()
462            if isinstance(result, ExecInterruptedError):
463                raise result
464            return result
465        finally:
466            del self._pending[msg_id]
467
468    @upper_half
469    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
470        """
471        Send a QMP `Message` to the server and await a reply.
472
473        This method *assumes* you are sending some kind of an execute
474        statement that *will* receive a reply.
475
476        An execution ID will be assigned if assign_id is `True`. It can be
477        disabled, but this requires that an ID is manually assigned
478        instead. For manually assigned IDs, you must not use the string
479        '__qmp#' anywhere in the ID.
480
481        :param msg: The QMP `Message` to execute.
482        :param assign_id: If True, assign a new execution ID.
483
484        :return: Execution reply from the server.
485        :raise ExecInterruptedError:
486            When the reply could not be retrieved because the connection
487            was lost, or some other problem.
488        """
489        if assign_id:
490            msg['id'] = self._get_exec_id()
491        elif 'id' in msg:
492            assert isinstance(msg['id'], str)
493            assert '__qmp#' not in msg['id']
494
495        exec_id = await self._issue(msg)
496        return await self._reply(exec_id)
497
498    @upper_half
499    @require(Runstate.RUNNING)
500    async def _raw(
501            self,
502            msg: Union[Message, Mapping[str, object], bytes],
503            assign_id: bool = True,
504    ) -> Message:
505        """
506        Issue a raw `Message` to the QMP server and await a reply.
507
508        :param msg:
509            A Message to send to the server. It may be a `Message`, any
510            Mapping (including Dict), or raw bytes.
511        :param assign_id:
512            Assign an arbitrary execution ID to this message. If
513            `False`, the existing id must either be absent (and no other
514            such pending execution may omit an ID) or a string. If it is
515            a string, it must not start with '__qmp#' and no other such
516            pending execution may currently be using that ID.
517
518        :return: Execution reply from the server.
519
520        :raise ExecInterruptedError:
521            When the reply could not be retrieved because the connection
522            was lost, or some other problem.
523        :raise TypeError:
524            When assign_id is `False`, an ID is given, and it is not a string.
525        :raise ValueError:
526            When assign_id is `False`, but the ID is not usable;
527            Either because it starts with '__qmp#' or it is already in-use.
528        """
529        # 1. convert generic Mapping or bytes to a QMP Message
530        # 2. copy Message objects so that we assign an ID only to the copy.
531        msg = Message(msg)
532
533        exec_id = msg.get('id')
534        if not assign_id and 'id' in msg:
535            if not isinstance(exec_id, str):
536                raise TypeError(f"ID ('{exec_id}') must be a string.")
537            if exec_id.startswith('__qmp#'):
538                raise ValueError(
539                    f"ID ('{exec_id}') must not start with '__qmp#'."
540                )
541
542        if not assign_id and exec_id in self._pending:
543            raise ValueError(
544                f"ID '{exec_id}' is in-use and cannot be used."
545            )
546
547        return await self._execute(msg, assign_id=assign_id)
548
549    @upper_half
550    @require(Runstate.RUNNING)
551    async def execute_msg(self, msg: Message) -> object:
552        """
553        Execute a QMP command and return its value.
554
555        :param msg: The QMP `Message` to execute.
556
557        :return:
558            The command execution return value from the server. The type of
559            object returned depends on the command that was issued,
560            though most in QEMU return a `dict`.
561        :raise ValueError:
562            If the QMP `Message` does not have either the 'execute' or
563            'exec-oob' fields set.
564        :raise ExecuteError: When the server returns an error response.
565        :raise ExecInterruptedError: if the connection was terminated early.
566        """
567        if not ('execute' in msg or 'exec-oob' in msg):
568            raise ValueError("Requires 'execute' or 'exec-oob' message")
569
570        # Copy the Message so that the ID assigned by _execute() is
571        # local to this method; allowing the ID to be seen in raised
572        # Exceptions but without modifying the caller's held copy.
573        msg = Message(msg)
574        reply = await self._execute(msg)
575
576        if 'error' in reply:
577            try:
578                error_response = ErrorResponse(reply)
579            except (KeyError, TypeError) as err:
580                # Error response was malformed.
581                raise BadReplyError(
582                    "QMP error reply is malformed", reply, msg,
583                ) from err
584
585            raise ExecuteError(error_response, msg, reply)
586
587        if 'return' not in reply:
588            raise BadReplyError(
589                "QMP reply is missing a 'error' or 'return' member",
590                reply, msg,
591            )
592
593        return reply['return']
594
595    @classmethod
596    def make_execute_msg(cls, cmd: str,
597                         arguments: Optional[Mapping[str, object]] = None,
598                         oob: bool = False) -> Message:
599        """
600        Create an executable message to be sent by `execute_msg` later.
601
602        :param cmd: QMP command name.
603        :param arguments: Arguments (if any). Must be JSON-serializable.
604        :param oob: If `True`, execute "out of band".
605
606        :return: An executable QMP `Message`.
607        """
608        msg = Message({'exec-oob' if oob else 'execute': cmd})
609        if arguments is not None:
610            msg['arguments'] = arguments
611        return msg
612
613    @upper_half
614    async def execute(self, cmd: str,
615                      arguments: Optional[Mapping[str, object]] = None,
616                      oob: bool = False) -> object:
617        """
618        Execute a QMP command and return its value.
619
620        :param cmd: QMP command name.
621        :param arguments: Arguments (if any). Must be JSON-serializable.
622        :param oob: If `True`, execute "out of band".
623
624        :return:
625            The command execution return value from the server. The type of
626            object returned depends on the command that was issued,
627            though most in QEMU return a `dict`.
628        :raise ExecuteError: When the server returns an error response.
629        :raise ExecInterruptedError: if the connection was terminated early.
630        """
631        msg = self.make_execute_msg(cmd, arguments, oob=oob)
632        return await self.execute_msg(msg)
633
634    @upper_half
635    @require(Runstate.RUNNING)
636    def send_fd_scm(self, fd: int) -> None:
637        """
638        Send a file descriptor to the remote via SCM_RIGHTS.
639        """
640        assert self._writer is not None
641        sock = self._writer.transport.get_extra_info('socket')
642
643        if sock.family != socket.AF_UNIX:
644            raise QMPError("Sending file descriptors requires a UNIX socket.")
645
646        if not hasattr(sock, 'sendmsg'):
647            # We need to void the warranty sticker.
648            # Access to sendmsg is scheduled for removal in Python 3.11.
649            # Find the real backing socket to use it anyway.
650            sock = sock._sock  # pylint: disable=protected-access
651
652        sock.sendmsg(
653            [b' '],
654            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
655        )
656