""" QMP Protocol Implementation This module provides the `QMPClient` class, which can be used to connect and send commands to a QMP server such as QEMU. The QMP class can be used to either connect to a listening server, or used to listen and accept an incoming connection from that server. """ import asyncio import logging import socket import struct from typing import ( Dict, List, Mapping, Optional, Union, cast, ) from .error import ProtocolError, QMPError from .events import Events from .message import Message from .models import ErrorResponse, Greeting from .protocol import AsyncProtocol, Runstate, require from .util import ( bottom_half, exception_summary, pretty_traceback, upper_half, ) class _WrappedProtocolError(ProtocolError): """ Abstract exception class for Protocol errors that wrap an Exception. :param error_message: Human-readable string describing the error. :param exc: The root-cause exception. """ def __init__(self, error_message: str, exc: Exception): super().__init__(error_message) self.exc = exc def __str__(self) -> str: return f"{self.error_message}: {self.exc!s}" class GreetingError(_WrappedProtocolError): """ An exception occurred during the Greeting phase. :param error_message: Human-readable string describing the error. :param exc: The root-cause exception. """ class NegotiationError(_WrappedProtocolError): """ An exception occurred during the Negotiation phase. :param error_message: Human-readable string describing the error. :param exc: The root-cause exception. """ class ExecuteError(QMPError): """ Exception raised by `QMPClient.execute()` on RPC failure. :param error_response: The RPC error response object. :param sent: The sent RPC message that caused the failure. :param received: The raw RPC error reply received. """ def __init__(self, error_response: ErrorResponse, sent: Message, received: Message): super().__init__(error_response.error.desc) #: The sent `Message` that caused the failure self.sent: Message = sent #: The received `Message` that indicated failure self.received: Message = received #: The parsed error response self.error: ErrorResponse = error_response #: The QMP error class self.error_class: str = error_response.error.class_ class ExecInterruptedError(QMPError): """ Exception raised by `execute()` (et al) when an RPC is interrupted. This error is raised when an `execute()` statement could not be completed. This can occur because the connection itself was terminated before a reply was received. The true cause of the interruption will be available via `disconnect()`. """ class _MsgProtocolError(ProtocolError): """ Abstract error class for protocol errors that have a `Message` object. This Exception class is used for protocol errors where the `Message` was mechanically understood, but was found to be inappropriate or malformed. :param error_message: Human-readable string describing the error. :param msg: The QMP `Message` that caused the error. """ def __init__(self, error_message: str, msg: Message): super().__init__(error_message) #: The received `Message` that caused the error. self.msg: Message = msg def __str__(self) -> str: return "\n".join([ super().__str__(), f" Message was: {str(self.msg)}\n", ]) class ServerParseError(_MsgProtocolError): """ The Server sent a `Message` indicating parsing failure. i.e. A reply has arrived from the server, but it is missing the "ID" field, indicating a parsing error. :param error_message: Human-readable string describing the error. :param msg: The QMP `Message` that caused the error. """ class BadReplyError(_MsgProtocolError): """ An execution reply was successfully routed, but not understood. If a QMP message is received with an 'id' field to allow it to be routed, but is otherwise malformed, this exception will be raised. A reply message is malformed if it is missing either the 'return' or 'error' keys, or if the 'error' value has missing keys or members of the wrong type. :param error_message: Human-readable string describing the error. :param msg: The malformed reply that was received. :param sent: The message that was sent that prompted the error. """ def __init__(self, error_message: str, msg: Message, sent: Message): super().__init__(error_message, msg) #: The sent `Message` that caused the failure self.sent = sent class QMPClient(AsyncProtocol[Message], Events): """ Implements a QMP client connection. QMP can be used to establish a connection as either the transport client or server, though this class always acts as the QMP client. :param name: Optional nickname for the connection, used for logging. Basic script-style usage looks like this:: qmp = QMPClient('my_virtual_machine_name') await qmp.connect(('127.0.0.1', 1234)) ... res = await qmp.execute('block-query') ... await qmp.disconnect() Basic async client-style usage looks like this:: class Client: def __init__(self, name: str): self.qmp = QMPClient(name) async def watch_events(self): try: async for event in self.qmp.events: print(f"Event: {event['event']}") except asyncio.CancelledError: return async def run(self, address='/tmp/qemu.socket'): await self.qmp.connect(address) asyncio.create_task(self.watch_events()) await self.qmp.runstate_changed.wait() await self.disconnect() See `qmp.events` for more detail on event handling patterns. """ #: Logger object used for debugging messages. logger = logging.getLogger(__name__) # Read buffer limit; 10MB like libvirt default _limit = 10 * 1024 * 1024 # Type alias for pending execute() result items _PendingT = Union[Message, ExecInterruptedError] def __init__(self, name: Optional[str] = None) -> None: super().__init__(name) Events.__init__(self) #: Whether or not to await a greeting after establishing a connection. self.await_greeting: bool = True #: Whether or not to perform capabilities negotiation upon connection. #: Implies `await_greeting`. self.negotiate: bool = True # Cached Greeting, if one was awaited. self._greeting: Optional[Greeting] = None # Command ID counter self._execute_id = 0 # Incoming RPC reply messages. self._pending: Dict[ Union[str, None], 'asyncio.Queue[QMPClient._PendingT]' ] = {} @property def greeting(self) -> Optional[Greeting]: """The `Greeting` from the QMP server, if any.""" return self._greeting @upper_half async def _establish_session(self) -> None: """ Initiate the QMP session. Wait for the QMP greeting and perform capabilities negotiation. :raise GreetingError: When the greeting is not understood. :raise NegotiationError: If the negotiation fails. :raise EOFError: When the server unexpectedly hangs up. :raise OSError: For underlying stream errors. """ self._greeting = None self._pending = {} if self.await_greeting or self.negotiate: self._greeting = await self._get_greeting() if self.negotiate: await self._negotiate() # This will start the reader/writers: await super()._establish_session() @upper_half async def _get_greeting(self) -> Greeting: """ :raise GreetingError: When the greeting is not understood. :raise EOFError: When the server unexpectedly hangs up. :raise OSError: For underlying stream errors. :return: the Greeting object given by the server. """ self.logger.debug("Awaiting greeting ...") try: msg = await self._recv() return Greeting(msg) except (ProtocolError, KeyError, TypeError) as err: emsg = "Did not understand Greeting" self.logger.error("%s: %s", emsg, exception_summary(err)) self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) raise GreetingError(emsg, err) from err except BaseException as err: # EOFError, OSError, or something unexpected. emsg = "Failed to receive Greeting" self.logger.error("%s: %s", emsg, exception_summary(err)) self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) raise @upper_half async def _negotiate(self) -> None: """ Perform QMP capabilities negotiation. :raise NegotiationError: When negotiation fails. :raise EOFError: When the server unexpectedly hangs up. :raise OSError: For underlying stream errors. """ self.logger.debug("Negotiating capabilities ...") arguments: Dict[str, List[str]] = {} if self._greeting and 'oob' in self._greeting.QMP.capabilities: arguments.setdefault('enable', []).append('oob') msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) # It's not safe to use execute() here, because the reader/writers # aren't running. AsyncProtocol *requires* that a new session # does not fail after the reader/writers are running! try: await self._send(msg) reply = await self._recv() assert 'return' in reply assert 'error' not in reply except (ProtocolError, AssertionError) as err: emsg = "Negotiation failed" self.logger.error("%s: %s", emsg, exception_summary(err)) self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) raise NegotiationError(emsg, err) from err except BaseException as err: # EOFError, OSError, or something unexpected. emsg = "Negotiation failed" self.logger.error("%s: %s", emsg, exception_summary(err)) self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) raise @bottom_half async def _bh_disconnect(self) -> None: try: await super()._bh_disconnect() finally: if self._pending: self.logger.debug("Cancelling pending executions") keys = self._pending.keys() for key in keys: self.logger.debug("Cancelling execution '%s'", key) self._pending[key].put_nowait( ExecInterruptedError("Disconnected") ) self.logger.debug("QMP Disconnected.") @upper_half def _cleanup(self) -> None: super()._cleanup() assert not self._pending @bottom_half async def _on_message(self, msg: Message) -> None: """ Add an incoming message to the appropriate queue/handler. :raise ServerParseError: When Message indicates server parse failure. """ # Incoming messages are not fully parsed/validated here; # do only light peeking to know how to route the messages. if 'event' in msg: await self._event_dispatch(msg) return # Below, we assume everything left is an execute/exec-oob response. exec_id = cast(Optional[str], msg.get('id')) if exec_id in self._pending: await self._pending[exec_id].put(msg) return # We have a message we can't route back to a caller. is_error = 'error' in msg has_id = 'id' in msg if is_error and not has_id: # This is very likely a server parsing error. # It doesn't inherently belong to any pending execution. # Instead of performing clever recovery, just terminate. # See "NOTE" in qmp-spec.rst, section "Error". raise ServerParseError( ("Server sent an error response without an ID, " "but there are no ID-less executions pending. " "Assuming this is a server parser failure."), msg ) # qmp-spec.rst, section "Commands Responses": # 'Clients should drop all the responses # that have an unknown "id" field.' self.logger.log( logging.ERROR if is_error else logging.WARNING, "Unknown ID '%s', message dropped.", exec_id, ) self.logger.debug("Unroutable message: %s", str(msg)) @upper_half @bottom_half async def _do_recv(self) -> Message: """ :raise OSError: When a stream error is encountered. :raise EOFError: When the stream is at EOF. :raise ProtocolError: When the Message is not understood. See also `Message._deserialize`. :return: A single QMP `Message`. """ msg_bytes = await self._readline() msg = Message(msg_bytes, eager=True) return msg @upper_half @bottom_half def _do_send(self, msg: Message) -> None: """ :raise ValueError: JSON serialization failure :raise TypeError: JSON serialization failure :raise OSError: When a stream error is encountered. """ assert self._writer is not None self._writer.write(bytes(msg)) @upper_half def _get_exec_id(self) -> str: exec_id = f"__qmp#{self._execute_id:05d}" self._execute_id += 1 return exec_id @upper_half async def _issue(self, msg: Message) -> Union[None, str]: """ Issue a QMP `Message` and do not wait for a reply. :param msg: The QMP `Message` to send to the server. :return: The ID of the `Message` sent. """ msg_id: Optional[str] = None if 'id' in msg: assert isinstance(msg['id'], str) msg_id = msg['id'] self._pending[msg_id] = asyncio.Queue(maxsize=1) try: await self._outgoing.put(msg) except: del self._pending[msg_id] raise return msg_id @upper_half async def _reply(self, msg_id: Union[str, None]) -> Message: """ Await a reply to a previously issued QMP message. :param msg_id: The ID of the previously issued message. :return: The reply from the server. :raise ExecInterruptedError: When the reply could not be retrieved because the connection was lost, or some other problem. """ queue = self._pending[msg_id] try: result = await queue.get() if isinstance(result, ExecInterruptedError): raise result return result finally: del self._pending[msg_id] @upper_half async def _execute(self, msg: Message, assign_id: bool = True) -> Message: """ Send a QMP `Message` to the server and await a reply. This method *assumes* you are sending some kind of an execute statement that *will* receive a reply. An execution ID will be assigned if assign_id is `True`. It can be disabled, but this requires that an ID is manually assigned instead. For manually assigned IDs, you must not use the string '__qmp#' anywhere in the ID. :param msg: The QMP `Message` to execute. :param assign_id: If True, assign a new execution ID. :return: Execution reply from the server. :raise ExecInterruptedError: When the reply could not be retrieved because the connection was lost, or some other problem. """ if assign_id: msg['id'] = self._get_exec_id() elif 'id' in msg: assert isinstance(msg['id'], str) assert '__qmp#' not in msg['id'] exec_id = await self._issue(msg) return await self._reply(exec_id) @upper_half @require(Runstate.RUNNING) async def _raw( self, msg: Union[Message, Mapping[str, object], bytes], assign_id: bool = True, ) -> Message: """ Issue a raw `Message` to the QMP server and await a reply. :param msg: A Message to send to the server. It may be a `Message`, any Mapping (including Dict), or raw bytes. :param assign_id: Assign an arbitrary execution ID to this message. If `False`, the existing id must either be absent (and no other such pending execution may omit an ID) or a string. If it is a string, it must not start with '__qmp#' and no other such pending execution may currently be using that ID. :return: Execution reply from the server. :raise ExecInterruptedError: When the reply could not be retrieved because the connection was lost, or some other problem. :raise TypeError: When assign_id is `False`, an ID is given, and it is not a string. :raise ValueError: When assign_id is `False`, but the ID is not usable; Either because it starts with '__qmp#' or it is already in-use. """ # 1. convert generic Mapping or bytes to a QMP Message # 2. copy Message objects so that we assign an ID only to the copy. msg = Message(msg) exec_id = msg.get('id') if not assign_id and 'id' in msg: if not isinstance(exec_id, str): raise TypeError(f"ID ('{exec_id}') must be a string.") if exec_id.startswith('__qmp#'): raise ValueError( f"ID ('{exec_id}') must not start with '__qmp#'." ) if not assign_id and exec_id in self._pending: raise ValueError( f"ID '{exec_id}' is in-use and cannot be used." ) return await self._execute(msg, assign_id=assign_id) @upper_half @require(Runstate.RUNNING) async def execute_msg(self, msg: Message) -> object: """ Execute a QMP command and return its value. :param msg: The QMP `Message` to execute. :return: The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a `dict`. :raise ValueError: If the QMP `Message` does not have either the 'execute' or 'exec-oob' fields set. :raise ExecuteError: When the server returns an error response. :raise ExecInterruptedError: if the connection was terminated early. """ if not ('execute' in msg or 'exec-oob' in msg): raise ValueError("Requires 'execute' or 'exec-oob' message") # Copy the Message so that the ID assigned by _execute() is # local to this method; allowing the ID to be seen in raised # Exceptions but without modifying the caller's held copy. msg = Message(msg) reply = await self._execute(msg) if 'error' in reply: try: error_response = ErrorResponse(reply) except (KeyError, TypeError) as err: # Error response was malformed. raise BadReplyError( "QMP error reply is malformed", reply, msg, ) from err raise ExecuteError(error_response, msg, reply) if 'return' not in reply: raise BadReplyError( "QMP reply is missing a 'error' or 'return' member", reply, msg, ) return reply['return'] @classmethod def make_execute_msg(cls, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> Message: """ Create an executable message to be sent by `execute_msg` later. :param cmd: QMP command name. :param arguments: Arguments (if any). Must be JSON-serializable. :param oob: If `True`, execute "out of band". :return: An executable QMP `Message`. """ msg = Message({'exec-oob' if oob else 'execute': cmd}) if arguments is not None: msg['arguments'] = arguments return msg @upper_half async def execute(self, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> object: """ Execute a QMP command and return its value. :param cmd: QMP command name. :param arguments: Arguments (if any). Must be JSON-serializable. :param oob: If `True`, execute "out of band". :return: The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a `dict`. :raise ExecuteError: When the server returns an error response. :raise ExecInterruptedError: if the connection was terminated early. """ msg = self.make_execute_msg(cmd, arguments, oob=oob) return await self.execute_msg(msg) @upper_half @require(Runstate.RUNNING) def send_fd_scm(self, fd: int) -> None: """ Send a file descriptor to the remote via SCM_RIGHTS. """ assert self._writer is not None sock = self._writer.transport.get_extra_info('socket') if sock.family != socket.AF_UNIX: raise QMPError("Sending file descriptors requires a UNIX socket.") if not hasattr(sock, 'sendmsg'): # We need to void the warranty sticker. # Access to sendmsg is scheduled for removal in Python 3.11. # Find the real backing socket to use it anyway. sock = sock._sock # pylint: disable=protected-access sock.sendmsg( [b' '], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] )