xref: /qemu/python/qemu/qmp/qmp_shell.py (revision 0955d66e)
1#
2# Copyright (C) 2009, 2010 Red Hat Inc.
3#
4# Authors:
5#  Luiz Capitulino <lcapitulino@redhat.com>
6#
7# This work is licensed under the terms of the GNU GPL, version 2.  See
8# the COPYING file in the top-level directory.
9#
10
11"""
12Low-level QEMU shell on top of QMP.
13
14usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
15
16positional arguments:
17  qmp_server            < UNIX socket path | TCP address:port >
18
19optional arguments:
20  -h, --help            show this help message and exit
21  -H, --hmp             Use HMP interface
22  -N, --skip-negotiation
23                        Skip negotiate (for qemu-ga)
24  -v, --verbose         Verbose (echo commands sent and received)
25  -p, --pretty          Pretty-print JSON
26
27
28Start QEMU with:
29
30# qemu [...] -qmp unix:./qmp-sock,server
31
32Run the shell:
33
34$ qmp-shell ./qmp-sock
35
36Commands have the following format:
37
38   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
39
40For example:
41
42(QEMU) device_add driver=e1000 id=net1
43{'return': {}}
44(QEMU)
45
46key=value pairs also support Python or JSON object literal subset notations,
47without spaces. Dictionaries/objects {} are supported as are arrays [].
48
49   example-command arg-name1={'key':'value','obj':{'prop':"value"}}
50
51Both JSON and Python formatting should work, including both styles of
52string literal quotes. Both paradigms of literal values should work,
53including null/true/false for JSON and None/True/False for Python.
54
55
56Transactions have the following multi-line format:
57
58   transaction(
59   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
60   ...
61   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
62   )
63
64One line transactions are also supported:
65
66   transaction( action-name1 ... )
67
68For example:
69
70    (QEMU) transaction(
71    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
72    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
73    TRANS> )
74    {"return": {}}
75    (QEMU)
76
77Use the -v and -p options to activate the verbose and pretty-print options,
78which will echo back the properly formatted JSON-compliant QMP that is being
79sent to QEMU, which is useful for debugging and documentation generation.
80"""
81
82import argparse
83import ast
84import json
85import logging
86import os
87import re
88import readline
89import sys
90from typing import (
91    Iterator,
92    List,
93    NoReturn,
94    Optional,
95    Sequence,
96)
97
98from qemu import qmp
99from qemu.qmp import QMPMessage
100
101
# Module-level logger; this file uses it to warn when the readline history
# file cannot be read or written.
LOG = logging.getLogger(__name__)
103
104
class QMPCompleter:
    """
    Tab-completion provider for the readline library.

    Candidates are kept in insertion order and handed back to readline one
    at a time through :meth:`complete`.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more candidate completion."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        readline.set_completer() callback implementation.

        :param text: Prefix typed so far.
        :param state: Zero-based index of the requested match.
        :return: The ``state``-th candidate starting with ``text``, or
                 None when there is no such candidate.
        """
        candidates = [
            name for name in self._matches if name.startswith(text)
        ]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None
126
127
class QMPShellError(qmp.QMPError):
    """
    QMP Shell Base error class.

    Within this file it is raised while parsing a user-entered command
    line (malformed key=value pairs, conflicting keys, trailing input after
    a transaction) and is caught by QMPShell._execute_cmd(), which prints
    the message instead of terminating the shell.
    """
132
133
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets JSON keyword spellings pass as Python literals.

    Bare ``true``, ``false`` and ``null`` identifiers found in a parsed
    expression are swapped for constant nodes holding ``True``, ``False``
    and ``None``, so that the tree can afterwards be evaluated as a Python
    literal.
    """

    # JSON keyword spelling -> equivalent Python value.
    _JSON_KEYWORDS = {'true': True, 'false': False, 'null': None}

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """
        Replace a Name node spelling a JSON keyword with a Constant node.

        Any other Name node is handed back untouched.
        """
        try:
            replacement = cls._JSON_KEYWORDS[node.id]
        except KeyError:
            return node
        return ast.Constant(value=replacement)
154
155
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    QMPShell provides a basic readline-based QMP shell.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address)
        # Value returned by the parent's connect(); None until connected.
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # True while inside a multi-line "transaction(" ... ")" sub-shell.
        self._transmode = False
        # Actions collected so far for the transaction being entered.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        """Save the readline history, then close via the parent class."""
        # Hook into context manager of parent to save shell history.
        self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """
        Fill the completer with the names returned by 'query-commands'.

        Nothing is added when the reply carries an 'error' member.
        """
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def _completer_setup(self) -> None:
        """
        Build a fresh completer, hook it into readline and load history.

        A missing history file is ignored; any other I/O failure while
        reading it is logged as a warning.
        """
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            pass
        except IOError as err:
            msg = f"Failed to read history '{self._histfile}': {err!s}"
            LOG.warning(msg)

    def _save_history(self) -> None:
        """Write the readline history file; log a warning on I/O failure."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            msg = f"Failed to save history file '{self._histfile}': {err!s}"
            LOG.warning(msg)

    @classmethod
    def _parse_value(cls, val: str) -> object:
        """
        Convert the textual right-hand side of a key=value pair.

        Tried in order: integer, case-insensitive true/false, and -- for
        text starting with '{' or '[' -- strict JSON followed by a Python
        literal in which true/false/null are also accepted.

        :param val: Raw text following the '=' sign.
        :return: The converted value, or ``val`` unchanged when no
                 conversion applies.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError, TypeError):
                # TypeError: e.g. an unhashable dict key such as {[1]:2}.
                # Treat it like any other unparseable literal instead of
                # letting it escape and terminate the shell.
                pass
        return val

    def _cli_expr(self,
                  tokens: Sequence[str],
                  parent: qmp.QMPObject) -> None:
        """
        Parse key=value tokens and store the results into ``parent``.

        A dotted key such as ``a.b.c=1`` creates (or reuses) nested
        dictionaries, always starting from ``parent`` itself.

        :param tokens: The key=value tokens to parse.
        :param parent: Dictionary that receives the parsed arguments.
        :raise QMPShellError: When a token is not a key=value pair, when a
            key is used both as a leaf and as a dictionary, or when a key
            is set twice.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self._parse_value(val)
            optpath = key.split('.')
            # Descend with a separate cursor: rebinding 'parent' here would
            # root every following token inside this token's nested dict.
            node = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj
            leaf = optpath[-1]
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full offending key, not just its parents.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to
                 send yet (empty input, or an action that was only cached
                 for a transaction still being entered).
        :raise QMPShellError: On malformed input.
        """
        # A token is a run of non-blank characters in which single- or
        # double-quoted sections may contain blanks.
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            self._actions = []
            cmdargs.pop(0)

        # Transactional CLI exit:
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            return qmpcmd

        # No args, or no args remaining
        if not cmdargs:
            return None

        if self._transmode:
            # Parse and cache this Transactional Action
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                # One-line form: the closing paren ends the transaction.
                cmdargs.pop(-1)
                finalize = True
            self._cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self._build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a message as JSON; indented and key-sorted when pretty."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one command line, send the resulting command, print the reply.

        :return: False when the server reply is None (reported as
                 'Disconnected'), True otherwise -- including after a
                 parse error, which is only reported on stderr.
        """
        try:
            qmpcmd = self._build_cmd(cmdline)
        except QMPShellError as err:
            print(
                f"Error while parsing command line: {err!s}\n"
                "command format: <command-name> "
                "[arg-name1=arg1] ... [arg-nameN=argN]",
                file=sys.stderr
            )
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """
        Connect through the parent class, then set up tab completion.

        The value returned by the parent's connect() is kept as the
        greeting used by show_banner().
        """
        self._greeting = super().connect(negotiate)
        self._completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """
        Print to stdio a greeting, and the QEMU version if available.
        """
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """
        Return the current shell prompt, including a trailing space.
        """
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints the events returned by get_events() instead
        of sending a command.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Return an iterator that implements the REPL.

        Each iteration reads and executes one command; the shell is closed
        once read_exec_command() returns False.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
396
397
class HMPShell(QMPShell):
    """
    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent along with every passed-through command; changed
        # by the trapped "cpu" command in _execute_cmd().
        self._cpu_index = 0

    def _cmd_completion(self) -> None:
        """
        Register completions for the commands listed by HMP 'help'.

        Every command is added both bare and prefixed with 'help '. The
        'info' command is skipped here; _info_completion() covers it.
        """
        for line in self._cmd_passthrough('help')['return'].split('\r\n'):
            # Skip blank lines and lines starting with '[' or a tab.
            if not line or line[0] in ('[', '\t'):
                continue
            words = line.split()
            if not words:
                # Whitespace-only line: nothing to register.
                continue
            name = words[0]  # drop help text
            if name == 'info':
                continue
            if '|' in name:
                # Command in the form 'foobar|f' or 'f|foobar', take the
                # full name
                opt = name.split('|')
                name = opt[1] if len(opt[0]) == 1 else opt[0]
            self._completer.append(name)
            self._completer.append('help ' + name)  # help completion

    def _info_completion(self) -> None:
        """
        Register 'info <topic>' completions from the HMP 'info' output.

        The topic is taken from the second word of each output line; lines
        with fewer than two words are skipped.
        """
        for line in self._cmd_passthrough('info')['return'].split('\r\n'):
            words = line.split()
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def _other_completion(self) -> None:
        """Register completions not derived from monitor output."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Fill the completer with HMP commands instead of QMP ones."""
        self._cmd_completion()
        self._info_completion()
        self._other_completion()

    def _cmd_passthrough(self, cmdline: str,
                         cpu_index: int = 0) -> QMPMessage:
        """
        Send an HMP command line wrapped in 'human-monitor-command'.

        :param cmdline: HMP command line to run.
        :param cpu_index: Value sent as the 'cpu-index' argument.
        :return: Whatever cmd_obj() returns for the request.
        """
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Run one HMP command line and print its output.

        The 'cpu' command is trapped so that the chosen index is remembered
        and sent with all later commands.

        :return: False when the server reply is None (reported as
                 'Disconnected'), True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            # Probe the index with a harmless command before adopting it.
            probe = self._cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self._cpu_index = idx

        resp = self._cmd_passthrough(cmdline, self._cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the banner, using an HMP-specific default greeting."""
        super().show_banner(msg)
480
481
def die(msg: str) -> NoReturn:
    """Report ``msg`` on stderr behind an 'ERROR: ' prefix and exit with 1."""
    report = 'ERROR: %s\n' % msg
    sys.stderr.write(report)
    raise SystemExit(1)
486
487
def main() -> None:
    """
    qmp-shell entry point: parse command line arguments and start the REPL.

    The server address comes from the ``qmp_server`` positional argument,
    falling back to the ``QMP_SOCKET`` environment variable when the
    argument is omitted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' is what makes the positional optional; without it argparse
    # insists on the argument and never consults the default.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        # Drive the REPL generator to completion.
        for _ in qemu.repl():
            pass
531
532
# Start the shell only when this file is run as a script, not on import.
if __name__ == '__main__':
    main()
535