xref: /qemu/tests/qemu-iotests/iotests.py (revision 92eecfff)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
# Use this logger for logging messages directly from the iotests module.
# Only a no-op NullHandler is attached here.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks when the interpreter receives a fatal signal.
faulthandler.enable()
52
# The tool command lines below honour the same override variables that
# ./check supports.  Option strings are split on whitespace, so individual
# arguments that contain spaces cannot be expressed.  Splitting without an
# explicit separator guarantees that repeated blanks, or an unset/empty
# variable, never produce empty-string arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

# Image format/protocol and directories; the ones without a fallback are
# None when the corresponding variable is not set.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a test works with luks images without naming a secret
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name; only used in the diagnostic
                 printed when the process is terminated by a signal.
    :param args: The complete command line, program included.
    :param connect_stderr: When True, stderr is merged into the captured
                           output; otherwise stderr is not captured.
    :return: Tuple of (captured output, exit code).  A negative exit code
             -N means the process was terminated by signal N.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    subp = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)
    output = subp.communicate()[0]
    if subp.returncode < 0:
        # args already is the full command line of whichever tool was run,
        # so report exactly that.
        sys.stderr.write('%s received signal %i: %s\n'
                         % (tool, -subp.returncode, ' '.join(args)))
    return (output, subp.returncode)
108
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the given arguments.

    Returns a tuple of what the tool printed and its exit code.
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
115
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img, drop its output, and hand back the exit code'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
119
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return a copy of a QMP object whose dicts are key-sorted OrderedDicts.

    With conv_keys set, underscores in the keys of the outermost dict are
    turned into dashes; dicts nested directly below it keep their keys.
    Lists are processed element by element with the default conv_keys.
    Anything that is neither a dict nor a list is returned unchanged.
    '''
    # Plain dicts give no ordering guarantee before Python 3.6, hence the
    # explicit OrderedDict.
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
132
def qemu_img_create(*args):
    '''
    Run "qemu-img create" with the given arguments and return its exit code.

    For luks images (``-f luks``) the default key secret is filled in:
    if a ``-o`` option string is present and does not mention key-secret,
    the default key-secret option is appended to it and the default secret
    object is added; if no ``-o`` is present at all, both are prepended.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a comma-separated option string, so the
                # extra option has to be joined onto it (a str has no
                # append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
151
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed result'''
    output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(output)
154
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed result'''
    output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(output)
157
def qemu_img_verbose(*args):
    '''Invoke qemu-img with its output left untouched; return the exit code'''
    cmdline = qemu_img_args + list(args)
    status = subprocess.call(cmdline)
    if status < 0:
        # A negative status means the process died from that signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(cmdline)))
    return status
165
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img and hand back only what it printed'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
169
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, and return
       the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
174
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for filename.

    filename is passed with --image-opts when imgopts is set, and with
    "-f imgfmt" otherwise.  extra_args are inserted before the filename.
    filter_path is the string that filter_img_info replaces in the output;
    it falls back to filename when not given.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    info = qemu_img_pipe(*cmdline)
    log(filter_img_info(info, filter_path or filename))
188
def qemu_io(*args):
    '''Invoke qemu-io and return its output (stderr is merged into stdout)'''
    cmdline = qemu_io_args + list(args)
    proc = subprocess.Popen(cmdline,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    output, _ = proc.communicate()
    status = proc.returncode
    if status < 0:
        # A negative return code means the process died from that signal
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-status, ' '.join(cmdline)))
    return output
200
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
       filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
205
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards the output without leaving an open
    # file object behind.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        # A negative exit code means the process died from that signal
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
214
def qemu_io_silent_check(*args):
    '''Run qemu-io with stdout and stderr discarded and return True if the
       subprocess returned 0'''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards the output without leaving an open
    # file object behind; stderr follows stdout.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
221
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine:
       the ccw variant for s390-ccw-virtio, the pci variant otherwise'''
    on_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_ccw else 'virtio-scsi-pci'
226
class QemuIoInteractive:
    '''
    Drive an interactive qemu-io process through pipes.

    The process is started with qemu_io_args_no_fmt plus the given
    arguments.  cmd() sends one command and returns everything printed
    before the next prompt; close() sends the quit command.
    '''

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''Start qemu-io; raise ValueError carrying its output if the
           first thing it prints is not the prompt'''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to and including the next prompt; return the text that
           preceded the prompt'''
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # Restart the match.  The mismatching character may itself
                # be the first character of the prompt (e.g. output ending
                # in 'q' right before the prompt), so it must not be
                # thrown away.  The first prompt character occurs nowhere
                # else in the prompt, so no longer overlap is possible.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Send a single-line command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
270
271
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode) and return the exit code
       of the parent process'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
275
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork (daemon mode).

       Returns the parent's exit code together with its captured output;
       the output is replaced by an empty string when the exit code is 0.'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
283
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports, log the output through
       filter_testfiles and filter_nbd_exports, and return it unfiltered'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
290
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running for the
       duration of the context.

       The context is entered once the server's pid file exists; a
       RuntimeError is raised if the process terminates before that.
       The server is killed and reaped when the context is left.'''
    pid_file = file_path("pid")
    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until the pid file shows up
        while True:
            if os.path.exists(pid_file):
                break
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))
            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
315
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Tell whether "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
320
def create_image(name, size):
    '''
    Create a fully-allocated raw image with sector markers

    The file is written in whole 512-byte sectors until at least size
    bytes have been written.  Each sector carries its own sector number
    as a big-endian 32-bit integer in its first and last four bytes, with
    zero bytes in between.
    '''
    # The context manager closes the file even if a write fails
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
330
def image_size(img):
    '''Return the "virtual-size" field that "qemu-img info" reports for img'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
335
def is_str(val):
    '''Tell whether val is a str instance'''
    matches = isinstance(val, str)
    return matches
338
# The directory name is plain text, not a regular expression, so it has to
# be escaped.  When TEST_DIR is not set, fall back to a pattern that can
# never match instead of replacing the literal text "None".
test_dir_re = re.compile(re.escape(test_dir) if test_dir else r"(?!)")
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
342
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip every carriage return from msg'''
    return re.sub(win32_re, "", msg)
346
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Drop carriage returns from msg and replace qemu-io's operation
       statistics with a fixed placeholder'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
354
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences with a placeholder'''
    return re.sub(chown_re, "chown UID:GID", msg)
358
def filter_qmp_event(event):
    '''
    Filter a QMP event dict

    Returns a copy of event in which the timestamp's seconds and
    microseconds are replaced by the placeholders 'SECS' and 'USECS'.
    The event passed in is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp dict
        # has to be copied as well before it is modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
366
def filter_qmp(qmsg, filter_fn):
    '''
    Apply filter_fn to every leaf value of a QMP object.

    filter_fn is called as filter_fn(key, value), where key is the dict
    key or list index of the value.  Nested dicts and lists are descended
    into.  qmsg is modified in place and also returned.
    '''
    # Lists are walked by index, anything else is treated as a dict
    pairs = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, val in pairs:
        if isinstance(val, (dict, list)):
            replacement = filter_qmp(val, filter_fn)
        else:
            replacement = filter_fn(key, val)
        qmsg[key] = replacement
    return qmsg
382
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
       prefixes in msg with fixed placeholders'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    msg = msg.replace(sock_prefix, 'SOCK_DIR/PID-')
    return msg
387
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
394
def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw / -pci suffix from virtio-scsi device names'''
    pattern = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return pattern.sub(r'\1', output)
397
def filter_qmp_virtio_scsi(qmsg):
    '''Run filter_virtio_scsi over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
404
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names in msg with NODE_NAME'''
    pattern = re.compile("#block[0-9]+")
    return pattern.sub("NODE_NAME", msg)
407
def filter_img_info(output, filename):
    '''
    Filter "qemu-img info" output.

    Lines mentioning the disk size / actual-size are dropped.  In the
    remaining lines filename becomes TEST_IMG, test file prefixes are
    filtered, the image format name becomes IMGFMT, and the iters, uuid
    and cid values are masked.
    '''
    kept = []
    for raw in output.split('\n'):
        if any(tag in raw for tag in ('disk size', 'actual-size')):
            continue
        text = filter_testfiles(raw.replace(filename, 'TEST_IMG'))
        text = text.replace(imgfmt, 'IMGFMT')
        text = re.sub('iters: [0-9]+', 'iters: XXX', text)
        text = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      text)
        text = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', text)
        kept.append(text)
    return '\n'.join(kept)
423
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
426
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
433
def filter_nbd_exports(output: str) -> str:
    '''Mask the numeric min/opt/max block sizes in an NBD export listing'''
    pattern = re.compile(r'((min|opt|max) block): [0-9]+')
    return pattern.sub(r'\1: XXX', output)
436
437
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Emit a message on the diff-output logger.

    The filters are applied to msg in order first.  A dict or list result
    is serialized as JSON (pretty-printed when indent is given); anything
    else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # An OrderedDict already carries the intended key order; leave it alone
    already_ordered = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not already_ordered,
                            indent=indent)
    test_logger.info(serialized)
455
class Timeout:
    '''
    Context manager that raises an Exception carrying errmsg if its body
    runs for longer than the given number of seconds.

    It uses SIGALRM and the real-time interval timer.  On exit the timer
    is cancelled and the SIGALRM handler that was installed before
    entering the context is put back.
    '''
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        self._old_handler = None
    def __enter__(self):
        # signal.signal() returns the handler that was installed before
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # A previous handler of None means it was not installed from
        # Python and therefore cannot be restored through signal.signal()
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False
    def timeout(self, signum, frame):
        '''SIGALRM handler: raise the timeout exception'''
        raise Exception(self.errmsg)
469
def file_pattern(name):
    '''Prefix name with the PID of the current process and a dash'''
    return f"{os.getpid()}-{name}"
472
class FilePath:
    """
    Context manager that hands out one or more generated file names and
    removes the corresponding files when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    File names are placed in iotests.test_dir unless base_dir says
    otherwise; for sockets use iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When called with exactly one name, the context yields that single
    path rather than a one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = list(
            os.path.join(base_dir, file_pattern(name)) for name in names)

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort: the file may never have been created
                continue
        return False
511
512
def file_path_remover():
    '''Remove the files recorded in file_path_remover.paths, most recently
       added first, ignoring any that cannot be removed'''
    pending = file_path_remover.paths
    for path in pending[::-1]:
        try:
            os.remove(path)
        except OSError:
            continue
519
520
def file_path(*names, base_dir=test_dir):
    ''' Generate file names that are cleaned up automatically at exit.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path; several names yield a list.
    '''

    # The exit-time remover is registered on first use only
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
542
def remote_filename(path):
    '''Return the name under which path is reached through the configured
       image protocol; only file and ssh are supported'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
550
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        '''Set up a machine whose name is built from path_suffix and the
           PID of the current process'''
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Counter used to generate the drive ids in add_drive()
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the arguments; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the arguments; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the arguments verbatim; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        # Drives get consecutive ids: drive0, drive1, ...
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        # File-related options are only emitted when a path is given
        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>" to the arguments; opts is either a
           string or an iterable of strings that is joined with commas'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the arguments; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Send command_line through the human-monitor-command QMP
           command, via qmp_log() when use_log is set and qmp() otherwise'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        # Without an explicit event, break on both reads and writes
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts and lists into a single dict whose keys are
           the dot-separated paths (dict keys and list indices) of the leaf
           values.  output and basestr carry the state of the recursion.'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Turn a QMP object into a comma-separated "key=value" string
           using the flattened keys.  Values are concatenated as they
           are, so every leaf value has to be a string.'''
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the events from get_qmp_events(wait=wait), each passed
           through filter_qmp_event()'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Log the command with its arguments in sorted key order, execute
           it, then log and return the result.  The same filters are
           applied to both log entries.'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        # The BLOCK_JOB_* events are matched on the 'device' field (except
        # BLOCK_JOB_PENDING), the others on the 'id' field.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but status changes is only logged
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the error reported by query-jobs for this job
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The loop only ends once the job reaches the null state
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create through qmp_log() and, if the command was
           accepted, run the resulting job with run_job().  filters
           defaults to [filter_qmp_testfiles].  An empty line is logged
           at the end.'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            # The command itself failed; report its error
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the result;
           name is only used in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Log MIGRATION events until one reports the status completed or
           failed.  On completion, query-status is polled until the
           runstate equals expect_runstate and True is returned;
           otherwise False is returned.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry whose node-name is
           node_name, or None if there is none'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists;
           nodes without a dirty-bitmaps entry are left out'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap dict, or None if no bitmap matches.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Tell whether every key/value pair of fields is present in the
           bitmap returned by get_bitmap()'''
        # NOTE(review): get_bitmap() may return None, in which case the
        # comparison below raises AttributeError.
        ret = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge named child_name from the current node
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge or node
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
870
# Matches a path component that carries a list subscript, such as
# "children[2]": group 1 is the key, group 2 the text between brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Subclasses commonly assign a VM here; most helpers below go
        # through self.vm.
        self.vm = None

    def dictpath(self, d, path):
        '''Walk the slash-separated @path through the nested dict @d
           and return what it leads to.  A component may end in
           "[N]" to pick element N of a list.  Any step that cannot
           be taken fails the test.'''
        for component in path.split('/'):
            m = index_re.match(component)
            idx = None
            if m:
                component = m.group(1)
                idx = int(m.group(2))

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not m:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Fail if @path can be resolved inside @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, which is what we want
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Check the value found under @path in the QMP dict @d.  A
           non-empty list passed as @value is a set of alternatives,
           any one of which may match; everything else has to compare
           equal as a whole.'''

        result = self.dictpath(d, path)

        # An empty list cannot be a list of alternatives, so it is
        # compared as a plain value.
        if isinstance(value, list) and value != []:
            if any(result == v for v in value):
                return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and require an entry that agrees
        with node_name and/or file_name (a None on either side of a
        comparison counts as agreement)"""
        def agrees(actual, wanted):
            return actual is None or wanted is None or actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        if any(agrees(x.get("node-name"), node_name) and
               agrees(x.get("file"), file_name)
               for x in result["return"]):
            return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Require @json_filename to start with "json:" and its JSON
           payload, once flattened, to equal the flattened @reference'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')
        actual = self.vm.flatten_qmp_object(json.loads(payload))
        expected = self.vm.flatten_qmp_object(reference)
        self.assertEqual(actual, expected)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Issue block-job-cancel for @drive, wait for the job to end
           and return the terminating event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_events = ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED')
        result = None
        # Every batch is processed to its end, so the last terminating
        # event of the batch is the one that gets returned.
        while result is None:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name in final_events:
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                elif name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for BLOCK_JOB_COMPLETED on @drive and return the event.
           With @error None the event must carry no error and, if
           @check_offset is set, its offset must equal its len;
           otherwise data/error must equal @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if name != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for BLOCK_JOB_READY of a mirror or commit job on @drive
        and return what events_wait() hands back."""
        candidates = [
            ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ]
        return self.vm.events_wait(candidates)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait until the job is ready, cancel it, and require that it
           ended as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        final = self.cancel_and_wait(drive=drive)
        self.assertEqual(final['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final, 'data/type', 'mirror')
        self.assert_qmp(final, 'data/offset', final['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete for @drive (optionally waiting for
           readiness first) and wait for the job to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until job @job_id is paused and not
           busy, then return its entry.  The job has to be listed on
           every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for @job_id.  Returns the paused job
           entry when @wait is set, the raw QMP reply otherwise.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        return self.pause_wait(job_id) if wait else result

    def case_skip(self, reason):
        '''Record the case as not run, then skip it'''
        case_notrun(reason)
        self.skipTest(reason)
1054
1055
def notrun(reason):
    '''Skip this test suite.

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    terminates the process with exit status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly: we exit right below, so the reason
    # must not depend on the file object being finalized for us.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1064
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended as one line to
    "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the appended line is flushed and the
    # handle released before we return.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1076
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun) unless the current image format
    is acceptable; for 'luks', additionally require working LUKS.
    """
    generic_requested = 'generic' in supported_fmts
    if generic_requested and \
            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
        # Counterpart of "_supported_fmt generic" in the bash tests:
        # drop the allow-list so only unsupported_fmts is consulted.
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_listed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1092
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun) unless the current protocol is
    acceptable.  Only one of @supported and @unsupported may be given;
    a 'generic' entry in @supported accepts every protocol.
    """
    both_given = bool(supported) and bool(unsupported)
    assert not both_given

    if 'generic' in supported:
        return

    if supported and imgproto not in supported:
        rejected = True
    else:
        rejected = imgproto in unsupported
    if rejected:
        notrun('not suitable for this protocol: %s' % imgproto)
1103
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun) when sys.platform starts with any
    prefix in @unsupported, or when @supported is non-empty and
    sys.platform starts with none of its prefixes.
    """
    platform = sys.platform

    # str.startswith() takes a tuple of prefixes; an empty tuple never
    # matches.
    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % platform)
1112
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun) if a non-empty list of cache
    modes is given and the current cache mode is not in it.
    """
    if not supported_cache_modes:
        # No restriction requested
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1116
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun) if a non-empty list of aio modes
    is given and the current aio mode is not in it.
    """
    if not supported_aio_modes:
        # No restriction requested
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1120
def supports_quorum():
    '''Tell whether "quorum" appears in the text returned by
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1123
def verify_quorum():
    '''Skip the test suite when supports_quorum() reports no quorum
    support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1128
def has_working_luks() -> Tuple[bool, str]:
    """
    Find out whether our LUKS driver is able to create images (which
    also covers LUKS encryption for qcow2) by creating a throw-away
    image in the test directory.

    :return: (True, '') on success, otherwise (False, reason).
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may never have been created.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line that
    # mentions it; otherwise report the whole output.
    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.partition(marker)[2].strip())
    return (False, output)
1159
def verify_working_luks():
    """
    Skip the test suite, citing the reported reason, when
    has_working_luks() says LUKS does not work.
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
1167
def qemu_pipe(*args: str) -> str:
    """
    Run qemu (qemu_prog plus qemu_opts) with the given extra arguments,
    typically an option that prints something and exits such as a help
    option.

    :return: The output text reported by qemu_tool_pipe_and_status();
             the accompanying status is discarded.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    stdout_text, _ = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1177
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats.

       With 'read_only' set to True the ro-whitelist is consulted,
       otherwise the rw-whitelist.  Results are cached per value of
       'read_only' in the function attribute "formats".'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        format_message = qemu_pipe("-drive", "format=help")
        # Line 0 of the help text is used for read-write, line 1 for
        # read-only; the names follow the first colon.
        wanted_line = format_message.splitlines()[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1192
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test only if every required format is whitelisted.

       'required_formats' is either a collection of format names or a
       callable that receives the test case and returns one.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: formats {usf_list} are not whitelisted'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1212
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test when the current image format is one of the
       given formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1228
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only when the process is not running as root
       (uid 0); as root the case is recorded as not run and the
       wrapper returns None'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        # args[0] is formatted into the message to identify the case
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1239
def execute_unittest(debug=False):
    """
    Executes unittests within the calling module.

    In debug mode the runner writes verbosely straight to stdout.
    Otherwise its output is captured, stripped of run-dependent details
    (time taken, skip markers) and written to stderr, so that
    qemu-iotests can reliably diff it against the reference output.
    """

    verbosity = 2 if debug else 1
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()

            # Drop the time taken
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # Turn the 's' markers on the first line into '.'.  Use
            # partition() rather than unpacking split(): if nothing
            # containing a newline was captured, unpacking would raise
            # ValueError here and hide the exception that is already
            # propagating through this finally block.
            first_line, newline, rest = out.partition('\n')
            out = first_line.replace('s', '.') + newline + rest

            sys.stderr.write(out)
1271
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Common setup shared by script-style and unittest-style tests:
    consume the '-d' CLI flag, configure logging, and check that the
    test suits the current format, protocol, platform, cache mode and
    aio mode.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # TEST_DIR and QEMU_DEFAULT_MACHINE serve as proxies for "we are
    # being run via check".  There may be other things set up by
    # "check" that individual test cases rely on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Consume the first '-d' on the command line, if there is one
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
1306
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    The remaining arguments go to execute_setup_common().  Afterwards
    @test_function is called if one was given; otherwise the unittest
    runner is started.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1315
def activate_logging():
    """
    Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets its
    level to INFO and stops it from propagating to parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1324
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates logging to stdout, then forwards all arguments to
    execute_setup_common().  The debug flag returned by that call is
    discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1330
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework.

    Activates logging to stdout, then hands @test_function and the
    remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1336
# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework.

    All arguments are forwarded unchanged to execute_test(), which
    starts the unittest runner when no test_function is passed.
    """
    execute_test(*args, **kwargs)
1341