"""
QEMU Monitor Protocol (QMP) development library & tooling.

This package provides a fairly low-level class for communicating with QMP
protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
QEMU Storage Daemon. This library is not intended for production use.

`QEMUMonitorProtocol` is the primary class of interest, and all errors
raised derive from `QMPError`.
"""

# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.

import errno
import json
import logging
import socket
from types import TracebackType
from typing import (
    Any,
    Dict,
    List,
    Optional,
    TextIO,
    Tuple,
    Type,
    Union,
    cast,
)


# QMPMessage is a QMP Message of any kind.
# e.g. {'yee': 'haw'}
#
# QMPReturnValue is the inner value of return values only.
# {'return': {}} is the QMPMessage,
# {} is the QMPReturnValue.
QMPMessage = Dict[str, Any]
QMPReturnValue = Dict[str, Any]

# A TCP endpoint is a (host, port) pair; the port is an int because that is
# what socket.connect()/bind() require for AF_INET. A UNIX endpoint is a
# filesystem path.
InternetAddrT = Tuple[str, int]
UnixAddrT = str
SocketAddrT = Union[InternetAddrT, UnixAddrT]


class QMPError(Exception):
    """
    QMP base exception
    """


class QMPConnectError(QMPError):
    """
    QMP connection exception
    """


class QMPCapabilitiesError(QMPError):
    """
    QMP negotiate capabilities exception
    """


class QMPTimeoutError(QMPError):
    """
    QMP timeout exception
    """


class QMPProtocolError(QMPError):
    """
    QMP protocol error; unexpected response
    """


class QMPResponseError(QMPError):
    """
    Represents erroneous QMP monitor reply
    """
    def __init__(self, reply: QMPMessage):
        """
        Build the exception from a QMP error reply.

        @param reply: the complete reply received from the server; it is
                      kept verbatim in the `reply` attribute.
        @note The exception message is reply['error']['desc'] when present,
              otherwise the whole reply is used as the message.
        """
        try:
            desc = reply['error']['desc']
        except (KeyError, TypeError):
            # KeyError: 'error' or 'desc' is missing.
            # TypeError: 'error' is present but is not a mapping.
            # Either way there is no description; fall back to the reply.
            desc = reply
        super().__init__(desc)
        self.reply = reply


class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
    allow to handle commands and events.
    """

    #: Logger object for debugging messages
    logger = logging.getLogger('QMP')

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):
        """
        Create a QEMUMonitorProtocol class.

        @param address: QEMU address, can be either a unix socket path (string)
                        or a tuple in the form ( address, port ) for a TCP
                        connection
        @param server: server mode listens on the socket (bool)
        @param nickname: optional name; when given, log messages go to the
                         'QMP.<nickname>' child logger instead of 'QMP'
        @raise OSError on socket connection errors
        @note No connection is established, this is done by the connect() or
              accept() methods
        """
        self.__events: List[QMPMessage] = []
        self.__address = address
        self.__sock = self.__get_sock()
        self.__sockfile: Optional[TextIO] = None
        self._nickname = nickname
        if self._nickname:
            self.logger = logging.getLogger('QMP').getChild(self._nickname)
        if server:
            try:
                self.__sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
                self.__sock.bind(self.__address)
                self.__sock.listen(1)
            except OSError:
                # Do not leak the descriptor when the listener cannot be
                # set up; the caller never gets an object to close().
                self.__sock.close()
                raise

    def __get_sock(self) -> socket.socket:
        """
        Create an unconnected stream socket matching the address type.

        @return an AF_INET socket for tuple addresses, AF_UNIX otherwise
        """
        if isinstance(self.__address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def __negotiate_capabilities(self) -> QMPMessage:
        """
        Read the server greeting and issue 'qmp_capabilities'.

        @return the QMP greeting dict
        @raise QMPConnectError if no greeting containing 'QMP' is received
        @raise QMPCapabilitiesError if the reply has no 'return' member
        """
        greeting = self.__json_read()
        if greeting is None or "QMP" not in greeting:
            raise QMPConnectError("Did not receive a QMP greeting")
        # Greeting seems ok, negotiate capabilities
        resp = self.cmd('qmp_capabilities')
        if resp and "return" in resp:
            return greeting
        raise QMPCapabilitiesError("Failed to negotiate QMP capabilities")

    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
        """
        Read one message from the socket file, caching events on the way.

        Every event encountered is appended to the event cache. Unless
        only_event is set, events are skipped and reading continues until
        a non-event message arrives.

        @param only_event: return as soon as an event has been read
        @return the message read, or None if readline() returned no data
        """
        assert self.__sockfile is not None
        while True:
            data = self.__sockfile.readline()
            if not data:
                return None
            # By definition, any JSON received from QMP is a QMPMessage,
            # and we are asserting only at static analysis time that it
            # has a particular shape.
            resp: QMPMessage = json.loads(data)
            if 'event' in resp:
                self.logger.debug("<<< %s", resp)
                self.__events.append(resp)
                if not only_event:
                    continue
            return resp

    def __get_events(self, wait: Union[bool, float] = False) -> None:
        """
        Check for new events in the stream and cache them in __events.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.
        """

        # Current timeout and blocking status
        current_timeout = self.__sock.gettimeout()

        # Check for new events regardless and pull them into the cache:
        self.__sock.settimeout(0)  # i.e. setblocking(False)
        try:
            self.__json_read()
        except OSError as err:
            # EAGAIN/EWOULDBLOCK: No data available; not critical.
            # The two values are not the same number on every platform.
            if err.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
        finally:
            self.__sock.settimeout(current_timeout)

        # Wait for new events, if needed.
        # if wait is 0.0, this means "no wait" and is also implicitly false.
        if self.__events or not wait:
            return

        if isinstance(wait, float):
            self.__sock.settimeout(wait)
        try:
            ret = self.__json_read(only_event=True)
        except socket.timeout as err:
            raise QMPTimeoutError("Timeout waiting for event") from err
        except Exception as err:
            msg = "Error while reading from socket"
            raise QMPConnectError(msg) from err
        finally:
            # Always restore the caller's timeout, even on failure.
            self.__sock.settimeout(current_timeout)

        if ret is None:
            raise QMPConnectError("Error while reading from socket")

    def __enter__(self) -> 'QEMUMonitorProtocol':
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 # pylint: disable=duplicate-code
                 # see https://github.com/PyCQA/pylint/issues/3619
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        @param negotiate: perform capabilities negotiation after connecting
        @return QMP greeting dict, or None if negotiate is false
        @raise OSError on socket connection errors
        @raise QMPConnectError if the greeting is not received
        @raise QMPCapabilitiesError if fails to negotiate capabilities
        """
        self.__sock.connect(self.__address)
        self.__sockfile = self.__sock.makefile(mode='r')
        if negotiate:
            return self.__negotiate_capabilities()
        return None

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        @param timeout: timeout in seconds (nonnegative float number, or
                        None). It is applied to the listening socket before
                        waiting for the connection, as described in [1].
                        Default value is set to 15.0.
        @return QMP greeting dict
        @raise OSError on socket connection errors
        @raise QMPConnectError if the greeting is not received
        @raise QMPCapabilitiesError if fails to negotiate capabilities

        [1]
        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
        """
        listener = self.__sock
        listener.settimeout(timeout)
        conn, _ = listener.accept()
        # From here on only the accepted connection is used, so release
        # the listening socket instead of leaving it open and unreferenced.
        self.__sock = conn
        listener.close()
        self.__sockfile = self.__sock.makefile(mode='r')
        return self.__negotiate_capabilities()

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        @param qmp_cmd: QMP command to be sent as a Python dict
        @return QMP response as a Python dict
        @raise QMPConnectError if no reply could be read
        """
        self.logger.debug(">>> %s", qmp_cmd)
        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
        resp = self.__json_read()
        if resp is None:
            raise QMPConnectError("Unexpected empty reply from server")
        self.logger.debug("<<< %s", resp)
        return resp

    def cmd(self, name: str,
            args: Optional[Dict[str, Any]] = None,
            cmd_id: Optional[Any] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        @param name: command name (string)
        @param args: command arguments (dict); omitted from the request
                     when None or empty
        @param cmd_id: command id (dict, list, string or int); omitted from
                       the request only when None
        @return QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        # Compare against None explicitly: 0 and "" are legitimate ids and
        # must not be dropped just because they are falsy.
        if cmd_id is not None:
            qmp_cmd['id'] = cmd_id
        return self.cmd_obj(qmp_cmd)

    def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        @param cmd: command name (string)
        @param kwds: command arguments, passed as keyword arguments
        @return the value of the 'return' member of the reply
        @raise QMPResponseError if the reply contains an 'error' member
        @raise QMPProtocolError if the reply has neither 'error' nor 'return'
        """
        ret = self.cmd(cmd, kwds)
        if 'error' in ret:
            raise QMPResponseError(ret)
        if 'return' not in ret:
            raise QMPProtocolError(
                f"'return' key not found in QMP response '{ret}'"
            )
        return cast(QMPReturnValue, ret['return'])

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.

        @return The first available QMP event, or None.
        """
        self.__get_events(wait)

        if self.__events:
            return self.__events.pop(0)
        return None

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of available QMP events.

        @param wait (bool): block until an event is available.
        @param wait (float): If wait is a float, treat it as a timeout value.

        @raise QMPTimeoutError: If a timeout float is provided and the timeout
                                period elapses.
        @raise QMPConnectError: If wait is True but no events could be
                                retrieved or if some other error occurred.

        @return The list of available QMP events. This is the internal
                cache itself, not a copy.
        """
        self.__get_events(wait)
        return self.__events

    def clear_events(self) -> None:
        """
        Clear current list of pending events.
        """
        self.__events = []

    def close(self) -> None:
        """
        Close the socket and socket file.
        """
        if self.__sockfile:
            self.__sockfile.close()
        if self.__sock:
            self.__sock.close()

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the socket timeout.

        @param timeout (float): timeout in seconds (non-zero), or None.
        @note This is a wrap around socket.settimeout

        @raise ValueError: if timeout was set to 0.
        """
        if timeout == 0:
            msg = "timeout cannot be 0; this engages non-blocking mode."
            msg += " Use 'None' instead to disable timeouts."
            raise ValueError(msg)
        self.__sock.settimeout(timeout)

    def get_sock_fd(self) -> int:
        """
        Get the socket file descriptor.

        @return The file descriptor number.
        """
        return self.__sock.fileno()

    def is_scm_available(self) -> bool:
        """
        Check if the socket allows for SCM_RIGHTS.

        @return True if SCM_RIGHTS is available, otherwise False.
        """
        return self.__sock.family == socket.AF_UNIX