1# 2# Copyright (C) 2009, 2010 Red Hat Inc. 3# 4# Authors: 5# Luiz Capitulino <lcapitulino@redhat.com> 6# 7# This work is licensed under the terms of the GNU GPL, version 2. See 8# the COPYING file in the top-level directory. 9# 10 11""" 12Low-level QEMU shell on top of QMP. 13 14usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 15 16positional arguments: 17 qmp_server < UNIX socket path | TCP address:port > 18 19optional arguments: 20 -h, --help show this help message and exit 21 -H, --hmp Use HMP interface 22 -N, --skip-negotiation 23 Skip negotiate (for qemu-ga) 24 -v, --verbose Verbose (echo commands sent and received) 25 -p, --pretty Pretty-print JSON 26 27 28Start QEMU with: 29 30# qemu [...] -qmp unix:./qmp-sock,server 31 32Run the shell: 33 34$ qmp-shell ./qmp-sock 35 36Commands have the following format: 37 38 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 39 40For example: 41 42(QEMU) device_add driver=e1000 id=net1 43{'return': {}} 44(QEMU) 45 46key=value pairs also support Python or JSON object literal subset notations, 47without spaces. Dictionaries/objects {} are supported as are arrays []. 48 49 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 50 51Both JSON and Python formatting should work, including both styles of 52string literal quotes. Both paradigms of literal values should work, 53including null/true/false for JSON and None/True/False for Python. 54 55 56Transactions have the following multi-line format: 57 58 transaction( 59 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 60 ... 61 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 62 ) 63 64One line transactions are also supported: 65 66 transaction( action-name1 ... ) 67 68For example: 69 70 (QEMU) transaction( 71 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 72 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 73 TRANS> ) 74 {"return": {}} 75 (QEMU) 76 77Use the -v and -p options to activate the verbose and pretty-print options, 78which will echo back the properly formatted JSON-compliant QMP that is being 79sent to QEMU, which is useful for debugging and documentation generation. 80""" 81 82import argparse 83import ast 84import json 85import logging 86import os 87import re 88import readline 89import sys 90from typing import ( 91 Iterator, 92 List, 93 NoReturn, 94 Optional, 95 Sequence, 96) 97 98from qemu import qmp 99from qemu.qmp import QMPMessage 100 101 102LOG = logging.getLogger(__name__) 103 104 105class QMPCompleter: 106 """ 107 QMPCompleter provides a readline library tab-complete behavior. 108 """ 109 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 110 # but pylint as of today does not know that List[str] is simply 'list'. 111 def __init__(self) -> None: 112 self._matches: List[str] = [] 113 114 def append(self, value: str) -> None: 115 """Append a new valid completion to the list of possibilities.""" 116 return self._matches.append(value) 117 118 def complete(self, text: str, state: int) -> Optional[str]: 119 """readline.set_completer() callback implementation.""" 120 for cmd in self._matches: 121 if cmd.startswith(text): 122 if state == 0: 123 return cmd 124 state -= 1 125 return None 126 127 128class QMPShellError(qmp.QMPError): 129 """ 130 QMP Shell Base error class. 131 """ 132 133 134class FuzzyJSON(ast.NodeTransformer): 135 """ 136 This extension of ast.NodeTransformer filters literal "true/false/null" 137 values in a Python AST and replaces them by proper "True/False/None" values 138 that Python can properly evaluate. 139 """ 140 141 @classmethod 142 def visit_Name(cls, # pylint: disable=invalid-name 143 node: ast.Name) -> ast.AST: 144 """ 145 Transform Name nodes with certain values into Constant (keyword) nodes. 146 """ 147 if node.id == 'true': 148 return ast.Constant(value=True) 149 if node.id == 'false': 150 return ast.Constant(value=False) 151 if node.id == 'null': 152 return ast.Constant(value=None) 153 return node 154 155 156class QMPShell(qmp.QEMUMonitorProtocol): 157 """ 158 QMPShell provides a basic readline-based QMP shell. 159 160 :param address: Address of the QMP server. 161 :param pretty: Pretty-print QMP messages. 162 :param verbose: Echo outgoing QMP messages to console. 163 """ 164 def __init__(self, address: qmp.SocketAddrT, 165 pretty: bool = False, verbose: bool = False): 166 super().__init__(address) 167 self._greeting: Optional[QMPMessage] = None 168 self._completer = QMPCompleter() 169 self._transmode = False 170 self._actions: List[QMPMessage] = [] 171 self._histfile = os.path.join(os.path.expanduser('~'), 172 '.qmp-shell_history') 173 self.pretty = pretty 174 self.verbose = verbose 175 176 def close(self) -> None: 177 # Hook into context manager of parent to save shell history. 178 self._save_history() 179 super().close() 180 181 def _fill_completion(self) -> None: 182 cmds = self.cmd('query-commands') 183 if 'error' in cmds: 184 return 185 for cmd in cmds['return']: 186 self._completer.append(cmd['name']) 187 188 def _completer_setup(self) -> None: 189 self._completer = QMPCompleter() 190 self._fill_completion() 191 readline.set_history_length(1024) 192 readline.set_completer(self._completer.complete) 193 readline.parse_and_bind("tab: complete") 194 # NB: default delimiters conflict with some command names 195 # (eg. query-), clearing everything as it doesn't seem to matter 196 readline.set_completer_delims('') 197 try: 198 readline.read_history_file(self._histfile) 199 except FileNotFoundError: 200 pass 201 except IOError as err: 202 msg = f"Failed to read history '{self._histfile}': {err!s}" 203 LOG.warning(msg) 204 205 def _save_history(self) -> None: 206 try: 207 readline.write_history_file(self._histfile) 208 except IOError as err: 209 msg = f"Failed to save history file '{self._histfile}': {err!s}" 210 LOG.warning(msg) 211 212 @classmethod 213 def _parse_value(cls, val: str) -> object: 214 try: 215 return int(val) 216 except ValueError: 217 pass 218 219 if val.lower() == 'true': 220 return True 221 if val.lower() == 'false': 222 return False 223 if val.startswith(('{', '[')): 224 # Try first as pure JSON: 225 try: 226 return json.loads(val) 227 except ValueError: 228 pass 229 # Try once again as FuzzyJSON: 230 try: 231 tree = ast.parse(val, mode='eval') 232 transformed = FuzzyJSON().visit(tree) 233 return ast.literal_eval(transformed) 234 except (SyntaxError, ValueError): 235 pass 236 return val 237 238 def _cli_expr(self, 239 tokens: Sequence[str], 240 parent: qmp.QMPObject) -> None: 241 for arg in tokens: 242 (key, sep, val) = arg.partition('=') 243 if sep != '=': 244 raise QMPShellError( 245 f"Expected a key=value pair, got '{arg!s}'" 246 ) 247 248 value = self._parse_value(val) 249 optpath = key.split('.') 250 curpath = [] 251 for path in optpath[:-1]: 252 curpath.append(path) 253 obj = parent.get(path, {}) 254 if not isinstance(obj, dict): 255 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 256 raise QMPShellError(msg.format('.'.join(curpath))) 257 parent[path] = obj 258 parent = obj 259 if optpath[-1] in parent: 260 if isinstance(parent[optpath[-1]], dict): 261 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 262 raise QMPShellError(msg.format('.'.join(curpath))) 263 raise QMPShellError(f'Cannot set "{key}" multiple times') 264 parent[optpath[-1]] = value 265 266 def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 267 """ 268 Build a QMP input object from a user provided command-line in the 269 following format: 270 271 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 272 """ 273 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 274 cmdargs = re.findall(argument_regex, cmdline) 275 qmpcmd: QMPMessage 276 277 # Transactional CLI entry: 278 if cmdargs and cmdargs[0] == 'transaction(': 279 self._transmode = True 280 self._actions = [] 281 cmdargs.pop(0) 282 283 # Transactional CLI exit: 284 if cmdargs and cmdargs[0] == ')' and self._transmode: 285 self._transmode = False 286 if len(cmdargs) > 1: 287 msg = 'Unexpected input after close of Transaction sub-shell' 288 raise QMPShellError(msg) 289 qmpcmd = { 290 'execute': 'transaction', 291 'arguments': {'actions': self._actions} 292 } 293 return qmpcmd 294 295 # No args, or no args remaining 296 if not cmdargs: 297 return None 298 299 if self._transmode: 300 # Parse and cache this Transactional Action 301 finalize = False 302 action = {'type': cmdargs[0], 'data': {}} 303 if cmdargs[-1] == ')': 304 cmdargs.pop(-1) 305 finalize = True 306 self._cli_expr(cmdargs[1:], action['data']) 307 self._actions.append(action) 308 return self._build_cmd(')') if finalize else None 309 310 # Standard command: parse and return it to be executed. 311 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 312 self._cli_expr(cmdargs[1:], qmpcmd['arguments']) 313 return qmpcmd 314 315 def _print(self, qmp_message: object) -> None: 316 jsobj = json.dumps(qmp_message, 317 indent=4 if self.pretty else None, 318 sort_keys=self.pretty) 319 print(str(jsobj)) 320 321 def _execute_cmd(self, cmdline: str) -> bool: 322 try: 323 qmpcmd = self._build_cmd(cmdline) 324 except QMPShellError as err: 325 print( 326 f"Error while parsing command line: {err!s}\n" 327 "command format: <command-name> " 328 "[arg-name1=arg1] ... [arg-nameN=argN", 329 file=sys.stderr 330 ) 331 return True 332 # For transaction mode, we may have just cached the action: 333 if qmpcmd is None: 334 return True 335 if self.verbose: 336 self._print(qmpcmd) 337 resp = self.cmd_obj(qmpcmd) 338 if resp is None: 339 print('Disconnected') 340 return False 341 self._print(resp) 342 return True 343 344 def connect(self, negotiate: bool = True) -> None: 345 self._greeting = super().connect(negotiate) 346 self._completer_setup() 347 348 def show_banner(self, 349 msg: str = 'Welcome to the QMP low-level shell!') -> None: 350 """ 351 Print to stdio a greeting, and the QEMU version if available. 352 """ 353 print(msg) 354 if not self._greeting: 355 print('Connected') 356 return 357 version = self._greeting['QMP']['version']['qemu'] 358 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 359 360 @property 361 def prompt(self) -> str: 362 """ 363 Return the current shell prompt, including a trailing space. 364 """ 365 if self._transmode: 366 return 'TRANS> ' 367 return '(QEMU) ' 368 369 def read_exec_command(self) -> bool: 370 """ 371 Read and execute a command. 372 373 @return True if execution was ok, return False if disconnected. 374 """ 375 try: 376 cmdline = input(self.prompt) 377 except EOFError: 378 print() 379 return False 380 381 if cmdline == '': 382 for event in self.get_events(): 383 print(event) 384 self.clear_events() 385 return True 386 387 return self._execute_cmd(cmdline) 388 389 def repl(self) -> Iterator[None]: 390 """ 391 Return an iterator that implements the REPL. 392 """ 393 self.show_banner() 394 while self.read_exec_command(): 395 yield 396 self.close() 397 398 399class HMPShell(QMPShell): 400 """ 401 HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. 402 403 :param address: Address of the QMP server. 404 :param pretty: Pretty-print QMP messages. 405 :param verbose: Echo outgoing QMP messages to console. 406 """ 407 def __init__(self, address: qmp.SocketAddrT, 408 pretty: bool = False, verbose: bool = False): 409 super().__init__(address, pretty, verbose) 410 self._cpu_index = 0 411 412 def _cmd_completion(self) -> None: 413 for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): 414 if cmd and cmd[0] != '[' and cmd[0] != '\t': 415 name = cmd.split()[0] # drop help text 416 if name == 'info': 417 continue 418 if name.find('|') != -1: 419 # Command in the form 'foobar|f' or 'f|foobar', take the 420 # full name 421 opt = name.split('|') 422 if len(opt[0]) == 1: 423 name = opt[1] 424 else: 425 name = opt[0] 426 self._completer.append(name) 427 self._completer.append('help ' + name) # help completion 428 429 def _info_completion(self) -> None: 430 for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): 431 if cmd: 432 self._completer.append('info ' + cmd.split()[1]) 433 434 def _other_completion(self) -> None: 435 # special cases 436 self._completer.append('help info') 437 438 def _fill_completion(self) -> None: 439 self._cmd_completion() 440 self._info_completion() 441 self._other_completion() 442 443 def _cmd_passthrough(self, cmdline: str, 444 cpu_index: int = 0) -> QMPMessage: 445 return self.cmd_obj({ 446 'execute': 'human-monitor-command', 447 'arguments': { 448 'command-line': cmdline, 449 'cpu-index': cpu_index 450 } 451 }) 452 453 def _execute_cmd(self, cmdline: str) -> bool: 454 if cmdline.split()[0] == "cpu": 455 # trap the cpu command, it requires special setting 456 try: 457 idx = int(cmdline.split()[1]) 458 if 'return' not in self._cmd_passthrough('info version', idx): 459 print('bad CPU index') 460 return True 461 self._cpu_index = idx 462 except ValueError: 463 print('cpu command takes an integer argument') 464 return True 465 resp = self._cmd_passthrough(cmdline, self._cpu_index) 466 if resp is None: 467 print('Disconnected') 468 return False 469 assert 'return' in resp or 'error' in resp 470 if 'return' in resp: 471 # Success 472 if len(resp['return']) > 0: 473 print(resp['return'], end=' ') 474 else: 475 # Error 476 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 477 return True 478 479 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 480 QMPShell.show_banner(self, msg) 481 482 483def die(msg: str) -> NoReturn: 484 """Write an error to stderr, then exit with a return code of 1.""" 485 sys.stderr.write('ERROR: %s\n' % msg) 486 sys.exit(1) 487 488 489def main() -> None: 490 """ 491 qmp-shell entry point: parse command line arguments and start the REPL. 492 """ 493 parser = argparse.ArgumentParser() 494 parser.add_argument('-H', '--hmp', action='store_true', 495 help='Use HMP interface') 496 parser.add_argument('-N', '--skip-negotiation', action='store_true', 497 help='Skip negotiate (for qemu-ga)') 498 parser.add_argument('-v', '--verbose', action='store_true', 499 help='Verbose (echo commands sent and received)') 500 parser.add_argument('-p', '--pretty', action='store_true', 501 help='Pretty-print JSON') 502 503 default_server = os.environ.get('QMP_SOCKET') 504 parser.add_argument('qmp_server', action='store', 505 default=default_server, 506 help='< UNIX socket path | TCP address:port >') 507 508 args = parser.parse_args() 509 if args.qmp_server is None: 510 parser.error("QMP socket or TCP address must be specified") 511 512 shell_class = HMPShell if args.hmp else QMPShell 513 514 try: 515 address = shell_class.parse_address(args.qmp_server) 516 except qmp.QMPBadPortError: 517 parser.error(f"Bad port number: {args.qmp_server}") 518 return # pycharm doesn't know error() is noreturn 519 520 with shell_class(address, args.pretty, args.verbose) as qemu: 521 try: 522 qemu.connect(negotiate=not args.skip_negotiation) 523 except qmp.QMPConnectError: 524 die("Didn't get QMP greeting message") 525 except qmp.QMPCapabilitiesError: 526 die("Couldn't negotiate capabilities") 527 except OSError as err: 528 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 529 530 for _ in qemu.repl(): 531 pass 532 533 534if __name__ == '__main__': 535 main() 536