1""" 2(Legacy) Sync QMP Wrapper 3 4This module provides the `QEMUMonitorProtocol` class, which is a 5synchronous wrapper around `QMPClient`. 6 7Its design closely resembles that of the original QEMUMonitorProtocol 8class, originally written by Luiz Capitulino. It is provided here for 9compatibility with scripts inside the QEMU source tree that expect the 10old interface. 11""" 12 13# 14# Copyright (C) 2009-2022 Red Hat Inc. 15# 16# Authors: 17# Luiz Capitulino <lcapitulino@redhat.com> 18# John Snow <jsnow@redhat.com> 19# 20# This work is licensed under the terms of the GNU GPL, version 2. See 21# the COPYING file in the top-level directory. 22# 23 24import asyncio 25import socket 26from types import TracebackType 27from typing import ( 28 Any, 29 Awaitable, 30 Dict, 31 List, 32 Optional, 33 Type, 34 TypeVar, 35 Union, 36) 37 38from .error import QMPError 39from .protocol import Runstate, SocketAddrT 40from .qmp_client import QMPClient 41 42 43#: QMPMessage is an entire QMP message of any kind. 44QMPMessage = Dict[str, Any] 45 46#: QMPReturnValue is the 'return' value of a command. 47QMPReturnValue = object 48 49#: QMPObject is any object in a QMP message. 50QMPObject = Dict[str, object] 51 52# QMPMessage can be outgoing commands or incoming events/returns. 53# QMPReturnValue is usually a dict/json object, but due to QAPI's 54# 'command-returns-exceptions', it can actually be anything. 55# 56# {'return': {}} is a QMPMessage, 57# {} is the QMPReturnValue. 58 59 60class QMPBadPortError(QMPError): 61 """ 62 Unable to parse socket address: Port was non-numerical. 63 """ 64 65 66class QEMUMonitorProtocol: 67 """ 68 Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) 69 and then allow to handle commands and events. 70 71 :param address: QEMU address, can be either a unix socket path (string) 72 or a tuple in the form ( address, port ) for a TCP 73 connection or None 74 :param sock: a socket or None 75 :param server: Act as the socket server. (See 'accept') 76 :param nickname: Optional nickname used for logging. 77 """ 78 79 def __init__(self, 80 address: Optional[SocketAddrT] = None, 81 sock: Optional[socket.socket] = None, 82 server: bool = False, 83 nickname: Optional[str] = None): 84 85 assert address or sock 86 self._qmp = QMPClient(nickname) 87 self._aloop = asyncio.get_event_loop() 88 self._address = address 89 self._sock = sock 90 self._timeout: Optional[float] = None 91 92 if server: 93 if sock: 94 assert self._sock is not None 95 self._sync(self._qmp.open_with_socket(self._sock)) 96 else: 97 assert self._address is not None 98 self._sync(self._qmp.start_server(self._address)) 99 100 _T = TypeVar('_T') 101 102 def _sync( 103 self, future: Awaitable[_T], timeout: Optional[float] = None 104 ) -> _T: 105 return self._aloop.run_until_complete( 106 asyncio.wait_for(future, timeout=timeout) 107 ) 108 109 def _get_greeting(self) -> Optional[QMPMessage]: 110 if self._qmp.greeting is not None: 111 # pylint: disable=protected-access 112 return self._qmp.greeting._asdict() 113 return None 114 115 def __enter__(self: _T) -> _T: 116 # Implement context manager enter function. 117 return self 118 119 def __exit__(self, 120 exc_type: Optional[Type[BaseException]], 121 exc_val: Optional[BaseException], 122 exc_tb: Optional[TracebackType]) -> None: 123 # Implement context manager exit function. 124 self.close() 125 126 @classmethod 127 def parse_address(cls, address: str) -> SocketAddrT: 128 """ 129 Parse a string into a QMP address. 130 131 Figure out if the argument is in the port:host form. 132 If it's not, it's probably a file path. 133 """ 134 components = address.split(':') 135 if len(components) == 2: 136 try: 137 port = int(components[1]) 138 except ValueError: 139 msg = f"Bad port: '{components[1]}' in '{address}'." 140 raise QMPBadPortError(msg) from None 141 return (components[0], port) 142 143 # Treat as filepath. 144 return address 145 146 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: 147 """ 148 Connect to the QMP Monitor and perform capabilities negotiation. 149 150 :return: QMP greeting dict, or None if negotiate is false 151 :raise ConnectError: on connection errors 152 """ 153 assert self._address is not None 154 self._qmp.await_greeting = negotiate 155 self._qmp.negotiate = negotiate 156 157 self._sync( 158 self._qmp.connect(self._address) 159 ) 160 return self._get_greeting() 161 162 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: 163 """ 164 Await connection from QMP Monitor and perform capabilities negotiation. 165 166 :param timeout: 167 timeout in seconds (nonnegative float number, or None). 168 If None, there is no timeout, and this may block forever. 169 170 :return: QMP greeting dict 171 :raise ConnectError: on connection errors 172 """ 173 self._qmp.await_greeting = True 174 self._qmp.negotiate = True 175 176 self._sync(self._qmp.accept(), timeout) 177 178 ret = self._get_greeting() 179 assert ret is not None 180 return ret 181 182 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: 183 """ 184 Send a QMP command to the QMP Monitor. 185 186 :param qmp_cmd: QMP command to be sent as a Python dict 187 :return: QMP response as a Python dict 188 """ 189 return dict( 190 self._sync( 191 # pylint: disable=protected-access 192 193 # _raw() isn't a public API, because turning off 194 # automatic ID assignment is discouraged. For 195 # compatibility with iotests *only*, do it anyway. 196 self._qmp._raw(qmp_cmd, assign_id=False), 197 self._timeout 198 ) 199 ) 200 201 def cmd(self, name: str, 202 args: Optional[Dict[str, object]] = None, 203 cmd_id: Optional[object] = None) -> QMPMessage: 204 """ 205 Build a QMP command and send it to the QMP Monitor. 206 207 :param name: command name (string) 208 :param args: command arguments (dict) 209 :param cmd_id: command id (dict, list, string or int) 210 """ 211 qmp_cmd: QMPMessage = {'execute': name} 212 if args: 213 qmp_cmd['arguments'] = args 214 if cmd_id: 215 qmp_cmd['id'] = cmd_id 216 return self.cmd_obj(qmp_cmd) 217 218 def command(self, cmd: str, **kwds: object) -> QMPReturnValue: 219 """ 220 Build and send a QMP command to the monitor, report errors if any 221 """ 222 return self._sync( 223 self._qmp.execute(cmd, kwds), 224 self._timeout 225 ) 226 227 def pull_event(self, 228 wait: Union[bool, float] = False) -> Optional[QMPMessage]: 229 """ 230 Pulls a single event. 231 232 :param wait: 233 If False or 0, do not wait. Return None if no events ready. 234 If True, wait forever until the next event. 235 Otherwise, wait for the specified number of seconds. 236 237 :raise asyncio.TimeoutError: 238 When a timeout is requested and the timeout period elapses. 239 240 :return: The first available QMP event, or None. 241 """ 242 if not wait: 243 # wait is False/0: "do not wait, do not except." 244 if self._qmp.events.empty(): 245 return None 246 247 # If wait is 'True', wait forever. If wait is False/0, the events 248 # queue must not be empty; but it still needs some real amount 249 # of time to complete. 250 timeout = None 251 if wait and isinstance(wait, float): 252 timeout = wait 253 254 return dict( 255 self._sync( 256 self._qmp.events.get(), 257 timeout 258 ) 259 ) 260 261 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: 262 """ 263 Get a list of QMP events and clear all pending events. 264 265 :param wait: 266 If False or 0, do not wait. Return None if no events ready. 267 If True, wait until we have at least one event. 268 Otherwise, wait for up to the specified number of seconds for at 269 least one event. 270 271 :raise asyncio.TimeoutError: 272 When a timeout is requested and the timeout period elapses. 273 274 :return: A list of QMP events. 275 """ 276 events = [dict(x) for x in self._qmp.events.clear()] 277 if events: 278 return events 279 280 event = self.pull_event(wait) 281 return [event] if event is not None else [] 282 283 def clear_events(self) -> None: 284 """Clear current list of pending events.""" 285 self._qmp.events.clear() 286 287 def close(self) -> None: 288 """Close the connection.""" 289 self._sync( 290 self._qmp.disconnect() 291 ) 292 293 def settimeout(self, timeout: Optional[float]) -> None: 294 """ 295 Set the timeout for QMP RPC execution. 296 297 This timeout affects the `cmd`, `cmd_obj`, and `command` methods. 298 The `accept`, `pull_event` and `get_event` methods have their 299 own configurable timeouts. 300 301 :param timeout: 302 timeout in seconds, or None. 303 None will wait indefinitely. 304 """ 305 self._timeout = timeout 306 307 def send_fd_scm(self, fd: int) -> None: 308 """ 309 Send a file descriptor to the remote via SCM_RIGHTS. 310 """ 311 self._qmp.send_fd_scm(fd) 312 313 def __del__(self) -> None: 314 if self._qmp.runstate == Runstate.IDLE: 315 return 316 317 if not self._aloop.is_running(): 318 self.close() 319 else: 320 # Garbage collection ran while the event loop was running. 321 # Nothing we can do about it now, but if we don't raise our 322 # own error, the user will be treated to a lot of traceback 323 # they might not understand. 324 raise QMPError( 325 "QEMUMonitorProtocol.close()" 326 " was not called before object was garbage collected" 327 ) 328