1# 2# Copyright (C) 2009-2022 Red Hat Inc. 3# 4# Authors: 5# Luiz Capitulino <lcapitulino@redhat.com> 6# John Snow <jsnow@redhat.com> 7# 8# This work is licensed under the terms of the GNU LGPL, version 2 or 9# later. See the COPYING file in the top-level directory. 10# 11 12""" 13Low-level QEMU shell on top of QMP. 14 15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 16 17positional arguments: 18 qmp_server < UNIX socket path | TCP address:port > 19 20optional arguments: 21 -h, --help show this help message and exit 22 -H, --hmp Use HMP interface 23 -N, --skip-negotiation 24 Skip negotiate (for qemu-ga) 25 -v, --verbose Verbose (echo commands sent and received) 26 -p, --pretty Pretty-print JSON 27 28 29Start QEMU with: 30 31# qemu [...] -qmp unix:./qmp-sock,server 32 33Run the shell: 34 35$ qmp-shell ./qmp-sock 36 37Commands have the following format: 38 39 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 40 41For example: 42 43(QEMU) device_add driver=e1000 id=net1 44{'return': {}} 45(QEMU) 46 47key=value pairs also support Python or JSON object literal subset notations, 48without spaces. Dictionaries/objects {} are supported as are arrays []. 49 50 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 51 52Both JSON and Python formatting should work, including both styles of 53string literal quotes. Both paradigms of literal values should work, 54including null/true/false for JSON and None/True/False for Python. 55 56 57Transactions have the following multi-line format: 58 59 transaction( 60 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 61 ... 62 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 63 ) 64 65One line transactions are also supported: 66 67 transaction( action-name1 ... ) 68 69For example: 70 71 (QEMU) transaction( 72 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 73 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 74 TRANS> ) 75 {"return": {}} 76 (QEMU) 77 78Use the -v and -p options to activate the verbose and pretty-print options, 79which will echo back the properly formatted JSON-compliant QMP that is being 80sent to QEMU, which is useful for debugging and documentation generation. 81""" 82 83import argparse 84import ast 85import json 86import logging 87import os 88import re 89import readline 90from subprocess import Popen 91import sys 92from typing import ( 93 IO, 94 Iterator, 95 List, 96 NoReturn, 97 Optional, 98 Sequence, 99) 100 101from qemu.qmp import ConnectError, QMPError, SocketAddrT 102from qemu.qmp.legacy import ( 103 QEMUMonitorProtocol, 104 QMPBadPortError, 105 QMPMessage, 106 QMPObject, 107) 108 109 110LOG = logging.getLogger(__name__) 111 112 113class QMPCompleter: 114 """ 115 QMPCompleter provides a readline library tab-complete behavior. 116 """ 117 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 118 # but pylint as of today does not know that List[str] is simply 'list'. 119 def __init__(self) -> None: 120 self._matches: List[str] = [] 121 122 def append(self, value: str) -> None: 123 """Append a new valid completion to the list of possibilities.""" 124 return self._matches.append(value) 125 126 def complete(self, text: str, state: int) -> Optional[str]: 127 """readline.set_completer() callback implementation.""" 128 for cmd in self._matches: 129 if cmd.startswith(text): 130 if state == 0: 131 return cmd 132 state -= 1 133 return None 134 135 136class QMPShellError(QMPError): 137 """ 138 QMP Shell Base error class. 139 """ 140 141 142class FuzzyJSON(ast.NodeTransformer): 143 """ 144 This extension of ast.NodeTransformer filters literal "true/false/null" 145 values in a Python AST and replaces them by proper "True/False/None" values 146 that Python can properly evaluate. 147 """ 148 149 @classmethod 150 def visit_Name(cls, # pylint: disable=invalid-name 151 node: ast.Name) -> ast.AST: 152 """ 153 Transform Name nodes with certain values into Constant (keyword) nodes. 154 """ 155 if node.id == 'true': 156 return ast.Constant(value=True) 157 if node.id == 'false': 158 return ast.Constant(value=False) 159 if node.id == 'null': 160 return ast.Constant(value=None) 161 return node 162 163 164class QMPShell(QEMUMonitorProtocol): 165 """ 166 QMPShell provides a basic readline-based QMP shell. 167 168 :param address: Address of the QMP server. 169 :param pretty: Pretty-print QMP messages. 170 :param verbose: Echo outgoing QMP messages to console. 171 """ 172 def __init__(self, address: SocketAddrT, 173 pretty: bool = False, 174 verbose: bool = False, 175 server: bool = False, 176 logfile: Optional[str] = None): 177 super().__init__(address, server=server) 178 self._greeting: Optional[QMPMessage] = None 179 self._completer = QMPCompleter() 180 self._transmode = False 181 self._actions: List[QMPMessage] = [] 182 self._histfile = os.path.join(os.path.expanduser('~'), 183 '.qmp-shell_history') 184 self.pretty = pretty 185 self.verbose = verbose 186 self.logfile = None 187 188 if logfile is not None: 189 self.logfile = open(logfile, "w", encoding='utf-8') 190 191 def close(self) -> None: 192 # Hook into context manager of parent to save shell history. 193 self._save_history() 194 super().close() 195 196 def _fill_completion(self) -> None: 197 cmds = self.cmd('query-commands') 198 if 'error' in cmds: 199 return 200 for cmd in cmds['return']: 201 self._completer.append(cmd['name']) 202 203 def _completer_setup(self) -> None: 204 self._completer = QMPCompleter() 205 self._fill_completion() 206 readline.set_history_length(1024) 207 readline.set_completer(self._completer.complete) 208 readline.parse_and_bind("tab: complete") 209 # NB: default delimiters conflict with some command names 210 # (eg. query-), clearing everything as it doesn't seem to matter 211 readline.set_completer_delims('') 212 try: 213 readline.read_history_file(self._histfile) 214 except FileNotFoundError: 215 pass 216 except IOError as err: 217 msg = f"Failed to read history '{self._histfile}': {err!s}" 218 LOG.warning(msg) 219 220 def _save_history(self) -> None: 221 try: 222 readline.write_history_file(self._histfile) 223 except IOError as err: 224 msg = f"Failed to save history file '{self._histfile}': {err!s}" 225 LOG.warning(msg) 226 227 @classmethod 228 def _parse_value(cls, val: str) -> object: 229 try: 230 return int(val) 231 except ValueError: 232 pass 233 234 if val.lower() == 'true': 235 return True 236 if val.lower() == 'false': 237 return False 238 if val.startswith(('{', '[')): 239 # Try first as pure JSON: 240 try: 241 return json.loads(val) 242 except ValueError: 243 pass 244 # Try once again as FuzzyJSON: 245 try: 246 tree = ast.parse(val, mode='eval') 247 transformed = FuzzyJSON().visit(tree) 248 return ast.literal_eval(transformed) 249 except (SyntaxError, ValueError): 250 pass 251 return val 252 253 def _cli_expr(self, 254 tokens: Sequence[str], 255 parent: QMPObject) -> None: 256 for arg in tokens: 257 (key, sep, val) = arg.partition('=') 258 if sep != '=': 259 raise QMPShellError( 260 f"Expected a key=value pair, got '{arg!s}'" 261 ) 262 263 value = self._parse_value(val) 264 optpath = key.split('.') 265 curpath = [] 266 for path in optpath[:-1]: 267 curpath.append(path) 268 obj = parent.get(path, {}) 269 if not isinstance(obj, dict): 270 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 271 raise QMPShellError(msg.format('.'.join(curpath))) 272 parent[path] = obj 273 parent = obj 274 if optpath[-1] in parent: 275 if isinstance(parent[optpath[-1]], dict): 276 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 277 raise QMPShellError(msg.format('.'.join(curpath))) 278 raise QMPShellError(f'Cannot set "{key}" multiple times') 279 parent[optpath[-1]] = value 280 281 def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 282 """ 283 Build a QMP input object from a user provided command-line in the 284 following format: 285 286 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 287 """ 288 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 289 cmdargs = re.findall(argument_regex, cmdline) 290 qmpcmd: QMPMessage 291 292 # Transactional CLI entry: 293 if cmdargs and cmdargs[0] == 'transaction(': 294 self._transmode = True 295 self._actions = [] 296 cmdargs.pop(0) 297 298 # Transactional CLI exit: 299 if cmdargs and cmdargs[0] == ')' and self._transmode: 300 self._transmode = False 301 if len(cmdargs) > 1: 302 msg = 'Unexpected input after close of Transaction sub-shell' 303 raise QMPShellError(msg) 304 qmpcmd = { 305 'execute': 'transaction', 306 'arguments': {'actions': self._actions} 307 } 308 return qmpcmd 309 310 # No args, or no args remaining 311 if not cmdargs: 312 return None 313 314 if self._transmode: 315 # Parse and cache this Transactional Action 316 finalize = False 317 action = {'type': cmdargs[0], 'data': {}} 318 if cmdargs[-1] == ')': 319 cmdargs.pop(-1) 320 finalize = True 321 self._cli_expr(cmdargs[1:], action['data']) 322 self._actions.append(action) 323 return self._build_cmd(')') if finalize else None 324 325 # Standard command: parse and return it to be executed. 326 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 327 self._cli_expr(cmdargs[1:], qmpcmd['arguments']) 328 return qmpcmd 329 330 def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None: 331 jsobj = json.dumps(qmp_message, 332 indent=4 if self.pretty else None, 333 sort_keys=self.pretty) 334 print(str(jsobj), file=fh) 335 336 def _execute_cmd(self, cmdline: str) -> bool: 337 try: 338 qmpcmd = self._build_cmd(cmdline) 339 except QMPShellError as err: 340 print( 341 f"Error while parsing command line: {err!s}\n" 342 "command format: <command-name> " 343 "[arg-name1=arg1] ... [arg-nameN=argN", 344 file=sys.stderr 345 ) 346 return True 347 # For transaction mode, we may have just cached the action: 348 if qmpcmd is None: 349 return True 350 if self.verbose: 351 self._print(qmpcmd) 352 resp = self.cmd_obj(qmpcmd) 353 if resp is None: 354 print('Disconnected') 355 return False 356 self._print(resp) 357 if self.logfile is not None: 358 cmd = {**qmpcmd, **resp} 359 self._print(cmd, fh=self.logfile) 360 return True 361 362 def connect(self, negotiate: bool = True) -> None: 363 self._greeting = super().connect(negotiate) 364 self._completer_setup() 365 366 def show_banner(self, 367 msg: str = 'Welcome to the QMP low-level shell!') -> None: 368 """ 369 Print to stdio a greeting, and the QEMU version if available. 370 """ 371 print(msg) 372 if not self._greeting: 373 print('Connected') 374 return 375 version = self._greeting['QMP']['version']['qemu'] 376 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 377 378 @property 379 def prompt(self) -> str: 380 """ 381 Return the current shell prompt, including a trailing space. 382 """ 383 if self._transmode: 384 return 'TRANS> ' 385 return '(QEMU) ' 386 387 def read_exec_command(self) -> bool: 388 """ 389 Read and execute a command. 390 391 @return True if execution was ok, return False if disconnected. 392 """ 393 try: 394 cmdline = input(self.prompt) 395 except EOFError: 396 print() 397 return False 398 399 if cmdline == '': 400 for event in self.get_events(): 401 print(event) 402 return True 403 404 return self._execute_cmd(cmdline) 405 406 def repl(self) -> Iterator[None]: 407 """ 408 Return an iterator that implements the REPL. 409 """ 410 self.show_banner() 411 while self.read_exec_command(): 412 yield 413 self.close() 414 415 416class HMPShell(QMPShell): 417 """ 418 HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. 419 420 :param address: Address of the QMP server. 421 :param pretty: Pretty-print QMP messages. 422 :param verbose: Echo outgoing QMP messages to console. 423 """ 424 def __init__(self, address: SocketAddrT, 425 pretty: bool = False, 426 verbose: bool = False, 427 server: bool = False, 428 logfile: Optional[str] = None): 429 super().__init__(address, pretty, verbose, server, logfile) 430 self._cpu_index = 0 431 432 def _cmd_completion(self) -> None: 433 for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): 434 if cmd and cmd[0] != '[' and cmd[0] != '\t': 435 name = cmd.split()[0] # drop help text 436 if name == 'info': 437 continue 438 if name.find('|') != -1: 439 # Command in the form 'foobar|f' or 'f|foobar', take the 440 # full name 441 opt = name.split('|') 442 if len(opt[0]) == 1: 443 name = opt[1] 444 else: 445 name = opt[0] 446 self._completer.append(name) 447 self._completer.append('help ' + name) # help completion 448 449 def _info_completion(self) -> None: 450 for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): 451 if cmd: 452 self._completer.append('info ' + cmd.split()[1]) 453 454 def _other_completion(self) -> None: 455 # special cases 456 self._completer.append('help info') 457 458 def _fill_completion(self) -> None: 459 self._cmd_completion() 460 self._info_completion() 461 self._other_completion() 462 463 def _cmd_passthrough(self, cmdline: str, 464 cpu_index: int = 0) -> QMPMessage: 465 return self.cmd_obj({ 466 'execute': 'human-monitor-command', 467 'arguments': { 468 'command-line': cmdline, 469 'cpu-index': cpu_index 470 } 471 }) 472 473 def _execute_cmd(self, cmdline: str) -> bool: 474 if cmdline.split()[0] == "cpu": 475 # trap the cpu command, it requires special setting 476 try: 477 idx = int(cmdline.split()[1]) 478 if 'return' not in self._cmd_passthrough('info version', idx): 479 print('bad CPU index') 480 return True 481 self._cpu_index = idx 482 except ValueError: 483 print('cpu command takes an integer argument') 484 return True 485 resp = self._cmd_passthrough(cmdline, self._cpu_index) 486 if resp is None: 487 print('Disconnected') 488 return False 489 assert 'return' in resp or 'error' in resp 490 if 'return' in resp: 491 # Success 492 if len(resp['return']) > 0: 493 print(resp['return'], end=' ') 494 else: 495 # Error 496 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 497 return True 498 499 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 500 QMPShell.show_banner(self, msg) 501 502 503def die(msg: str) -> NoReturn: 504 """Write an error to stderr, then exit with a return code of 1.""" 505 sys.stderr.write('ERROR: %s\n' % msg) 506 sys.exit(1) 507 508 509def main() -> None: 510 """ 511 qmp-shell entry point: parse command line arguments and start the REPL. 512 """ 513 parser = argparse.ArgumentParser() 514 parser.add_argument('-H', '--hmp', action='store_true', 515 help='Use HMP interface') 516 parser.add_argument('-N', '--skip-negotiation', action='store_true', 517 help='Skip negotiate (for qemu-ga)') 518 parser.add_argument('-v', '--verbose', action='store_true', 519 help='Verbose (echo commands sent and received)') 520 parser.add_argument('-p', '--pretty', action='store_true', 521 help='Pretty-print JSON') 522 parser.add_argument('-l', '--logfile', 523 help='Save log of all QMP messages to PATH') 524 525 default_server = os.environ.get('QMP_SOCKET') 526 parser.add_argument('qmp_server', action='store', 527 default=default_server, 528 help='< UNIX socket path | TCP address:port >') 529 530 args = parser.parse_args() 531 if args.qmp_server is None: 532 parser.error("QMP socket or TCP address must be specified") 533 534 shell_class = HMPShell if args.hmp else QMPShell 535 536 try: 537 address = shell_class.parse_address(args.qmp_server) 538 except QMPBadPortError: 539 parser.error(f"Bad port number: {args.qmp_server}") 540 return # pycharm doesn't know error() is noreturn 541 542 with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu: 543 try: 544 qemu.connect(negotiate=not args.skip_negotiation) 545 except ConnectError as err: 546 if isinstance(err.exc, OSError): 547 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 548 die(str(err)) 549 550 for _ in qemu.repl(): 551 pass 552 553 554def main_wrap() -> None: 555 """ 556 qmp-shell-wrap entry point: parse command line arguments and 557 start the REPL. 558 """ 559 parser = argparse.ArgumentParser() 560 parser.add_argument('-H', '--hmp', action='store_true', 561 help='Use HMP interface') 562 parser.add_argument('-v', '--verbose', action='store_true', 563 help='Verbose (echo commands sent and received)') 564 parser.add_argument('-p', '--pretty', action='store_true', 565 help='Pretty-print JSON') 566 parser.add_argument('-l', '--logfile', 567 help='Save log of all QMP messages to PATH') 568 569 parser.add_argument('command', nargs=argparse.REMAINDER, 570 help='QEMU command line to invoke') 571 572 args = parser.parse_args() 573 574 cmd = args.command 575 if len(cmd) != 0 and cmd[0] == '--': 576 cmd = cmd[1:] 577 if len(cmd) == 0: 578 cmd = ["qemu-system-x86_64"] 579 580 sockpath = "qmp-shell-wrap-%d" % os.getpid() 581 cmd += ["-qmp", "unix:%s" % sockpath] 582 583 shell_class = HMPShell if args.hmp else QMPShell 584 585 try: 586 address = shell_class.parse_address(sockpath) 587 except QMPBadPortError: 588 parser.error(f"Bad port number: {sockpath}") 589 return # pycharm doesn't know error() is noreturn 590 591 try: 592 with shell_class(address, args.pretty, args.verbose, 593 True, args.logfile) as qemu: 594 with Popen(cmd): 595 596 try: 597 qemu.accept() 598 except ConnectError as err: 599 if isinstance(err.exc, OSError): 600 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 601 die(str(err)) 602 603 for _ in qemu.repl(): 604 pass 605 finally: 606 os.unlink(sockpath) 607 608 609if __name__ == '__main__': 610 main() 611