"""
QMP Protocol Implementation

This module provides the `QMPClient` class, which can be used to connect
and send commands to a QMP server such as QEMU. The QMP class can be
used to either connect to a listening server, or used to listen and
accept an incoming connection from that server.
"""

import asyncio
import logging
import socket
import struct
from typing import (
    Dict,
    List,
    Mapping,
    Optional,
    Union,
    cast,
)

from .error import ProtocolError, QMPError
from .events import Events
from .message import Message
from .models import ErrorResponse, Greeting
from .protocol import AsyncProtocol, Runstate, require
from .util import (
    bottom_half,
    exception_summary,
    pretty_traceback,
    upper_half,
)


class _WrappedProtocolError(ProtocolError):
    """
    Abstract exception class for Protocol errors that wrap an Exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        #: The root-cause exception that is being wrapped.
        self.exc = exc

    def __str__(self) -> str:
        return f"{self.error_message}: {self.exc!s}"


class GreetingError(_WrappedProtocolError):
    """
    An exception occurred during the Greeting phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """


class NegotiationError(_WrappedProtocolError):
    """
    An exception occurred during the Negotiation phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """


class ExecuteError(QMPError):
    """
    Exception raised by `QMPClient.execute()` on RPC failure.

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """
    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message):
        super().__init__(error_response.error.desc)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = error_response.error.class_


class ExecInterruptedError(QMPError):
    """
    Exception raised by `execute()` (et al) when an RPC is interrupted.

    This error is raised when an `execute()` statement could not be
    completed.  This can occur because the connection itself was
    terminated before a reply was received.

    The true cause of the interruption will be available via `disconnect()`.
    """


class _MsgProtocolError(ProtocolError):
    """
    Abstract error class for protocol errors that have a `Message` object.

    This Exception class is used for protocol errors where the `Message`
    was mechanically understood, but was found to be inappropriate or
    malformed.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
    def __init__(self, error_message: str, msg: Message):
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        return "\n".join([
            super().__str__(),
            f" Message was: {str(self.msg)}\n",
        ])


class ServerParseError(_MsgProtocolError):
    """
    The Server sent a `Message` indicating parsing failure.

    i.e. A reply has arrived from the server, but it is missing the "ID"
    field, indicating a parsing error.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """


class BadReplyError(_MsgProtocolError):
    """
    An execution reply was successfully routed, but not understood.

    If a QMP message is received with an 'id' field to allow it to be
    routed, but is otherwise malformed, this exception will be raised.

    A reply message is malformed if it is missing either the 'return' or
    'error' keys, or if the 'error' value has missing keys or members of
    the wrong type.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """
    def __init__(self, error_message: str, msg: Message, sent: Message):
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent = sent


class QMPClient(AsyncProtocol[Message], Events):
    """
    Implements a QMP client connection.

    QMP can be used to establish a connection as either the transport
    client or server, though this class always acts as the QMP client.

    :param name: Optional nickname for the connection, used for logging.

    Basic script-style usage looks like this::

      qmp = QMPClient('my_virtual_machine_name')
      await qmp.connect(('127.0.0.1', 1234))
      ...
      res = await qmp.execute('block-query')
      ...
      await qmp.disconnect()

    Basic async client-style usage looks like this::

      class Client:
          def __init__(self, name: str):
              self.qmp = QMPClient(name)

          async def watch_events(self):
              try:
                  async for event in self.qmp.events:
                      print(f"Event: {event['event']}")
              except asyncio.CancelledError:
                  return

          async def run(self, address='/tmp/qemu.socket'):
              await self.qmp.connect(address)
              asyncio.create_task(self.watch_events())
              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()

    See `qmp.events` for more detail on event handling patterns.
    """
    #: Logger object used for debugging messages.
    logger = logging.getLogger(__name__)

    # Read buffer limit; 10MB like libvirt default
    _limit = 10 * 1024 * 1024

    # Type alias for pending execute() result items
    _PendingT = Union[Message, ExecInterruptedError]

    def __init__(self, name: Optional[str] = None) -> None:
        super().__init__(name)
        Events.__init__(self)

        #: Whether or not to await a greeting after establishing a connection.
        self.await_greeting: bool = True

        #: Whether or not to perform capabilities negotiation upon connection.
        #: Implies `await_greeting`.
        self.negotiate: bool = True

        # Cached Greeting, if one was awaited.
        self._greeting: Optional[Greeting] = None

        # Command ID counter
        self._execute_id = 0

        # Incoming RPC reply messages, keyed by execution ID. The key is
        # None for an execution that was issued without an ID.
        self._pending: Dict[
            Union[str, None],
            'asyncio.Queue[QMPClient._PendingT]'
        ] = {}

    @property
    def greeting(self) -> Optional[Greeting]:
        """The `Greeting` from the QMP server, if any."""
        return self._greeting

    @upper_half
    async def _establish_session(self) -> None:
        """
        Initiate the QMP session.

        Wait for the QMP greeting and perform capabilities negotiation.

        :raise GreetingError: When the greeting is not understood.
        :raise NegotiationError: If the negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self._greeting = None
        self._pending = {}

        if self.await_greeting or self.negotiate:
            self._greeting = await self._get_greeting()

        if self.negotiate:
            await self._negotiate()

        # This will start the reader/writers:
        await super()._establish_session()

    @upper_half
    async def _get_greeting(self) -> Greeting:
        """
        Receive one message and parse it as the server `Greeting`.

        :raise GreetingError: When the greeting is not understood.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.

        :return: the Greeting object given by the server.
        """
        self.logger.debug("Awaiting greeting ...")

        try:
            msg = await self._recv()
            return Greeting(msg)
        except (ProtocolError, KeyError, TypeError) as err:
            emsg = "Did not understand Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise GreetingError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            emsg = "Failed to receive Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @upper_half
    async def _negotiate(self) -> None:
        """
        Perform QMP capabilities negotiation.

        :raise NegotiationError: When negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self.logger.debug("Negotiating capabilities ...")

        arguments: Dict[str, List[str]] = {}
        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
            arguments.setdefault('enable', []).append('oob')
        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)

        # It's not safe to use execute() here, because the reader/writers
        # aren't running. AsyncProtocol *requires* that a new session
        # does not fail after the reader/writers are running!
        try:
            await self._send(msg)
            reply = await self._recv()
            # Validate with an explicit check rather than `assert`:
            # assertions are stripped under `python -O`, which would let
            # a failed negotiation pass silently.
            if 'return' not in reply or 'error' in reply:
                raise BadReplyError(
                    "Server did not acknowledge 'qmp_capabilities'",
                    reply, msg,
                )
        except (ProtocolError, AssertionError) as err:
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise NegotiationError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @bottom_half
    async def _bh_disconnect(self) -> None:
        """
        Disconnect, then interrupt every execution still awaiting a reply.

        Each pending reply queue is handed an `ExecInterruptedError`,
        which `_reply()` raises to the waiting caller.
        """
        try:
            await super()._bh_disconnect()
        finally:
            if self._pending:
                self.logger.debug("Cancelling pending executions")
            for key, queue in self._pending.items():
                self.logger.debug("Cancelling execution '%s'", key)
                try:
                    queue.put_nowait(ExecInterruptedError("Disconnected"))
                except asyncio.QueueFull:
                    # The queue holds at most one item, so a reply was
                    # already delivered but not yet consumed. Leave it
                    # in place rather than raising out of this cleanup.
                    self.logger.debug(
                        "Execution '%s' already has a reply queued", key
                    )

            self.logger.debug("QMP Disconnected.")

    @upper_half
    def _cleanup(self) -> None:
        """
        Run the parent cleanup and check that no executions are pending.
        """
        super()._cleanup()
        assert not self._pending

    @bottom_half
    async def _on_message(self, msg: Message) -> None:
        """
        Add an incoming message to the appropriate queue/handler.

        :raise ServerParseError: When Message indicates server parse failure.
        """
        # Incoming messages are not fully parsed/validated here;
        # do only light peeking to know how to route the messages.

        if 'event' in msg:
            await self._event_dispatch(msg)
            return

        # Below, we assume everything left is an execute/exec-oob response.

        exec_id = cast(Optional[str], msg.get('id'))

        if exec_id in self._pending:
            await self._pending[exec_id].put(msg)
            return

        # We have a message we can't route back to a caller.

        is_error = 'error' in msg
        has_id = 'id' in msg

        if is_error and not has_id:
            # This is very likely a server parsing error.
            # It doesn't inherently belong to any pending execution.
            # Instead of performing clever recovery, just terminate.
            # See "NOTE" in qmp-spec.rst, section "Error".
            raise ServerParseError(
                ("Server sent an error response without an ID, "
                 "but there are no ID-less executions pending. "
                 "Assuming this is a server parser failure."),
                msg
            )

        # qmp-spec.rst, section "Commands Responses":
        # 'Clients should drop all the responses
        # that have an unknown "id" field.'
        self.logger.log(
            logging.ERROR if is_error else logging.WARNING,
            "Unknown ID '%s', message dropped.",
            exec_id,
        )
        self.logger.debug("Unroutable message: %s", str(msg))

    @upper_half
    @bottom_half
    async def _do_recv(self) -> Message:
        """
        :raise OSError: When a stream error is encountered.
        :raise EOFError: When the stream is at EOF.
        :raise ProtocolError:
            When the Message is not understood.
            See also `Message._deserialize`.

        :return: A single QMP `Message`.
        """
        msg_bytes = await self._readline()
        msg = Message(msg_bytes, eager=True)
        return msg

    @upper_half
    @bottom_half
    def _do_send(self, msg: Message) -> None:
        """
        :raise ValueError: JSON serialization failure
        :raise TypeError: JSON serialization failure
        :raise OSError: When a stream error is encountered.
        """
        assert self._writer is not None
        self._writer.write(bytes(msg))

    @upper_half
    def _get_exec_id(self) -> str:
        """
        Generate the next automatic execution ID.

        :return: A string of the form ``__qmp#NNNNN``.
        """
        exec_id = f"__qmp#{self._execute_id:05d}"
        self._execute_id += 1
        return exec_id

    @upper_half
    async def _issue(self, msg: Message) -> Union[None, str]:
        """
        Issue a QMP `Message` and do not wait for a reply.

        :param msg: The QMP `Message` to send to the server.

        :return: The ID of the `Message` sent.
        """
        msg_id: Optional[str] = None
        if 'id' in msg:
            assert isinstance(msg['id'], str)
            msg_id = msg['id']

        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        try:
            await self._outgoing.put(msg)
        except BaseException:
            # Deliberately broad (it includes cancellation): the reply
            # queue must not be left registered for a message that was
            # never queued for sending. The exception is re-raised.
            del self._pending[msg_id]
            raise

        return msg_id

    @upper_half
    async def _reply(self, msg_id: Union[str, None]) -> Message:
        """
        Await a reply to a previously issued QMP message.

        :param msg_id: The ID of the previously issued message.

        :return: The reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        queue = self._pending[msg_id]

        try:
            result = await queue.get()
            if isinstance(result, ExecInterruptedError):
                raise result
            return result
        finally:
            del self._pending[msg_id]

    @upper_half
    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
        """
        Send a QMP `Message` to the server and await a reply.

        This method *assumes* you are sending some kind of an execute
        statement that *will* receive a reply.

        An execution ID will be assigned if assign_id is `True`. It can be
        disabled, but this requires that an ID is manually assigned
        instead. A manually assigned ID must not start with '__qmp#',
        the prefix used for automatically assigned IDs.

        :param msg: The QMP `Message` to execute.
        :param assign_id: If True, assign a new execution ID.

        :return: Execution reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        if assign_id:
            msg['id'] = self._get_exec_id()
        elif 'id' in msg:
            assert isinstance(msg['id'], str)
            # Same rule that _raw() documents and enforces: only the
            # prefix is reserved, since generated IDs always begin
            # with it.
            assert not msg['id'].startswith('__qmp#')

        exec_id = await self._issue(msg)
        return await self._reply(exec_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def _raw(
            self,
            msg: Union[Message, Mapping[str, object], bytes],
            assign_id: bool = True,
    ) -> Message:
        """
        Issue a raw `Message` to the QMP server and await a reply.

        :param msg:
            A Message to send to the server. It may be a `Message`, any
            Mapping (including Dict), or raw bytes.
        :param assign_id:
            Assign an arbitrary execution ID to this message. If
            `False`, the existing id must either be absent (and no other
            such pending execution may omit an ID) or a string. If it is
            a string, it must not start with '__qmp#' and no other such
            pending execution may currently be using that ID.

        :return: Execution reply from the server.

        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        :raise TypeError:
            When assign_id is `False`, an ID is given, and it is not a string.
        :raise ValueError:
            When assign_id is `False`, but the ID is not usable;
            Either because it starts with '__qmp#' or it is already in-use.
        """
        # 1. convert generic Mapping or bytes to a QMP Message
        # 2. copy Message objects so that we assign an ID only to the copy.
        msg = Message(msg)

        exec_id = msg.get('id')
        if not assign_id and 'id' in msg:
            if not isinstance(exec_id, str):
                raise TypeError(f"ID ('{exec_id}') must be a string.")
            if exec_id.startswith('__qmp#'):
                raise ValueError(
                    f"ID ('{exec_id}') must not start with '__qmp#'."
                )

        if not assign_id and exec_id in self._pending:
            raise ValueError(
                f"ID '{exec_id}' is in-use and cannot be used."
            )

        return await self._execute(msg, assign_id=assign_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def execute_msg(self, msg: Message) -> object:
        """
        Execute a QMP command and return its value.

        :param msg: The QMP `Message` to execute.

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ValueError:
            If the QMP `Message` does not have either the 'execute' or
            'exec-oob' fields set.
        :raise ExecuteError: When the server returns an error response.
        :raise BadReplyError:
            When the reply has neither a 'return' nor a well-formed
            'error' member.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        if not ('execute' in msg or 'exec-oob' in msg):
            raise ValueError("Requires 'execute' or 'exec-oob' message")

        # Copy the Message so that the ID assigned by _execute() is
        # local to this method; allowing the ID to be seen in raised
        # Exceptions but without modifying the caller's held copy.
        msg = Message(msg)
        reply = await self._execute(msg)

        if 'error' in reply:
            try:
                error_response = ErrorResponse(reply)
            except (KeyError, TypeError) as err:
                # Error response was malformed.
                raise BadReplyError(
                    "QMP error reply is malformed", reply, msg,
                ) from err

            raise ExecuteError(error_response, msg, reply)

        if 'return' not in reply:
            raise BadReplyError(
                "QMP reply is missing a 'error' or 'return' member",
                reply, msg,
            )

        return reply['return']

    @classmethod
    def make_execute_msg(cls, cmd: str,
                         arguments: Optional[Mapping[str, object]] = None,
                         oob: bool = False) -> Message:
        """
        Create an executable message to be sent by `execute_msg` later.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return: An executable QMP `Message`.
        """
        msg = Message({'exec-oob' if oob else 'execute': cmd})
        if arguments is not None:
            msg['arguments'] = arguments
        return msg

    @upper_half
    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        """
        Execute a QMP command and return its value.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        msg = self.make_execute_msg(cmd, arguments, oob=oob)
        return await self.execute_msg(msg)

    @upper_half
    @require(Runstate.RUNNING)
    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor to send.

        :raise QMPError: When the connection is not using a UNIX socket.
        """
        assert self._writer is not None
        sock = self._writer.transport.get_extra_info('socket')

        if sock.family != socket.AF_UNIX:
            raise QMPError("Sending file descriptors requires a UNIX socket.")

        if not hasattr(sock, 'sendmsg'):
            # We need to void the warranty sticker.
            # Access to sendmsg is scheduled for removal in Python 3.11.
            # Find the real backing socket to use it anyway.
            sock = sock._sock  # pylint: disable=protected-access

        sock.sendmsg(
            [b' '],
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
        )