xref: /qemu/python/qemu/qmp/qmp_shell.py (revision ec6f3fc3)
1#
2# Copyright (C) 2009-2022 Red Hat Inc.
3#
4# Authors:
5#  Luiz Capitulino <lcapitulino@redhat.com>
6#  John Snow <jsnow@redhat.com>
7#
8# This work is licensed under the terms of the GNU LGPL, version 2 or
9# later. See the COPYING file in the top-level directory.
10#
11
12"""
13Low-level QEMU shell on top of QMP.
14
usage: qmp-shell [-h] [-H] [-N] [-v] [-p] [-l LOGFILE] qmp_server

positional arguments:
  qmp_server            < UNIX socket path | TCP address:port >

optional arguments:
  -h, --help            show this help message and exit
  -H, --hmp             Use HMP interface
  -N, --skip-negotiation
                        Skip negotiate (for qemu-ga)
  -v, --verbose         Verbose (echo commands sent and received)
  -p, --pretty          Pretty-print JSON
  -l LOGFILE, --logfile LOGFILE
                        Save log of all QMP messages to PATH
27
28
29Start QEMU with:
30
31# qemu [...] -qmp unix:./qmp-sock,server
32
33Run the shell:
34
35$ qmp-shell ./qmp-sock
36
37Commands have the following format:
38
39   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
40
41For example:
42
43(QEMU) device_add driver=e1000 id=net1
44{'return': {}}
45(QEMU)
46
47key=value pairs also support Python or JSON object literal subset notations,
48without spaces. Dictionaries/objects {} are supported as are arrays [].
49
   example-command arg-name1={'key':'value','obj':{'prop':"value"}}
51
52Both JSON and Python formatting should work, including both styles of
53string literal quotes. Both paradigms of literal values should work,
54including null/true/false for JSON and None/True/False for Python.
55
56
57Transactions have the following multi-line format:
58
59   transaction(
60   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
61   ...
62   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
63   )
64
65One line transactions are also supported:
66
67   transaction( action-name1 ... )
68
69For example:
70
71    (QEMU) transaction(
72    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
73    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
74    TRANS> )
75    {"return": {}}
76    (QEMU)
77
78Use the -v and -p options to activate the verbose and pretty-print options,
79which will echo back the properly formatted JSON-compliant QMP that is being
80sent to QEMU, which is useful for debugging and documentation generation.
81"""
82
83import argparse
84import ast
85import json
86import logging
87import os
88import re
89import readline
90from subprocess import Popen
91import sys
92from typing import (
93    IO,
94    Dict,
95    Iterator,
96    List,
97    NoReturn,
98    Optional,
99    Sequence,
100    cast,
101)
102
103from qemu.qmp import (
104    ConnectError,
105    ExecuteError,
106    QMPError,
107    SocketAddrT,
108)
109from qemu.qmp.legacy import (
110    QEMUMonitorProtocol,
111    QMPBadPortError,
112    QMPMessage,
113    QMPObject,
114)
115
116
# Module-level logger, named after this module; used below to warn about
# shell history files that cannot be read or written.
LOG = logging.getLogger(__name__)
118
119
class QMPCompleter:
    """
    Tab-completion provider for the readline library.

    Candidate strings are collected with append(); complete() is the
    callback that gets handed to readline.set_completer().
    """
    # NB: subclassing list[str] directly would be neater (and Python 3.9+
    # will probably allow it), but pylint as of today does not know that
    # List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more string as a possible completion."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        readline.set_completer() callback implementation.

        :param text: Prefix that a candidate has to start with.
        :param state: Zero-based index into the matching candidates,
                      which are kept in insertion order.
        :return: The selected candidate, or None when there is no
                 candidate at that index.
        """
        candidates = [cmd for cmd in self._matches if cmd.startswith(text)]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None
141
142
class QMPShellError(QMPError):
    """
    Base class for errors specific to the QMP shell, e.g. a command line
    that cannot be turned into a QMP message.
    """
147
148
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets Python evaluate JSON keywords.

    Bare ``true``, ``false`` and ``null`` names found in a Python AST are
    swapped for constant nodes holding ``True``, ``False`` and ``None``,
    so that the resulting tree can be evaluated by Python.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """
        Replace a JSON keyword Name node with the matching Constant node.

        Name nodes with any other identifier are handed back unchanged.
        """
        json_keywords = {'true': True, 'false': False, 'null': None}
        if node.id in json_keywords:
            return ast.Constant(value=json_keywords[node.id])
        return node
169
170
class QMPShell(QEMUMonitorProtocol):
    """
    QMPShell provides a basic readline-based QMP shell.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    :param server: Handed to the parent class as its ``server`` argument.
    :param logfile: Optional path of a file that receives, for every
                    executed command, the command merged with its reply.
    """
    def __init__(self, address: SocketAddrT,
                 pretty: bool = False,
                 verbose: bool = False,
                 server: bool = False,
                 logfile: Optional[str] = None):
        super().__init__(address, server=server)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # Transaction sub-shell state: whether we are inside
        # 'transaction(' ... ')' and the actions collected so far.
        self._transmode = False
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.pretty = pretty
        self.verbose = verbose
        self.logfile: Optional[IO[str]] = None

        if logfile is not None:
            # The handle has to outlive this method, so no 'with' block
            # here; close() releases it.
            self.logfile = open(logfile, "w", encoding='utf-8')

    def close(self) -> None:
        """
        Save the shell history, close the log file (if any) and then run
        the parent's close().
        """
        # Hook into context manager of parent to save shell history.
        self._save_history()
        # close() can run more than once (repl() calls it explicitly),
        # so forget the handle once it has been closed.
        if self.logfile is not None:
            self.logfile.close()
            self.logfile = None
        super().close()

    def _fill_completion(self) -> None:
        """Feed the completer with the names from 'query-commands'."""
        try:
            cmds = cast(List[Dict[str, str]], self.cmd('query-commands'))
            for cmd in cmds:
                self._completer.append(cmd['name'])
        except ExecuteError:
            # Best effort: without the command list there is simply
            # nothing to complete.
            pass

    def _completer_setup(self) -> None:
        """Install a fresh completer into readline and load the history."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history has been saved yet.
            pass
        except IOError as err:
            msg = f"Failed to read history '{self._histfile}': {err!s}"
            LOG.warning(msg)

    def _save_history(self) -> None:
        """Write the readline history to disk; failures are only logged."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            msg = f"Failed to save history file '{self._histfile}': {err!s}"
            LOG.warning(msg)

    @classmethod
    def _parse_value(cls, val: str) -> object:
        """
        Convert the textual right-hand side of a key=value pair.

        Tried in order: an integer, a case-insensitive 'true'/'false',
        and - for text starting with '{' or '[' - strict JSON followed by
        a Python literal in which true/false/null are also accepted.
        Anything else is returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def _cli_expr(self,
                  tokens: Sequence[str],
                  parent: QMPObject) -> None:
        """
        Store a sequence of key=value tokens into ``parent``.

        A dotted key such as ``a.b.c=1`` creates nested objects, with
        every token resolved starting from ``parent`` itself.

        :param tokens: The key=value tokens to parse.
        :param parent: Object that receives the parsed values; it is
                       modified in place.
        :raise QMPShellError: When a token is not a key=value pair, a key
                              is set twice, or a key is used both as a
                              leaf and as a non-leaf.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self._parse_value(val)
            optpath = key.split('.')
            curpath = []
            # Restart from the top-level object for each token; otherwise
            # a dotted key would redirect all following tokens into its
            # nested object.
            target = parent
            for path in optpath[:-1]:
                curpath.append(path)
                obj = target.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                target[path] = obj
                target = obj
            leaf = optpath[-1]
            if leaf in target:
                if isinstance(target[leaf], dict):
                    # Report the full key: it is the one being used as a
                    # leaf although it already holds nested values.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            target[leaf] = value

    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to execute, or None when there is nothing to
                 send yet (empty input, or a transaction action that was
                 only cached).
        :raise QMPShellError: When the command line cannot be parsed.
        """
        # A token is a run of non-blank characters in which quoted
        # sections may contain blanks.
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            self._actions = []
            cmdargs.pop(0)

        # Transactional CLI exit:
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            return qmpcmd

        # No args, or no args remaining
        if not cmdargs:
            return None

        if self._transmode:
            # Parse and cache this Transactional Action
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                # One-line form: the action closes the transaction too.
                cmdargs.pop(-1)
                finalize = True
            self._cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self._build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None:
        """
        Print ``qmp_message`` as JSON to ``fh``; indented and with sorted
        keys when pretty-printing is enabled.
        """
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj), file=fh)

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one command line, execute it and print the reply.

        :return: False when the connection is gone, True otherwise
                 (parse errors are reported on stderr and yield True).
        """
        try:
            qmpcmd = self._build_cmd(cmdline)
        except QMPShellError as err:
            print(
                f"Error while parsing command line: {err!s}\n"
                "command format: <command-name> "
                "[arg-name1=arg1] ... [arg-nameN=argN]",
                file=sys.stderr
            )
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        if self.logfile is not None:
            # One log entry holds the command merged with its reply.
            cmd = {**qmpcmd, **resp}
            self._print(cmd, fh=self.logfile)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """
        Connect via the parent class, keep what it returns as the
        greeting and set up tab completion.
        """
        self._greeting = super().connect(negotiate)
        self._completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """
        Print to stdio a greeting, and the QEMU version if available.
        """
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """
        Return the current shell prompt, including a trailing space.
        """
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints the events returned by get_events() instead
        of executing anything.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            # End of input: finish the prompt line and stop the REPL.
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Return an iterator that implements the REPL.

        Each iteration reads and executes one command; the shell is
        closed once the loop ends.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
422
423
class HMPShell(QMPShell):
    """
    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    :param server: Handed to QMPShell as its ``server`` argument.
    :param logfile: Handed to QMPShell as its ``logfile`` argument.
    """
    def __init__(self, address: SocketAddrT,
                 pretty: bool = False,
                 verbose: bool = False,
                 server: bool = False,
                 logfile: Optional[str] = None):
        super().__init__(address, pretty, verbose, server, logfile)
        # CPU index sent along with every command; changed by 'cpu N'.
        self._cpu_index = 0

    def _cmd_completion(self) -> None:
        """Add the commands listed by HMP 'help' to the completer."""
        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                fields = cmd.split()
                if not fields:
                    # Line made of blanks only: nothing to complete.
                    continue
                name = fields[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def _info_completion(self) -> None:
        """Add the sub-commands listed by HMP 'info' to the completer."""
        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
            fields = cmd.split()
            # The second word of each line is used as the sub-command
            # name; lines with fewer than two words are skipped.
            if len(fields) > 1:
                self._completer.append('info ' + fields[1])

    def _other_completion(self) -> None:
        """Add completions that the listings above do not produce."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Fill the completer with HMP command names."""
        self._cmd_completion()
        self._info_completion()
        self._other_completion()

    def _cmd_passthrough(self, cmdline: str,
                         cpu_index: int = 0) -> QMPMessage:
        """
        Run an HMP command line through QMP's 'human-monitor-command'.

        :param cmdline: The HMP command line.
        :param cpu_index: Value sent as the 'cpu-index' argument.
        :return: Whatever cmd_obj() returns for the request.
        """
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Execute one HMP command line and print its output.

        'cpu <index>' is intercepted to update the CPU index that is
        sent with all subsequent commands.

        :return: False when the connection is gone, True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Nothing but blanks: there is no command to send.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Argument missing or not a number.
                print('cpu command takes an integer argument')
                return True
            # Probe the index with a harmless command before adopting it.
            probe = self._cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self._cpu_index = idx
        resp = self._cmd_passthrough(cmdline, self._cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the banner, using an HMP specific default greeting."""
        super().show_banner(msg)
509
510
def die(msg: str) -> NoReturn:
    """Report ``msg`` as an error on stderr and terminate with status 1."""
    print('ERROR: %s\n' % msg, end='', file=sys.stderr)
    raise SystemExit(1)
515
516
def main() -> None:
    """
    qmp-shell entry point: parse command line arguments and start the REPL.

    The server address comes from the positional argument or, when that
    is omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')
    parser.add_argument('-l', '--logfile',
                        help='Save log of all QMP messages to PATH')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse never
    # consults 'default' and rejects the command line before the check
    # below can run.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    # logfile has to be passed by keyword: the fourth positional
    # parameter of the shell classes is 'server', not 'logfile'.
    with shell_class(address, args.pretty, args.verbose,
                     logfile=args.logfile) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except ConnectError as err:
            if isinstance(err.exc, OSError):
                die(f"Couldn't connect to {args.qmp_server}: {err!s}")
            die(str(err))

        for _ in qemu.repl():
            pass
560
561
def main_wrap() -> None:
    """
    qmp-shell-wrap entry point: parse command line arguments and
    start the REPL.

    The given QEMU command line (default: qemu-system-x86_64) is
    launched with an extra '-qmp unix:<socket>' option, where the socket
    path is derived from this process' PID. The shell is created with
    server=True and accept()s the connection; the socket path is
    removed again on exit.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')
    parser.add_argument('-l', '--logfile',
                        help='Save log of all QMP messages to PATH')

    parser.add_argument('command', nargs=argparse.REMAINDER,
                        help='QEMU command line to invoke')

    args = parser.parse_args()

    cmd = args.command
    # Drop the '--' separator if the user supplied one.
    if cmd and cmd[0] == '--':
        cmd = cmd[1:]
    if not cmd:
        cmd = ["qemu-system-x86_64"]

    sockpath = f"qmp-shell-wrap-{os.getpid()}"
    cmd += ["-qmp", f"unix:{sockpath}"]

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(sockpath)
    except QMPBadPortError:
        parser.error(f"Bad port number: {sockpath}")
        return  # pycharm doesn't know error() is noreturn

    try:
        with shell_class(address, args.pretty, args.verbose,
                         True, args.logfile) as qemu:
            with Popen(cmd):

                try:
                    qemu.accept()
                except ConnectError as err:
                    # This parser defines no 'qmp_server' argument; the
                    # address in use here is the socket path.
                    if isinstance(err.exc, OSError):
                        die(f"Couldn't connect to {sockpath}: {err!s}")
                    die(str(err))

                for _ in qemu.repl():
                    pass
    finally:
        try:
            os.unlink(sockpath)
        except FileNotFoundError:
            # The socket was never created (early failure); do not let
            # the cleanup hide the original error.
            pass
615
616
# Run the qmp-shell entry point when this file is executed as a script.
if __name__ == '__main__':
    main()
619