1""" 2(Legacy) Sync QMP Wrapper 3 4This module provides the `QEMUMonitorProtocol` class, which is a 5synchronous wrapper around `QMPClient`. 6 7Its design closely resembles that of the original QEMUMonitorProtocol 8class, originally written by Luiz Capitulino. It is provided here for 9compatibility with scripts inside the QEMU source tree that expect the 10old interface. 11""" 12 13# 14# Copyright (C) 2009-2022 Red Hat Inc. 15# 16# Authors: 17# Luiz Capitulino <lcapitulino@redhat.com> 18# John Snow <jsnow@redhat.com> 19# 20# This work is licensed under the terms of the GNU GPL, version 2. See 21# the COPYING file in the top-level directory. 22# 23 24import asyncio 25import socket 26from types import TracebackType 27from typing import ( 28 Any, 29 Awaitable, 30 Dict, 31 List, 32 Optional, 33 Type, 34 TypeVar, 35 Union, 36) 37 38from .error import QMPError 39from .protocol import Runstate, SocketAddrT 40from .qmp_client import QMPClient 41 42 43#: QMPMessage is an entire QMP message of any kind. 44QMPMessage = Dict[str, Any] 45 46#: QMPReturnValue is the 'return' value of a command. 47QMPReturnValue = object 48 49#: QMPObject is any object in a QMP message. 50QMPObject = Dict[str, object] 51 52# QMPMessage can be outgoing commands or incoming events/returns. 53# QMPReturnValue is usually a dict/json object, but due to QAPI's 54# 'command-returns-exceptions', it can actually be anything. 55# 56# {'return': {}} is a QMPMessage, 57# {} is the QMPReturnValue. 58 59 60class QMPBadPortError(QMPError): 61 """ 62 Unable to parse socket address: Port was non-numerical. 63 """ 64 65 66class QEMUMonitorProtocol: 67 """ 68 Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) 69 and then allow to handle commands and events. 70 71 :param address: QEMU address, can be a unix socket path (string), a tuple 72 in the form ( address, port ) for a TCP connection, or an 73 existing `socket.socket` object. 74 :param server: Act as the socket server. (See 'accept') 75 Not applicable when passing a socket directly. 76 :param nickname: Optional nickname used for logging. 77 """ 78 79 def __init__(self, 80 address: Union[SocketAddrT, socket.socket], 81 server: bool = False, 82 nickname: Optional[str] = None): 83 84 if server and isinstance(address, socket.socket): 85 raise ValueError( 86 "server argument should be False when passing a socket") 87 88 self._qmp = QMPClient(nickname) 89 self._aloop = asyncio.get_event_loop() 90 self._address = address 91 self._timeout: Optional[float] = None 92 93 if server: 94 assert not isinstance(self._address, socket.socket) 95 self._sync(self._qmp.start_server(self._address)) 96 97 _T = TypeVar('_T') 98 99 def _sync( 100 self, future: Awaitable[_T], timeout: Optional[float] = None 101 ) -> _T: 102 return self._aloop.run_until_complete( 103 asyncio.wait_for(future, timeout=timeout) 104 ) 105 106 def _get_greeting(self) -> Optional[QMPMessage]: 107 if self._qmp.greeting is not None: 108 # pylint: disable=protected-access 109 return self._qmp.greeting._asdict() 110 return None 111 112 def __enter__(self: _T) -> _T: 113 # Implement context manager enter function. 114 return self 115 116 def __exit__(self, 117 exc_type: Optional[Type[BaseException]], 118 exc_val: Optional[BaseException], 119 exc_tb: Optional[TracebackType]) -> None: 120 # Implement context manager exit function. 121 self.close() 122 123 @classmethod 124 def parse_address(cls, address: str) -> SocketAddrT: 125 """ 126 Parse a string into a QMP address. 127 128 Figure out if the argument is in the port:host form. 129 If it's not, it's probably a file path. 130 """ 131 components = address.split(':') 132 if len(components) == 2: 133 try: 134 port = int(components[1]) 135 except ValueError: 136 msg = f"Bad port: '{components[1]}' in '{address}'." 137 raise QMPBadPortError(msg) from None 138 return (components[0], port) 139 140 # Treat as filepath. 141 return address 142 143 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: 144 """ 145 Connect to the QMP Monitor and perform capabilities negotiation. 146 147 :return: QMP greeting dict, or None if negotiate is false 148 :raise ConnectError: on connection errors 149 """ 150 self._qmp.await_greeting = negotiate 151 self._qmp.negotiate = negotiate 152 153 self._sync( 154 self._qmp.connect(self._address) 155 ) 156 return self._get_greeting() 157 158 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: 159 """ 160 Await connection from QMP Monitor and perform capabilities negotiation. 161 162 :param timeout: 163 timeout in seconds (nonnegative float number, or None). 164 If None, there is no timeout, and this may block forever. 165 166 :return: QMP greeting dict 167 :raise ConnectError: on connection errors 168 """ 169 self._qmp.await_greeting = True 170 self._qmp.negotiate = True 171 172 self._sync(self._qmp.accept(), timeout) 173 174 ret = self._get_greeting() 175 assert ret is not None 176 return ret 177 178 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: 179 """ 180 Send a QMP command to the QMP Monitor. 181 182 :param qmp_cmd: QMP command to be sent as a Python dict 183 :return: QMP response as a Python dict 184 """ 185 return dict( 186 self._sync( 187 # pylint: disable=protected-access 188 189 # _raw() isn't a public API, because turning off 190 # automatic ID assignment is discouraged. For 191 # compatibility with iotests *only*, do it anyway. 192 self._qmp._raw(qmp_cmd, assign_id=False), 193 self._timeout 194 ) 195 ) 196 197 def cmd(self, name: str, 198 args: Optional[Dict[str, object]] = None, 199 cmd_id: Optional[object] = None) -> QMPMessage: 200 """ 201 Build a QMP command and send it to the QMP Monitor. 202 203 :param name: command name (string) 204 :param args: command arguments (dict) 205 :param cmd_id: command id (dict, list, string or int) 206 """ 207 qmp_cmd: QMPMessage = {'execute': name} 208 if args: 209 qmp_cmd['arguments'] = args 210 if cmd_id: 211 qmp_cmd['id'] = cmd_id 212 return self.cmd_obj(qmp_cmd) 213 214 def command(self, cmd: str, **kwds: object) -> QMPReturnValue: 215 """ 216 Build and send a QMP command to the monitor, report errors if any 217 """ 218 return self._sync( 219 self._qmp.execute(cmd, kwds), 220 self._timeout 221 ) 222 223 def pull_event(self, 224 wait: Union[bool, float] = False) -> Optional[QMPMessage]: 225 """ 226 Pulls a single event. 227 228 :param wait: 229 If False or 0, do not wait. Return None if no events ready. 230 If True, wait forever until the next event. 231 Otherwise, wait for the specified number of seconds. 232 233 :raise asyncio.TimeoutError: 234 When a timeout is requested and the timeout period elapses. 235 236 :return: The first available QMP event, or None. 237 """ 238 if not wait: 239 # wait is False/0: "do not wait, do not except." 240 if self._qmp.events.empty(): 241 return None 242 243 # If wait is 'True', wait forever. If wait is False/0, the events 244 # queue must not be empty; but it still needs some real amount 245 # of time to complete. 246 timeout = None 247 if wait and isinstance(wait, float): 248 timeout = wait 249 250 return dict( 251 self._sync( 252 self._qmp.events.get(), 253 timeout 254 ) 255 ) 256 257 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: 258 """ 259 Get a list of QMP events and clear all pending events. 260 261 :param wait: 262 If False or 0, do not wait. Return None if no events ready. 263 If True, wait until we have at least one event. 264 Otherwise, wait for up to the specified number of seconds for at 265 least one event. 266 267 :raise asyncio.TimeoutError: 268 When a timeout is requested and the timeout period elapses. 269 270 :return: A list of QMP events. 271 """ 272 events = [dict(x) for x in self._qmp.events.clear()] 273 if events: 274 return events 275 276 event = self.pull_event(wait) 277 return [event] if event is not None else [] 278 279 def clear_events(self) -> None: 280 """Clear current list of pending events.""" 281 self._qmp.events.clear() 282 283 def close(self) -> None: 284 """Close the connection.""" 285 self._sync( 286 self._qmp.disconnect() 287 ) 288 289 def settimeout(self, timeout: Optional[float]) -> None: 290 """ 291 Set the timeout for QMP RPC execution. 292 293 This timeout affects the `cmd`, `cmd_obj`, and `command` methods. 294 The `accept`, `pull_event` and `get_event` methods have their 295 own configurable timeouts. 296 297 :param timeout: 298 timeout in seconds, or None. 299 None will wait indefinitely. 300 """ 301 self._timeout = timeout 302 303 def send_fd_scm(self, fd: int) -> None: 304 """ 305 Send a file descriptor to the remote via SCM_RIGHTS. 306 """ 307 self._qmp.send_fd_scm(fd) 308 309 def __del__(self) -> None: 310 if self._qmp.runstate == Runstate.IDLE: 311 return 312 313 if not self._aloop.is_running(): 314 self.close() 315 else: 316 # Garbage collection ran while the event loop was running. 317 # Nothing we can do about it now, but if we don't raise our 318 # own error, the user will be treated to a lot of traceback 319 # they might not understand. 320 raise QMPError( 321 "QEMUMonitorProtocol.close()" 322 " was not called before object was garbage collected" 323 ) 324