xref: /qemu/python/qemu/qmp/qmp_shell.py (revision b2a3cbb8)
1#
2# Copyright (C) 2009-2022 Red Hat Inc.
3#
4# Authors:
5#  Luiz Capitulino <lcapitulino@redhat.com>
6#  John Snow <jsnow@redhat.com>
7#
8# This work is licensed under the terms of the GNU LGPL, version 2 or
9# later. See the COPYING file in the top-level directory.
10#
11
12"""
13Low-level QEMU shell on top of QMP.
14
15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
16
17positional arguments:
18  qmp_server            < UNIX socket path | TCP address:port >
19
20optional arguments:
21  -h, --help            show this help message and exit
22  -H, --hmp             Use HMP interface
23  -N, --skip-negotiation
24                        Skip negotiate (for qemu-ga)
25  -v, --verbose         Verbose (echo commands sent and received)
26  -p, --pretty          Pretty-print JSON
27
28
29Start QEMU with:
30
31# qemu [...] -qmp unix:./qmp-sock,server
32
33Run the shell:
34
35$ qmp-shell ./qmp-sock
36
37Commands have the following format:
38
39   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
40
41For example:
42
43(QEMU) device_add driver=e1000 id=net1
44{'return': {}}
45(QEMU)
46
47key=value pairs also support Python or JSON object literal subset notations,
48without spaces. Dictionaries/objects {} are supported as are arrays [].
49
50   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
51
52Both JSON and Python formatting should work, including both styles of
53string literal quotes. Both paradigms of literal values should work,
54including null/true/false for JSON and None/True/False for Python.
55
56
57Transactions have the following multi-line format:
58
59   transaction(
60   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
61   ...
62   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
63   )
64
65One line transactions are also supported:
66
67   transaction( action-name1 ... )
68
69For example:
70
71    (QEMU) transaction(
72    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
73    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
74    TRANS> )
75    {"return": {}}
76    (QEMU)
77
78Use the -v and -p options to activate the verbose and pretty-print options,
79which will echo back the properly formatted JSON-compliant QMP that is being
80sent to QEMU, which is useful for debugging and documentation generation.
81"""
82
83import argparse
84import ast
85import json
86import logging
87import os
88import re
89import readline
90from subprocess import Popen
91import sys
92from typing import (
93    IO,
94    Iterator,
95    List,
96    NoReturn,
97    Optional,
98    Sequence,
99)
100
101from qemu.qmp import ConnectError, QMPError, SocketAddrT
102from qemu.qmp.legacy import (
103    QEMUMonitorProtocol,
104    QMPBadPortError,
105    QMPMessage,
106    QMPObject,
107)
108
109
110LOG = logging.getLogger(__name__)
111
112
113class QMPCompleter:
114    """
115    QMPCompleter provides a readline library tab-complete behavior.
116    """
117    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
118    # but pylint as of today does not know that List[str] is simply 'list'.
119    def __init__(self) -> None:
120        self._matches: List[str] = []
121
122    def append(self, value: str) -> None:
123        """Append a new valid completion to the list of possibilities."""
124        return self._matches.append(value)
125
126    def complete(self, text: str, state: int) -> Optional[str]:
127        """readline.set_completer() callback implementation."""
128        for cmd in self._matches:
129            if cmd.startswith(text):
130                if state == 0:
131                    return cmd
132                state -= 1
133        return None
134
135
136class QMPShellError(QMPError):
137    """
138    QMP Shell Base error class.
139    """
140
141
142class FuzzyJSON(ast.NodeTransformer):
143    """
144    This extension of ast.NodeTransformer filters literal "true/false/null"
145    values in a Python AST and replaces them by proper "True/False/None" values
146    that Python can properly evaluate.
147    """
148
149    @classmethod
150    def visit_Name(cls,  # pylint: disable=invalid-name
151                   node: ast.Name) -> ast.AST:
152        """
153        Transform Name nodes with certain values into Constant (keyword) nodes.
154        """
155        if node.id == 'true':
156            return ast.Constant(value=True)
157        if node.id == 'false':
158            return ast.Constant(value=False)
159        if node.id == 'null':
160            return ast.Constant(value=None)
161        return node
162
163
164class QMPShell(QEMUMonitorProtocol):
165    """
166    QMPShell provides a basic readline-based QMP shell.
167
168    :param address: Address of the QMP server.
169    :param pretty: Pretty-print QMP messages.
170    :param verbose: Echo outgoing QMP messages to console.
171    """
172    def __init__(self, address: SocketAddrT,
173                 pretty: bool = False,
174                 verbose: bool = False,
175                 server: bool = False,
176                 logfile: Optional[str] = None):
177        super().__init__(address, server=server)
178        self._greeting: Optional[QMPMessage] = None
179        self._completer = QMPCompleter()
180        self._transmode = False
181        self._actions: List[QMPMessage] = []
182        self._histfile = os.path.join(os.path.expanduser('~'),
183                                      '.qmp-shell_history')
184        self.pretty = pretty
185        self.verbose = verbose
186        self.logfile = None
187
188        if logfile is not None:
189            self.logfile = open(logfile, "w", encoding='utf-8')
190
191    def close(self) -> None:
192        # Hook into context manager of parent to save shell history.
193        self._save_history()
194        super().close()
195
196    def _fill_completion(self) -> None:
197        cmds = self.cmd('query-commands')
198        if 'error' in cmds:
199            return
200        for cmd in cmds['return']:
201            self._completer.append(cmd['name'])
202
203    def _completer_setup(self) -> None:
204        self._completer = QMPCompleter()
205        self._fill_completion()
206        readline.set_history_length(1024)
207        readline.set_completer(self._completer.complete)
208        readline.parse_and_bind("tab: complete")
209        # NB: default delimiters conflict with some command names
210        # (eg. query-), clearing everything as it doesn't seem to matter
211        readline.set_completer_delims('')
212        try:
213            readline.read_history_file(self._histfile)
214        except FileNotFoundError:
215            pass
216        except IOError as err:
217            msg = f"Failed to read history '{self._histfile}': {err!s}"
218            LOG.warning(msg)
219
220    def _save_history(self) -> None:
221        try:
222            readline.write_history_file(self._histfile)
223        except IOError as err:
224            msg = f"Failed to save history file '{self._histfile}': {err!s}"
225            LOG.warning(msg)
226
227    @classmethod
228    def _parse_value(cls, val: str) -> object:
229        try:
230            return int(val)
231        except ValueError:
232            pass
233
234        if val.lower() == 'true':
235            return True
236        if val.lower() == 'false':
237            return False
238        if val.startswith(('{', '[')):
239            # Try first as pure JSON:
240            try:
241                return json.loads(val)
242            except ValueError:
243                pass
244            # Try once again as FuzzyJSON:
245            try:
246                tree = ast.parse(val, mode='eval')
247                transformed = FuzzyJSON().visit(tree)
248                return ast.literal_eval(transformed)
249            except (SyntaxError, ValueError):
250                pass
251        return val
252
253    def _cli_expr(self,
254                  tokens: Sequence[str],
255                  parent: QMPObject) -> None:
256        for arg in tokens:
257            (key, sep, val) = arg.partition('=')
258            if sep != '=':
259                raise QMPShellError(
260                    f"Expected a key=value pair, got '{arg!s}'"
261                )
262
263            value = self._parse_value(val)
264            optpath = key.split('.')
265            curpath = []
266            for path in optpath[:-1]:
267                curpath.append(path)
268                obj = parent.get(path, {})
269                if not isinstance(obj, dict):
270                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
271                    raise QMPShellError(msg.format('.'.join(curpath)))
272                parent[path] = obj
273                parent = obj
274            if optpath[-1] in parent:
275                if isinstance(parent[optpath[-1]], dict):
276                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
277                    raise QMPShellError(msg.format('.'.join(curpath)))
278                raise QMPShellError(f'Cannot set "{key}" multiple times')
279            parent[optpath[-1]] = value
280
281    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
282        """
283        Build a QMP input object from a user provided command-line in the
284        following format:
285
286            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
287        """
288        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
289        cmdargs = re.findall(argument_regex, cmdline)
290        qmpcmd: QMPMessage
291
292        # Transactional CLI entry:
293        if cmdargs and cmdargs[0] == 'transaction(':
294            self._transmode = True
295            self._actions = []
296            cmdargs.pop(0)
297
298        # Transactional CLI exit:
299        if cmdargs and cmdargs[0] == ')' and self._transmode:
300            self._transmode = False
301            if len(cmdargs) > 1:
302                msg = 'Unexpected input after close of Transaction sub-shell'
303                raise QMPShellError(msg)
304            qmpcmd = {
305                'execute': 'transaction',
306                'arguments': {'actions': self._actions}
307            }
308            return qmpcmd
309
310        # No args, or no args remaining
311        if not cmdargs:
312            return None
313
314        if self._transmode:
315            # Parse and cache this Transactional Action
316            finalize = False
317            action = {'type': cmdargs[0], 'data': {}}
318            if cmdargs[-1] == ')':
319                cmdargs.pop(-1)
320                finalize = True
321            self._cli_expr(cmdargs[1:], action['data'])
322            self._actions.append(action)
323            return self._build_cmd(')') if finalize else None
324
325        # Standard command: parse and return it to be executed.
326        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
327        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
328        return qmpcmd
329
330    def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None:
331        jsobj = json.dumps(qmp_message,
332                           indent=4 if self.pretty else None,
333                           sort_keys=self.pretty)
334        print(str(jsobj), file=fh)
335
336    def _execute_cmd(self, cmdline: str) -> bool:
337        try:
338            qmpcmd = self._build_cmd(cmdline)
339        except QMPShellError as err:
340            print(
341                f"Error while parsing command line: {err!s}\n"
342                "command format: <command-name> "
343                "[arg-name1=arg1] ... [arg-nameN=argN",
344                file=sys.stderr
345            )
346            return True
347        # For transaction mode, we may have just cached the action:
348        if qmpcmd is None:
349            return True
350        if self.verbose:
351            self._print(qmpcmd)
352        resp = self.cmd_obj(qmpcmd)
353        if resp is None:
354            print('Disconnected')
355            return False
356        self._print(resp)
357        if self.logfile is not None:
358            cmd = {**qmpcmd, **resp}
359            self._print(cmd, fh=self.logfile)
360        return True
361
362    def connect(self, negotiate: bool = True) -> None:
363        self._greeting = super().connect(negotiate)
364        self._completer_setup()
365
366    def show_banner(self,
367                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
368        """
369        Print to stdio a greeting, and the QEMU version if available.
370        """
371        print(msg)
372        if not self._greeting:
373            print('Connected')
374            return
375        version = self._greeting['QMP']['version']['qemu']
376        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
377
378    @property
379    def prompt(self) -> str:
380        """
381        Return the current shell prompt, including a trailing space.
382        """
383        if self._transmode:
384            return 'TRANS> '
385        return '(QEMU) '
386
387    def read_exec_command(self) -> bool:
388        """
389        Read and execute a command.
390
391        @return True if execution was ok, return False if disconnected.
392        """
393        try:
394            cmdline = input(self.prompt)
395        except EOFError:
396            print()
397            return False
398
399        if cmdline == '':
400            for event in self.get_events():
401                print(event)
402            return True
403
404        return self._execute_cmd(cmdline)
405
406    def repl(self) -> Iterator[None]:
407        """
408        Return an iterator that implements the REPL.
409        """
410        self.show_banner()
411        while self.read_exec_command():
412            yield
413        self.close()
414
415
416class HMPShell(QMPShell):
417    """
418    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
419
420    :param address: Address of the QMP server.
421    :param pretty: Pretty-print QMP messages.
422    :param verbose: Echo outgoing QMP messages to console.
423    """
424    def __init__(self, address: SocketAddrT,
425                 pretty: bool = False,
426                 verbose: bool = False,
427                 server: bool = False,
428                 logfile: Optional[str] = None):
429        super().__init__(address, pretty, verbose, server, logfile)
430        self._cpu_index = 0
431
432    def _cmd_completion(self) -> None:
433        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
434            if cmd and cmd[0] != '[' and cmd[0] != '\t':
435                name = cmd.split()[0]  # drop help text
436                if name == 'info':
437                    continue
438                if name.find('|') != -1:
439                    # Command in the form 'foobar|f' or 'f|foobar', take the
440                    # full name
441                    opt = name.split('|')
442                    if len(opt[0]) == 1:
443                        name = opt[1]
444                    else:
445                        name = opt[0]
446                self._completer.append(name)
447                self._completer.append('help ' + name)  # help completion
448
449    def _info_completion(self) -> None:
450        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
451            if cmd:
452                self._completer.append('info ' + cmd.split()[1])
453
454    def _other_completion(self) -> None:
455        # special cases
456        self._completer.append('help info')
457
458    def _fill_completion(self) -> None:
459        self._cmd_completion()
460        self._info_completion()
461        self._other_completion()
462
463    def _cmd_passthrough(self, cmdline: str,
464                         cpu_index: int = 0) -> QMPMessage:
465        return self.cmd_obj({
466            'execute': 'human-monitor-command',
467            'arguments': {
468                'command-line': cmdline,
469                'cpu-index': cpu_index
470            }
471        })
472
473    def _execute_cmd(self, cmdline: str) -> bool:
474        if cmdline.split()[0] == "cpu":
475            # trap the cpu command, it requires special setting
476            try:
477                idx = int(cmdline.split()[1])
478                if 'return' not in self._cmd_passthrough('info version', idx):
479                    print('bad CPU index')
480                    return True
481                self._cpu_index = idx
482            except ValueError:
483                print('cpu command takes an integer argument')
484                return True
485        resp = self._cmd_passthrough(cmdline, self._cpu_index)
486        if resp is None:
487            print('Disconnected')
488            return False
489        assert 'return' in resp or 'error' in resp
490        if 'return' in resp:
491            # Success
492            if len(resp['return']) > 0:
493                print(resp['return'], end=' ')
494        else:
495            # Error
496            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
497        return True
498
499    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
500        QMPShell.show_banner(self, msg)
501
502
503def die(msg: str) -> NoReturn:
504    """Write an error to stderr, then exit with a return code of 1."""
505    sys.stderr.write('ERROR: %s\n' % msg)
506    sys.exit(1)
507
508
509def main() -> None:
510    """
511    qmp-shell entry point: parse command line arguments and start the REPL.
512    """
513    parser = argparse.ArgumentParser()
514    parser.add_argument('-H', '--hmp', action='store_true',
515                        help='Use HMP interface')
516    parser.add_argument('-N', '--skip-negotiation', action='store_true',
517                        help='Skip negotiate (for qemu-ga)')
518    parser.add_argument('-v', '--verbose', action='store_true',
519                        help='Verbose (echo commands sent and received)')
520    parser.add_argument('-p', '--pretty', action='store_true',
521                        help='Pretty-print JSON')
522    parser.add_argument('-l', '--logfile',
523                        help='Save log of all QMP messages to PATH')
524
525    default_server = os.environ.get('QMP_SOCKET')
526    parser.add_argument('qmp_server', action='store',
527                        default=default_server,
528                        help='< UNIX socket path | TCP address:port >')
529
530    args = parser.parse_args()
531    if args.qmp_server is None:
532        parser.error("QMP socket or TCP address must be specified")
533
534    shell_class = HMPShell if args.hmp else QMPShell
535
536    try:
537        address = shell_class.parse_address(args.qmp_server)
538    except QMPBadPortError:
539        parser.error(f"Bad port number: {args.qmp_server}")
540        return  # pycharm doesn't know error() is noreturn
541
542    with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu:
543        try:
544            qemu.connect(negotiate=not args.skip_negotiation)
545        except ConnectError as err:
546            if isinstance(err.exc, OSError):
547                die(f"Couldn't connect to {args.qmp_server}: {err!s}")
548            die(str(err))
549
550        for _ in qemu.repl():
551            pass
552
553
554def main_wrap() -> None:
555    """
556    qmp-shell-wrap entry point: parse command line arguments and
557    start the REPL.
558    """
559    parser = argparse.ArgumentParser()
560    parser.add_argument('-H', '--hmp', action='store_true',
561                        help='Use HMP interface')
562    parser.add_argument('-v', '--verbose', action='store_true',
563                        help='Verbose (echo commands sent and received)')
564    parser.add_argument('-p', '--pretty', action='store_true',
565                        help='Pretty-print JSON')
566    parser.add_argument('-l', '--logfile',
567                        help='Save log of all QMP messages to PATH')
568
569    parser.add_argument('command', nargs=argparse.REMAINDER,
570                        help='QEMU command line to invoke')
571
572    args = parser.parse_args()
573
574    cmd = args.command
575    if len(cmd) != 0 and cmd[0] == '--':
576        cmd = cmd[1:]
577    if len(cmd) == 0:
578        cmd = ["qemu-system-x86_64"]
579
580    sockpath = "qmp-shell-wrap-%d" % os.getpid()
581    cmd += ["-qmp", "unix:%s" % sockpath]
582
583    shell_class = HMPShell if args.hmp else QMPShell
584
585    try:
586        address = shell_class.parse_address(sockpath)
587    except QMPBadPortError:
588        parser.error(f"Bad port number: {sockpath}")
589        return  # pycharm doesn't know error() is noreturn
590
591    try:
592        with shell_class(address, args.pretty, args.verbose,
593                         True, args.logfile) as qemu:
594            with Popen(cmd):
595
596                try:
597                    qemu.accept()
598                except ConnectError as err:
599                    if isinstance(err.exc, OSError):
600                        die(f"Couldn't connect to {args.qmp_server}: {err!s}")
601                    die(str(err))
602
603                for _ in qemu.repl():
604                    pass
605    finally:
606        os.unlink(sockpath)
607
608
609if __name__ == '__main__':
610    main()
611