"""
(Legacy) Sync QMP Wrapper

This module provides the `QEMUMonitorProtocol` class, which is a
synchronous wrapper around `QMPClient`.

Its design closely resembles that of the original QEMUMonitorProtocol
class, originally written by Luiz Capitulino. It is provided here for
compatibility with scripts inside the QEMU source tree that expect the
old interface.
"""

#
# Copyright (C) 2009-2022 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#  John Snow <jsnow@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#

import asyncio
from types import TracebackType
from typing import (
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
)

from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient


#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]

# QMPMessage can be outgoing commands or incoming events/returns.
# QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'command-returns-exceptions', it can actually be anything.
#
# {'return': {}} is a QMPMessage,
# {} is the QMPReturnValue.


class QMPBadPortError(QMPError):
    """
    Unable to parse socket address: Port was non-numerical.
    """


class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
    and then allow to handle commands and events.

    :param address:  QEMU address, can be either a unix socket path (string)
                     or a tuple in the form ( address, port ) for a TCP
                     connection
    :param server:   Act as the socket server. (See 'accept')
    :param nickname: Optional nickname used for logging.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):

        self._qmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Timeout applied to cmd(), cmd_obj() and command(); see settimeout().
        self._timeout: Optional[float] = None

        if server:
            # Start listening right away; the peer is awaited in accept().
            self._sync(self._qmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on this object's event loop.

        :param future: The awaitable to drive to completion.
        :param timeout:
            Timeout in seconds, or None to wait indefinitely.

        :raise asyncio.TimeoutError:
            When a timeout is given and the timeout period elapses.

        :return: The result of the awaitable.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """
        Return the greeting held by the client as a dict, if there is one.

        :return: The greeting as a dict, or None if no greeting is set.
        """
        if self._qmp.greeting is not None:
            # pylint: disable=protected-access
            return self._qmp.greeting._asdict()
        return None

    def __enter__(self: _T) -> _T:
        """Enter the context manager; returns this object unchanged."""
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        """Exit the context manager by closing the connection."""
        self.close()

    @classmethod
    def parse_address(cls, address: str) -> SocketAddrT:
        """
        Parse a string into a QMP address.

        Figure out if the argument is in the host:port form.
        If it's not, it's probably a file path.

        :param address: Either "host:port" or a filesystem path.

        :raise QMPBadPortError:
            When the string has exactly two colon-separated components
            and the second one is not an integer.

        :return: A (host, port) tuple, or the unmodified string.
        """
        components = address.split(':')
        if len(components) == 2:
            try:
                port = int(components[1])
            except ValueError:
                msg = f"Bad port: '{components[1]}' in '{address}'."
                raise QMPBadPortError(msg) from None
            return (components[0], port)

        # Treat as filepath.
        return address

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        :param negotiate:
            Whether to await the greeting and negotiate capabilities.

        :return: QMP greeting dict, or None if negotiate is false
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = negotiate
        self._qmp.negotiate = negotiate

        self._sync(
            self._qmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        :param timeout:
            timeout in seconds (nonnegative float number, or None).
            If None, there is no timeout, and this may block forever.

        :return: QMP greeting dict
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = True
        self._qmp.negotiate = True

        self._sync(self._qmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        The timeout configured via `settimeout` applies.

        :param qmp_cmd: QMP command to be sent as a Python dict
        :return: QMP response as a Python dict
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._qmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    def cmd(self, name: str,
            args: Optional[Dict[str, object]] = None,
            cmd_id: Optional[object] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        :param name: command name (string)
        :param args: command arguments (dict)
        :param cmd_id: command id (dict, list, string or int)

        :return: QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        # Compare against None rather than truthiness so that falsy but
        # explicitly requested ids (e.g. 0 or "") are still sent.
        if cmd_id is not None:
            qmp_cmd['id'] = cmd_id
        return self.cmd_obj(qmp_cmd)

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        The timeout configured via `settimeout` applies.

        :param cmd: command name (string)
        :param kwds: command arguments, passed as keyword arguments

        :return: The value returned by executing the command.
        """
        return self._sync(
            self._qmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        :param wait:
            If False or 0, do not wait. Return None if no events ready.
            If True, wait forever until the next event.
            Otherwise, wait for the specified number of seconds.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: The first available QMP event, or None.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._qmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        if wait and not isinstance(wait, bool):
            # Any non-bool number is a timeout in seconds. Testing for
            # "not bool" instead of "is float" means integer timeouts
            # (e.g. wait=5) are honored rather than waiting forever.
            timeout = wait

        return dict(
            self._sync(
                self._qmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of QMP events and clear all pending events.

        :param wait:
            If False or 0, do not wait. Return an empty list if no
            events are ready.
            If True, wait until we have at least one event.
            Otherwise, wait for up to the specified number of seconds for at
            least one event.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: A list of QMP events.
        """
        events = [dict(x) for x in self._qmp.events.clear()]
        if events:
            return events

        # Nothing was pending: fall back to (possibly) waiting for one event.
        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear current list of pending events."""
        self._qmp.events.clear()

    def close(self) -> None:
        """Close the connection."""
        self._sync(
            self._qmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout for QMP RPC execution.

        This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
        The `accept`, `pull_event` and `get_events` methods have their
        own configurable timeouts.

        :param timeout:
            timeout in seconds, or None.
            None will wait indefinitely.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor to send.
        """
        self._qmp.send_fd_scm(fd)

    def __del__(self) -> None:
        """
        Close the connection on garbage collection, if still possible.

        :raise QMPError:
            When the connection is not idle and the event loop is
            running, so the connection cannot be closed from here.
        """
        if self._qmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )