xref: /qemu/tests/qemu-iotests/iotests.py (revision 34b52615)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp import QMPMessage
42
43# Use this logger for logging messages directly from the iotests module
44logger = logging.getLogger('qemu.iotests')
45logger.addHandler(logging.NullHandler())
46
47# Use this logger for messages that ought to be used for diff output.
48test_logger = logging.getLogger('qemu.iotests.diff_io')
49
50
51faulthandler.enable()
52
53# This will not work if arguments contain spaces but is necessary if we
54# want to support the override options that ./check supports.
55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
56if os.environ.get('QEMU_IMG_OPTIONS'):
57    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
58
59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
60if os.environ.get('QEMU_IO_OPTIONS'):
61    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
62
63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
65    qemu_io_args_no_fmt += \
66        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
67
68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
69qemu_nbd_args = [qemu_nbd_prog]
70if os.environ.get('QEMU_NBD_OPTIONS'):
71    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
72
73qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
75
76qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
77
78gdb_qemu_env = os.environ.get('GDB_OPTIONS')
79qemu_gdb = []
80if gdb_qemu_env:
81    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
82
83qemu_print = os.environ.get('PRINT_QEMU', False)
84
85imgfmt = os.environ.get('IMGFMT', 'raw')
86imgproto = os.environ.get('IMGPROTO', 'file')
87output_dir = os.environ.get('OUTPUT_DIR', '.')
88
89try:
90    test_dir = os.environ['TEST_DIR']
91    sock_dir = os.environ['SOCK_DIR']
92    cachemode = os.environ['CACHEMODE']
93    aiomode = os.environ['AIOMODE']
94    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
95except KeyError:
96    # We are using these variables as proxies to indicate that we're
97    # not being run via "check". There may be other things set up by
98    # "check" that individual test cases rely on.
99    sys.stderr.write('Please run this test via the "check" script\n')
100    sys.exit(os.EX_USAGE)
101
102qemu_valgrind = []
103if os.environ.get('VALGRIND_QEMU') == "y" and \
104    os.environ.get('NO_VALGRIND') != "y":
105    valgrind_logfile = "--log-file=" + test_dir
106    # %p allows to put the valgrind process PID, since
107    # we don't know it a priori (subprocess.Popen is
108    # not yet invoked)
109    valgrind_logfile += "/%p.valgrind"
110
111    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112
113luks_default_secret_object = 'secret,id=keysec0,data=' + \
114                             os.environ.get('IMGKEYSECRET', '')
115luks_default_key_secret_opt = 'key-secret=keysec0'
116
117sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
120@contextmanager
121def change_log_level(
122        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
123    """
124    Utility function for temporarily changing the log level of a logger.
125
126    This can be used to silence errors that are expected or uninteresting.
127    """
128    _logger = logging.getLogger(logger_name)
129    current_level = _logger.level
130    _logger.setLevel(level)
131
132    try:
133        yield
134    finally:
135        _logger.setLevel(current_level)
136
137
138def unarchive_sample_image(sample, fname):
139    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
140    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
141        shutil.copyfileobj(f_in, f_out)
142
143
144def qemu_tool_popen(args: Sequence[str],
145                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
146    stderr = subprocess.STDOUT if connect_stderr else None
147    # pylint: disable=consider-using-with
148    return subprocess.Popen(args,
149                            stdout=subprocess.PIPE,
150                            stderr=stderr,
151                            universal_newlines=True)
152
153
154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
155                              connect_stderr: bool = True,
156                              drop_successful_output: bool = False) \
157        -> Tuple[str, int]:
158    """
159    Run a tool and return both its output and its exit code
160    """
161    with qemu_tool_popen(args, connect_stderr) as subp:
162        output = subp.communicate()[0]
163        if subp.returncode < 0:
164            cmd = ' '.join(args)
165            sys.stderr.write(f'{tool} received signal \
166                               {-subp.returncode}: {cmd}\n')
167        if drop_successful_output and subp.returncode == 0:
168            output = ''
169        return (output, subp.returncode)
170
171def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172    if not args or args[0] != 'create':
173        return list(args)
174    args = args[1:]
175
176    p = argparse.ArgumentParser(allow_abbrev=False)
177    # -o option may be specified several times
178    p.add_argument('-o', action='append', default=[])
179    p.add_argument('-f')
180    parsed, remaining = p.parse_known_args(args)
181
182    opts_list = parsed.o
183
184    result = ['create']
185    if parsed.f is not None:
186        result += ['-f', parsed.f]
187
188    # IMGOPTS most probably contain options specific for the selected format,
189    # like extended_l2 or compression_type for qcow2. Test may want to create
190    # additional images in other formats that doesn't support these options.
191    # So, use IMGOPTS only for images created in imgfmt format.
192    imgopts = os.environ.get('IMGOPTS')
193    if imgopts and parsed.f == imgfmt:
194        opts_list.insert(0, imgopts)
195
196    # default luks support
197    if parsed.f == 'luks' and \
198            all('key-secret' not in opts for opts in opts_list):
199        result += ['--object', luks_default_secret_object]
200        opts_list.append(luks_default_key_secret_opt)
201
202    for opts in opts_list:
203        result += ['-o', opts]
204
205    result += remaining
206
207    return result
208
209def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
210    """
211    Run qemu-img and return both its output and its exit code
212    """
213    is_create = bool(args and args[0] == 'create')
214    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
215    return qemu_tool_pipe_and_status('qemu-img', full_args,
216                                     drop_successful_output=is_create)
217
218def qemu_img(*args: str) -> int:
219    '''Run qemu-img and return the exit code'''
220    return qemu_img_pipe_and_status(*args)[1]
221
222def ordered_qmp(qmsg, conv_keys=True):
223    # Dictionaries are not ordered prior to 3.6, therefore:
224    if isinstance(qmsg, list):
225        return [ordered_qmp(atom) for atom in qmsg]
226    if isinstance(qmsg, dict):
227        od = OrderedDict()
228        for k, v in sorted(qmsg.items()):
229            if conv_keys:
230                k = k.replace('_', '-')
231            od[k] = ordered_qmp(v, conv_keys=False)
232        return od
233    return qmsg
234
235def qemu_img_create(*args):
236    return qemu_img('create', *args)
237
238def qemu_img_measure(*args):
239    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
240
241def qemu_img_check(*args):
242    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
243
244def qemu_img_pipe(*args: str) -> str:
245    '''Run qemu-img and return its output'''
246    return qemu_img_pipe_and_status(*args)[0]
247
248def qemu_img_log(*args):
249    result = qemu_img_pipe(*args)
250    log(result, filters=[filter_testfiles])
251    return result
252
253def img_info_log(filename, filter_path=None, use_image_opts=False,
254                 extra_args=()):
255    args = ['info']
256    if use_image_opts:
257        args.append('--image-opts')
258    else:
259        args += ['-f', imgfmt]
260    args += extra_args
261    args.append(filename)
262
263    output = qemu_img_pipe(*args)
264    if not filter_path:
265        filter_path = filename
266    log(filter_img_info(output, filter_path))
267
268def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
269    if '-f' in args or '--image-opts' in args:
270        return qemu_io_args_no_fmt + list(args)
271    else:
272        return qemu_io_args + list(args)
273
274def qemu_io_popen(*args):
275    return qemu_tool_popen(qemu_io_wrap_args(args))
276
277def qemu_io(*args):
278    '''Run qemu-io and return the stdout data'''
279    return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0]
280
281def qemu_io_log(*args):
282    result = qemu_io(*args)
283    log(result, filters=[filter_testfiles, filter_qemu_io])
284    return result
285
286def qemu_io_silent(*args):
287    '''Run qemu-io and return the exit code, suppressing stdout'''
288    args = qemu_io_wrap_args(args)
289    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
290    if result.returncode < 0:
291        sys.stderr.write('qemu-io received signal %i: %s\n' %
292                         (-result.returncode, ' '.join(args)))
293    return result.returncode
294
295def qemu_io_silent_check(*args):
296    '''Run qemu-io and return the true if subprocess returned 0'''
297    args = qemu_io_wrap_args(args)
298    result = subprocess.run(args, stdout=subprocess.DEVNULL,
299                            stderr=subprocess.STDOUT, check=False)
300    return result.returncode == 0
301
302class QemuIoInteractive:
303    def __init__(self, *args):
304        self.args = qemu_io_wrap_args(args)
305        # We need to keep the Popen objext around, and not
306        # close it immediately. Therefore, disable the pylint check:
307        # pylint: disable=consider-using-with
308        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
309                                   stdout=subprocess.PIPE,
310                                   stderr=subprocess.STDOUT,
311                                   universal_newlines=True)
312        out = self._p.stdout.read(9)
313        if out != 'qemu-io> ':
314            # Most probably qemu-io just failed to start.
315            # Let's collect the whole output and exit.
316            out += self._p.stdout.read()
317            self._p.wait(timeout=1)
318            raise ValueError(out)
319
320    def close(self):
321        self._p.communicate('q\n')
322
323    def _read_output(self):
324        pattern = 'qemu-io> '
325        n = len(pattern)
326        pos = 0
327        s = []
328        while pos != n:
329            c = self._p.stdout.read(1)
330            # check unexpected EOF
331            assert c != ''
332            s.append(c)
333            if c == pattern[pos]:
334                pos += 1
335            else:
336                pos = 0
337
338        return ''.join(s[:-n])
339
340    def cmd(self, cmd):
341        # quit command is in close(), '\n' is added automatically
342        assert '\n' not in cmd
343        cmd = cmd.strip()
344        assert cmd not in ('q', 'quit')
345        self._p.stdin.write(cmd + '\n')
346        self._p.stdin.flush()
347        return self._read_output()
348
349
350class QemuStorageDaemon:
351    def __init__(self, *args: str, instance_id: str = 'a'):
352        assert '--pidfile' not in args
353        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
354        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
355
356        # Cannot use with here, we want the subprocess to stay around
357        # pylint: disable=consider-using-with
358        self._p = subprocess.Popen(all_args)
359        while not os.path.exists(self.pidfile):
360            if self._p.poll() is not None:
361                cmd = ' '.join(all_args)
362                raise RuntimeError(
363                    'qemu-storage-daemon terminated with exit code ' +
364                    f'{self._p.returncode}: {cmd}')
365
366            time.sleep(0.01)
367
368        with open(self.pidfile, encoding='utf-8') as f:
369            self._pid = int(f.read().strip())
370
371        assert self._pid == self._p.pid
372
373    def stop(self, kill_signal=15):
374        self._p.send_signal(kill_signal)
375        self._p.wait()
376        self._p = None
377
378        try:
379            os.remove(self.pidfile)
380        except OSError:
381            pass
382
383    def __del__(self):
384        if self._p is not None:
385            self.stop(kill_signal=9)
386
387
388def qemu_nbd(*args):
389    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
390    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
391
392def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
393    '''Run qemu-nbd in daemon mode and return both the parent's exit code
394       and its output in case of an error'''
395    full_args = qemu_nbd_args + ['--fork'] + list(args)
396    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
397                                                   connect_stderr=False)
398    return returncode, output if returncode else ''
399
400def qemu_nbd_list_log(*args: str) -> str:
401    '''Run qemu-nbd to list remote exports'''
402    full_args = [qemu_nbd_prog, '-L'] + list(args)
403    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
404    log(output, filters=[filter_testfiles, filter_nbd_exports])
405    return output
406
407@contextmanager
408def qemu_nbd_popen(*args):
409    '''Context manager running qemu-nbd within the context'''
410    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
411
412    assert not os.path.exists(pid_file)
413
414    cmd = list(qemu_nbd_args)
415    cmd.extend(('--persistent', '--pid-file', pid_file))
416    cmd.extend(args)
417
418    log('Start NBD server')
419    with subprocess.Popen(cmd) as p:
420        try:
421            while not os.path.exists(pid_file):
422                if p.poll() is not None:
423                    raise RuntimeError(
424                        "qemu-nbd terminated with exit code {}: {}"
425                        .format(p.returncode, ' '.join(cmd)))
426
427                time.sleep(0.01)
428            yield
429        finally:
430            if os.path.exists(pid_file):
431                os.remove(pid_file)
432            log('Kill NBD server')
433            p.kill()
434            p.wait()
435
436def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
437    '''Return True if two image files are identical'''
438    return qemu_img('compare', '-f', fmt1,
439                    '-F', fmt2, img1, img2) == 0
440
441def create_image(name, size):
442    '''Create a fully-allocated raw image with sector markers'''
443    with open(name, 'wb') as file:
444        i = 0
445        while i < size:
446            sector = struct.pack('>l504xl', i // 512, i // 512)
447            file.write(sector)
448            i = i + 512
449
450def image_size(img):
451    '''Return image's virtual size'''
452    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
453    return json.loads(r)['virtual-size']
454
455def is_str(val):
456    return isinstance(val, str)
457
458test_dir_re = re.compile(r"%s" % test_dir)
459def filter_test_dir(msg):
460    return test_dir_re.sub("TEST_DIR", msg)
461
462win32_re = re.compile(r"\r")
463def filter_win32(msg):
464    return win32_re.sub("", msg)
465
466qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
467                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
468                        r"and [0-9\/.inf]* ops\/sec\)")
469def filter_qemu_io(msg):
470    msg = filter_win32(msg)
471    return qemu_io_re.sub("X ops; XX:XX:XX.X "
472                          "(XXX YYY/sec and XXX ops/sec)", msg)
473
474chown_re = re.compile(r"chown [0-9]+:[0-9]+")
475def filter_chown(msg):
476    return chown_re.sub("chown UID:GID", msg)
477
478def filter_qmp_event(event):
479    '''Filter a QMP event dict'''
480    event = dict(event)
481    if 'timestamp' in event:
482        event['timestamp']['seconds'] = 'SECS'
483        event['timestamp']['microseconds'] = 'USECS'
484    return event
485
486def filter_qmp(qmsg, filter_fn):
487    '''Given a string filter, filter a QMP object's values.
488    filter_fn takes a (key, value) pair.'''
489    # Iterate through either lists or dicts;
490    if isinstance(qmsg, list):
491        items = enumerate(qmsg)
492    else:
493        items = qmsg.items()
494
495    for k, v in items:
496        if isinstance(v, (dict, list)):
497            qmsg[k] = filter_qmp(v, filter_fn)
498        else:
499            qmsg[k] = filter_fn(k, v)
500    return qmsg
501
502def filter_testfiles(msg):
503    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
504    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
505    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
506
507def filter_qmp_testfiles(qmsg):
508    def _filter(_key, value):
509        if is_str(value):
510            return filter_testfiles(value)
511        return value
512    return filter_qmp(qmsg, _filter)
513
514def filter_virtio_scsi(output: str) -> str:
515    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
516
517def filter_qmp_virtio_scsi(qmsg):
518    def _filter(_key, value):
519        if is_str(value):
520            return filter_virtio_scsi(value)
521        return value
522    return filter_qmp(qmsg, _filter)
523
524def filter_generated_node_ids(msg):
525    return re.sub("#block[0-9]+", "NODE_NAME", msg)
526
527def filter_img_info(output, filename):
528    lines = []
529    for line in output.split('\n'):
530        if 'disk size' in line or 'actual-size' in line:
531            continue
532        line = line.replace(filename, 'TEST_IMG')
533        line = filter_testfiles(line)
534        line = line.replace(imgfmt, 'IMGFMT')
535        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
536        line = re.sub('uuid: [-a-f0-9]+',
537                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
538                      line)
539        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
540        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
541                      line)
542        lines.append(line)
543    return '\n'.join(lines)
544
545def filter_imgfmt(msg):
546    return msg.replace(imgfmt, 'IMGFMT')
547
548def filter_qmp_imgfmt(qmsg):
549    def _filter(_key, value):
550        if is_str(value):
551            return filter_imgfmt(value)
552        return value
553    return filter_qmp(qmsg, _filter)
554
555def filter_nbd_exports(output: str) -> str:
556    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
557
558
559Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
560
561def log(msg: Msg,
562        filters: Iterable[Callable[[Msg], Msg]] = (),
563        indent: Optional[int] = None) -> None:
564    """
565    Logs either a string message or a JSON serializable message (like QMP).
566    If indent is provided, JSON serializable messages are pretty-printed.
567    """
568    for flt in filters:
569        msg = flt(msg)
570    if isinstance(msg, (dict, list)):
571        # Don't sort if it's already sorted
572        do_sort = not isinstance(msg, OrderedDict)
573        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
574    else:
575        test_logger.info(msg)
576
577class Timeout:
578    def __init__(self, seconds, errmsg="Timeout"):
579        self.seconds = seconds
580        self.errmsg = errmsg
581    def __enter__(self):
582        if qemu_gdb or qemu_valgrind:
583            return self
584        signal.signal(signal.SIGALRM, self.timeout)
585        signal.setitimer(signal.ITIMER_REAL, self.seconds)
586        return self
587    def __exit__(self, exc_type, value, traceback):
588        if qemu_gdb or qemu_valgrind:
589            return False
590        signal.setitimer(signal.ITIMER_REAL, 0)
591        return False
592    def timeout(self, signum, frame):
593        raise Exception(self.errmsg)
594
595def file_pattern(name):
596    return "{0}-{1}".format(os.getpid(), name)
597
598class FilePath:
599    """
600    Context manager generating multiple file names. The generated files are
601    removed when exiting the context.
602
603    Example usage:
604
605        with FilePath('a.img', 'b.img') as (img_a, img_b):
606            # Use img_a and img_b here...
607
608        # a.img and b.img are automatically removed here.
609
610    By default images are created in iotests.test_dir. To create sockets use
611    iotests.sock_dir:
612
613       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
614
615    For convenience, calling with one argument yields a single file instead of
616    a tuple with one item.
617
618    """
619    def __init__(self, *names, base_dir=test_dir):
620        self.paths = [os.path.join(base_dir, file_pattern(name))
621                      for name in names]
622
623    def __enter__(self):
624        if len(self.paths) == 1:
625            return self.paths[0]
626        else:
627            return self.paths
628
629    def __exit__(self, exc_type, exc_val, exc_tb):
630        for path in self.paths:
631            try:
632                os.remove(path)
633            except OSError:
634                pass
635        return False
636
637
638def try_remove(img):
639    try:
640        os.remove(img)
641    except OSError:
642        pass
643
644def file_path_remover():
645    for path in reversed(file_path_remover.paths):
646        try_remove(path)
647
648
649def file_path(*names, base_dir=test_dir):
650    ''' Another way to get auto-generated filename that cleans itself up.
651
652    Use is as simple as:
653
654    img_a, img_b = file_path('a.img', 'b.img')
655    sock = file_path('socket')
656    '''
657
658    if not hasattr(file_path_remover, 'paths'):
659        file_path_remover.paths = []
660        atexit.register(file_path_remover)
661
662    paths = []
663    for name in names:
664        filename = file_pattern(name)
665        path = os.path.join(base_dir, filename)
666        file_path_remover.paths.append(path)
667        paths.append(path)
668
669    return paths[0] if len(paths) == 1 else paths
670
671def remote_filename(path):
672    if imgproto == 'file':
673        return path
674    elif imgproto == 'ssh':
675        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
676    else:
677        raise Exception("Protocol %s not supported" % (imgproto))
678
679class VM(qtest.QEMUQtestMachine):
680    '''A QEMU VM'''
681
682    def __init__(self, path_suffix=''):
683        name = "qemu%s-%d" % (path_suffix, os.getpid())
684        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
685        if qemu_gdb and qemu_valgrind:
686            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
687            sys.exit(1)
688        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
689        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
690                         name=name,
691                         base_temp_dir=test_dir,
692                         sock_dir=sock_dir, qmp_timer=timer)
693        self._num_drives = 0
694
695    def _post_shutdown(self) -> None:
696        super()._post_shutdown()
697        if not qemu_valgrind or not self._popen:
698            return
699        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
700        if self.exitcode() == 99:
701            with open(valgrind_filename, encoding='utf-8') as f:
702                print(f.read())
703        else:
704            os.remove(valgrind_filename)
705
706    def _pre_launch(self) -> None:
707        super()._pre_launch()
708        if qemu_print:
709            # set QEMU binary output to stdout
710            self._close_qemu_log_file()
711
712    def add_object(self, opts):
713        self._args.append('-object')
714        self._args.append(opts)
715        return self
716
717    def add_device(self, opts):
718        self._args.append('-device')
719        self._args.append(opts)
720        return self
721
722    def add_drive_raw(self, opts):
723        self._args.append('-drive')
724        self._args.append(opts)
725        return self
726
727    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
728        '''Add a virtio-blk drive to the VM'''
729        options = ['if=%s' % interface,
730                   'id=drive%d' % self._num_drives]
731
732        if path is not None:
733            options.append('file=%s' % path)
734            options.append('format=%s' % img_format)
735            options.append('cache=%s' % cachemode)
736            options.append('aio=%s' % aiomode)
737
738        if opts:
739            options.append(opts)
740
741        if img_format == 'luks' and 'key-secret' not in opts:
742            # default luks support
743            if luks_default_secret_object not in self._args:
744                self.add_object(luks_default_secret_object)
745
746            options.append(luks_default_key_secret_opt)
747
748        self._args.append('-drive')
749        self._args.append(','.join(options))
750        self._num_drives += 1
751        return self
752
753    def add_blockdev(self, opts):
754        self._args.append('-blockdev')
755        if isinstance(opts, str):
756            self._args.append(opts)
757        else:
758            self._args.append(','.join(opts))
759        return self
760
761    def add_incoming(self, addr):
762        self._args.append('-incoming')
763        self._args.append(addr)
764        return self
765
766    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
767        cmd = 'human-monitor-command'
768        kwargs: Dict[str, Any] = {'command-line': command_line}
769        if use_log:
770            return self.qmp_log(cmd, **kwargs)
771        else:
772            return self.qmp(cmd, **kwargs)
773
774    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
775        """Pause drive r/w operations"""
776        if not event:
777            self.pause_drive(drive, "read_aio")
778            self.pause_drive(drive, "write_aio")
779            return
780        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
781
782    def resume_drive(self, drive: str) -> None:
783        """Resume drive r/w operations"""
784        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
785
786    def hmp_qemu_io(self, drive: str, cmd: str,
787                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
788        """Write to a given drive using an HMP command"""
789        d = '-d ' if qdev else ''
790        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
791
792    def flatten_qmp_object(self, obj, output=None, basestr=''):
793        if output is None:
794            output = {}
795        if isinstance(obj, list):
796            for i, item in enumerate(obj):
797                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
798        elif isinstance(obj, dict):
799            for key in obj:
800                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
801        else:
802            output[basestr[:-1]] = obj # Strip trailing '.'
803        return output
804
805    def qmp_to_opts(self, obj):
806        obj = self.flatten_qmp_object(obj)
807        output_list = []
808        for key in obj:
809            output_list += [key + '=' + obj[key]]
810        return ','.join(output_list)
811
812    def get_qmp_events_filtered(self, wait=60.0):
813        result = []
814        for ev in self.get_qmp_events(wait=wait):
815            result.append(filter_qmp_event(ev))
816        return result
817
818    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
819        full_cmd = OrderedDict((
820            ("execute", cmd),
821            ("arguments", ordered_qmp(kwargs))
822        ))
823        log(full_cmd, filters, indent=indent)
824        result = self.qmp(cmd, **kwargs)
825        log(result, filters, indent=indent)
826        return result
827
828    # Returns None on success, and an error string on failure
829    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
830                pre_finalize=None, cancel=False, wait=60.0):
831        """
832        run_job moves a job from creation through to dismissal.
833
834        :param job: String. ID of recently-launched job
835        :param auto_finalize: Bool. True if the job was launched with
836                              auto_finalize. Defaults to True.
837        :param auto_dismiss: Bool. True if the job was launched with
838                             auto_dismiss=True. Defaults to False.
839        :param pre_finalize: Callback. A callable that takes no arguments to be
840                             invoked prior to issuing job-finalize, if any.
841        :param cancel: Bool. When true, cancels the job after the pre_finalize
842                       callback.
843        :param wait: Float. Timeout value specifying how long to wait for any
844                     event, in seconds. Defaults to 60.0.
845        """
846        match_device = {'data': {'device': job}}
847        match_id = {'data': {'id': job}}
848        events = [
849            ('BLOCK_JOB_COMPLETED', match_device),
850            ('BLOCK_JOB_CANCELLED', match_device),
851            ('BLOCK_JOB_ERROR', match_device),
852            ('BLOCK_JOB_READY', match_device),
853            ('BLOCK_JOB_PENDING', match_id),
854            ('JOB_STATUS_CHANGE', match_id)
855        ]
856        error = None
857        while True:
858            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
859            if ev['event'] != 'JOB_STATUS_CHANGE':
860                log(ev)
861                continue
862            status = ev['data']['status']
863            if status == 'aborting':
864                result = self.qmp('query-jobs')
865                for j in result['return']:
866                    if j['id'] == job:
867                        error = j['error']
868                        log('Job failed: %s' % (j['error']))
869            elif status == 'ready':
870                self.qmp_log('job-complete', id=job)
871            elif status == 'pending' and not auto_finalize:
872                if pre_finalize:
873                    pre_finalize()
874                if cancel:
875                    self.qmp_log('job-cancel', id=job)
876                else:
877                    self.qmp_log('job-finalize', id=job)
878            elif status == 'concluded' and not auto_dismiss:
879                self.qmp_log('job-dismiss', id=job)
880            elif status == 'null':
881                return error
882
883    # Returns None on success, and an error string on failure
884    def blockdev_create(self, options, job_id='job0', filters=None):
885        if filters is None:
886            filters = [filter_qmp_testfiles]
887        result = self.qmp_log('blockdev-create', filters=filters,
888                              job_id=job_id, options=options)
889
890        if 'return' in result:
891            assert result['return'] == {}
892            job_result = self.run_job(job_id)
893        else:
894            job_result = result['error']
895
896        log("")
897        return job_result
898
899    def enable_migration_events(self, name):
900        log('Enabling migration QMP events on %s...' % name)
901        log(self.qmp('migrate-set-capabilities', capabilities=[
902            {
903                'capability': 'events',
904                'state': True
905            }
906        ]))
907
908    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
909        while True:
910            event = self.event_wait('MIGRATION')
911            # We use the default timeout, and with a timeout, event_wait()
912            # never returns None
913            assert event
914
915            log(event, filters=[filter_qmp_event])
916            if event['data']['status'] in ('completed', 'failed'):
917                break
918
919        if event['data']['status'] == 'completed':
920            # The event may occur in finish-migrate, so wait for the expected
921            # post-migration runstate
922            runstate = None
923            while runstate != expect_runstate:
924                runstate = self.qmp('query-status')['return']['status']
925            return True
926        else:
927            return False
928
929    def node_info(self, node_name):
930        nodes = self.qmp('query-named-block-nodes')
931        for x in nodes['return']:
932            if x['node-name'] == node_name:
933                return x
934        return None
935
936    def query_bitmaps(self):
937        res = self.qmp("query-named-block-nodes")
938        return {device['node-name']: device['dirty-bitmaps']
939                for device in res['return'] if 'dirty-bitmaps' in device}
940
941    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
942        """
943        get a specific bitmap from the object returned by query_bitmaps.
944        :param recording: If specified, filter results by the specified value.
945        :param bitmaps: If specified, use it instead of call query_bitmaps()
946        """
947        if bitmaps is None:
948            bitmaps = self.query_bitmaps()
949
950        for bitmap in bitmaps[node_name]:
951            if bitmap.get('name', '') == bitmap_name:
952                if recording is None or bitmap.get('recording') == recording:
953                    return bitmap
954        return None
955
956    def check_bitmap_status(self, node_name, bitmap_name, fields):
957        ret = self.get_bitmap(node_name, bitmap_name)
958
959        return fields.items() <= ret.items()
960
961    def assert_block_path(self, root, path, expected_node, graph=None):
962        """
963        Check whether the node under the given path in the block graph
964        is @expected_node.
965
966        @root is the node name of the node where the @path is rooted.
967
968        @path is a string that consists of child names separated by
969        slashes.  It must begin with a slash.
970
971        Examples for @root + @path:
972          - root="qcow2-node", path="/backing/file"
973          - root="quorum-node", path="/children.2/file"
974
975        Hypothetically, @path could be empty, in which case it would
976        point to @root.  However, in practice this case is not useful
977        and hence not allowed.
978
979        @expected_node may be None.  (All elements of the path but the
980        leaf must still exist.)
981
982        @graph may be None or the result of an x-debug-query-block-graph
983        call that has already been performed.
984        """
985        if graph is None:
986            graph = self.qmp('x-debug-query-block-graph')['return']
987
988        iter_path = iter(path.split('/'))
989
990        # Must start with a /
991        assert next(iter_path) == ''
992
993        node = next((node for node in graph['nodes'] if node['name'] == root),
994                    None)
995
996        # An empty @path is not allowed, so the root node must be present
997        assert node is not None, 'Root node %s not found' % root
998
999        for child_name in iter_path:
1000            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1001
1002            try:
1003                node_id = next(edge['child'] for edge in graph['edges']
1004                               if (edge['parent'] == node['id'] and
1005                                   edge['name'] == child_name))
1006
1007                node = next(node for node in graph['nodes']
1008                            if node['id'] == node_id)
1009
1010            except StopIteration:
1011                node = None
1012
1013        if node is None:
1014            assert expected_node is None, \
1015                   'No node found under %s (but expected %s)' % \
1016                   (path, expected_node)
1017        else:
1018            assert node['name'] == expected_node, \
1019                   'Found node %s under %s (but expected %s)' % \
1020                   (node['name'], path, expected_node)
1021
1022index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1023
1024class QMPTestCase(unittest.TestCase):
1025    '''Abstract base class for QMP test cases'''
1026
1027    def __init__(self, *args, **kwargs):
1028        super().__init__(*args, **kwargs)
1029        # Many users of this class set a VM property we rely on heavily
1030        # in the methods below.
1031        self.vm = None
1032
1033    def dictpath(self, d, path):
1034        '''Traverse a path in a nested dict'''
1035        for component in path.split('/'):
1036            m = index_re.match(component)
1037            if m:
1038                component, idx = m.groups()
1039                idx = int(idx)
1040
1041            if not isinstance(d, dict) or component not in d:
1042                self.fail(f'failed path traversal for "{path}" in "{d}"')
1043            d = d[component]
1044
1045            if m:
1046                if not isinstance(d, list):
1047                    self.fail(f'path component "{component}" in "{path}" '
1048                              f'is not a list in "{d}"')
1049                try:
1050                    d = d[idx]
1051                except IndexError:
1052                    self.fail(f'invalid index "{idx}" in path "{path}" '
1053                              f'in "{d}"')
1054        return d
1055
1056    def assert_qmp_absent(self, d, path):
1057        try:
1058            result = self.dictpath(d, path)
1059        except AssertionError:
1060            return
1061        self.fail('path "%s" has value "%s"' % (path, str(result)))
1062
1063    def assert_qmp(self, d, path, value):
1064        '''Assert that the value for a specific path in a QMP dict
1065           matches.  When given a list of values, assert that any of
1066           them matches.'''
1067
1068        result = self.dictpath(d, path)
1069
1070        # [] makes no sense as a list of valid values, so treat it as
1071        # an actual single value.
1072        if isinstance(value, list) and value != []:
1073            for v in value:
1074                if result == v:
1075                    return
1076            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1077        else:
1078            self.assertEqual(result, value,
1079                             '"%s" is "%s", expected "%s"'
1080                             % (path, str(result), str(value)))
1081
1082    def assert_no_active_block_jobs(self):
1083        result = self.vm.qmp('query-block-jobs')
1084        self.assert_qmp(result, 'return', [])
1085
1086    def assert_has_block_node(self, node_name=None, file_name=None):
1087        """Issue a query-named-block-nodes and assert node_name and/or
1088        file_name is present in the result"""
1089        def check_equal_or_none(a, b):
1090            return a is None or b is None or a == b
1091        assert node_name or file_name
1092        result = self.vm.qmp('query-named-block-nodes')
1093        for x in result["return"]:
1094            if check_equal_or_none(x.get("node-name"), node_name) and \
1095                    check_equal_or_none(x.get("file"), file_name):
1096                return
1097        self.fail("Cannot find %s %s in result:\n%s" %
1098                  (node_name, file_name, result))
1099
1100    def assert_json_filename_equal(self, json_filename, reference):
1101        '''Asserts that the given filename is a json: filename and that its
1102           content is equal to the given reference object'''
1103        self.assertEqual(json_filename[:5], 'json:')
1104        self.assertEqual(
1105            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1106            self.vm.flatten_qmp_object(reference)
1107        )
1108
1109    def cancel_and_wait(self, drive='drive0', force=False,
1110                        resume=False, wait=60.0):
1111        '''Cancel a block job and wait for it to finish, returning the event'''
1112        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1113        self.assert_qmp(result, 'return', {})
1114
1115        if resume:
1116            self.vm.resume_drive(drive)
1117
1118        cancelled = False
1119        result = None
1120        while not cancelled:
1121            for event in self.vm.get_qmp_events(wait=wait):
1122                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1123                   event['event'] == 'BLOCK_JOB_CANCELLED':
1124                    self.assert_qmp(event, 'data/device', drive)
1125                    result = event
1126                    cancelled = True
1127                elif event['event'] == 'JOB_STATUS_CHANGE':
1128                    self.assert_qmp(event, 'data/id', drive)
1129
1130
1131        self.assert_no_active_block_jobs()
1132        return result
1133
1134    def wait_until_completed(self, drive='drive0', check_offset=True,
1135                             wait=60.0, error=None):
1136        '''Wait for a block job to finish, returning the event'''
1137        while True:
1138            for event in self.vm.get_qmp_events(wait=wait):
1139                if event['event'] == 'BLOCK_JOB_COMPLETED':
1140                    self.assert_qmp(event, 'data/device', drive)
1141                    if error is None:
1142                        self.assert_qmp_absent(event, 'data/error')
1143                        if check_offset:
1144                            self.assert_qmp(event, 'data/offset',
1145                                            event['data']['len'])
1146                    else:
1147                        self.assert_qmp(event, 'data/error', error)
1148                    self.assert_no_active_block_jobs()
1149                    return event
1150                if event['event'] == 'JOB_STATUS_CHANGE':
1151                    self.assert_qmp(event, 'data/id', drive)
1152
1153    def wait_ready(self, drive='drive0'):
1154        """Wait until a BLOCK_JOB_READY event, and return the event."""
1155        return self.vm.events_wait([
1156            ('BLOCK_JOB_READY',
1157             {'data': {'type': 'mirror', 'device': drive}}),
1158            ('BLOCK_JOB_READY',
1159             {'data': {'type': 'commit', 'device': drive}})
1160        ])
1161
1162    def wait_ready_and_cancel(self, drive='drive0'):
1163        self.wait_ready(drive=drive)
1164        event = self.cancel_and_wait(drive=drive)
1165        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1166        self.assert_qmp(event, 'data/type', 'mirror')
1167        self.assert_qmp(event, 'data/offset', event['data']['len'])
1168
1169    def complete_and_wait(self, drive='drive0', wait_ready=True,
1170                          completion_error=None):
1171        '''Complete a block job and wait for it to finish'''
1172        if wait_ready:
1173            self.wait_ready(drive=drive)
1174
1175        result = self.vm.qmp('block-job-complete', device=drive)
1176        self.assert_qmp(result, 'return', {})
1177
1178        event = self.wait_until_completed(drive=drive, error=completion_error)
1179        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1180
1181    def pause_wait(self, job_id='job0'):
1182        with Timeout(3, "Timeout waiting for job to pause"):
1183            while True:
1184                result = self.vm.qmp('query-block-jobs')
1185                found = False
1186                for job in result['return']:
1187                    if job['device'] == job_id:
1188                        found = True
1189                        if job['paused'] and not job['busy']:
1190                            return job
1191                        break
1192                assert found
1193
1194    def pause_job(self, job_id='job0', wait=True):
1195        result = self.vm.qmp('block-job-pause', device=job_id)
1196        self.assert_qmp(result, 'return', {})
1197        if wait:
1198            return self.pause_wait(job_id)
1199        return result
1200
1201    def case_skip(self, reason):
1202        '''Skip this test case'''
1203        case_notrun(reason)
1204        self.skipTest(reason)
1205
1206
1207def notrun(reason):
1208    '''Skip this test suite'''
1209    # Each test in qemu-iotests has a number ("seq")
1210    seq = os.path.basename(sys.argv[0])
1211
1212    with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
1213            as outfile:
1214        outfile.write(reason + '\n')
1215    logger.warning("%s not run: %s", seq, reason)
1216    sys.exit(0)
1217
1218def case_notrun(reason):
1219    '''Mark this test case as not having been run (without actually
1220    skipping it, that is left to the caller).  See
1221    QMPTestCase.case_skip() for a variant that actually skips the
1222    current test case.'''
1223
1224    # Each test in qemu-iotests has a number ("seq")
1225    seq = os.path.basename(sys.argv[0])
1226
1227    with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
1228            as outfile:
1229        outfile.write('    [case not run] ' + reason + '\n')
1230
1231def _verify_image_format(supported_fmts: Sequence[str] = (),
1232                         unsupported_fmts: Sequence[str] = ()) -> None:
1233    if 'generic' in supported_fmts and \
1234            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1235        # similar to
1236        #   _supported_fmt generic
1237        # for bash tests
1238        supported_fmts = ()
1239
1240    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1241    if not_sup or (imgfmt in unsupported_fmts):
1242        notrun('not suitable for this image format: %s' % imgfmt)
1243
1244    if imgfmt == 'luks':
1245        verify_working_luks()
1246
1247def _verify_protocol(supported: Sequence[str] = (),
1248                     unsupported: Sequence[str] = ()) -> None:
1249    assert not (supported and unsupported)
1250
1251    if 'generic' in supported:
1252        return
1253
1254    not_sup = supported and (imgproto not in supported)
1255    if not_sup or (imgproto in unsupported):
1256        notrun('not suitable for this protocol: %s' % imgproto)
1257
1258def _verify_platform(supported: Sequence[str] = (),
1259                     unsupported: Sequence[str] = ()) -> None:
1260    if any((sys.platform.startswith(x) for x in unsupported)):
1261        notrun('not suitable for this OS: %s' % sys.platform)
1262
1263    if supported:
1264        if not any((sys.platform.startswith(x) for x in supported)):
1265            notrun('not suitable for this OS: %s' % sys.platform)
1266
1267def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1268    if supported_cache_modes and (cachemode not in supported_cache_modes):
1269        notrun('not suitable for this cache mode: %s' % cachemode)
1270
1271def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1272    if supported_aio_modes and (aiomode not in supported_aio_modes):
1273        notrun('not suitable for this aio mode: %s' % aiomode)
1274
1275def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1276    usf_list = list(set(required_formats) - set(supported_formats()))
1277    if usf_list:
1278        notrun(f'formats {usf_list} are not whitelisted')
1279
1280
1281def _verify_virtio_blk() -> None:
1282    out = qemu_pipe('-M', 'none', '-device', 'help')
1283    if 'virtio-blk' not in out:
1284        notrun('Missing virtio-blk in QEMU binary')
1285
1286def _verify_virtio_scsi_pci_or_ccw() -> None:
1287    out = qemu_pipe('-M', 'none', '-device', 'help')
1288    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1289        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1290
1291
1292def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1293    imgopts = os.environ.get('IMGOPTS')
1294    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1295    # but it supported only for bash tests. We don't have a concept of global
1296    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1297    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1298    unsup = list(unsupported) + ['$']
1299    if imgopts and any(x in imgopts for x in unsup):
1300        notrun(f'not suitable for this imgopts: {imgopts}')
1301
1302
1303def supports_quorum():
1304    return 'quorum' in qemu_img_pipe('--help')
1305
1306def verify_quorum():
1307    '''Skip test suite if quorum support is not available'''
1308    if not supports_quorum():
1309        notrun('quorum support missing')
1310
1311def has_working_luks() -> Tuple[bool, str]:
1312    """
1313    Check whether our LUKS driver can actually create images
1314    (this extends to LUKS encryption for qcow2).
1315
1316    If not, return the reason why.
1317    """
1318
1319    img_file = f'{test_dir}/luks-test.luks'
1320    (output, status) = \
1321        qemu_img_pipe_and_status('create', '-f', 'luks',
1322                                 '--object', luks_default_secret_object,
1323                                 '-o', luks_default_key_secret_opt,
1324                                 '-o', 'iter-time=10',
1325                                 img_file, '1G')
1326    try:
1327        os.remove(img_file)
1328    except OSError:
1329        pass
1330
1331    if status != 0:
1332        reason = output
1333        for line in output.splitlines():
1334            if img_file + ':' in line:
1335                reason = line.split(img_file + ':', 1)[1].strip()
1336                break
1337
1338        return (False, reason)
1339    else:
1340        return (True, '')
1341
1342def verify_working_luks():
1343    """
1344    Skip test suite if LUKS does not work
1345    """
1346    (working, reason) = has_working_luks()
1347    if not working:
1348        notrun(reason)
1349
1350def qemu_pipe(*args: str) -> str:
1351    """
1352    Run qemu with an option to print something and exit (e.g. a help option).
1353
1354    :return: QEMU's stdout output.
1355    """
1356    full_args = [qemu_prog] + qemu_opts + list(args)
1357    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1358    return output
1359
1360def supported_formats(read_only=False):
1361    '''Set 'read_only' to True to check ro-whitelist
1362       Otherwise, rw-whitelist is checked'''
1363
1364    if not hasattr(supported_formats, "formats"):
1365        supported_formats.formats = {}
1366
1367    if read_only not in supported_formats.formats:
1368        format_message = qemu_pipe("-drive", "format=help")
1369        line = 1 if read_only else 0
1370        supported_formats.formats[read_only] = \
1371            format_message.splitlines()[line].split(":")[1].split()
1372
1373    return supported_formats.formats[read_only]
1374
1375def skip_if_unsupported(required_formats=(), read_only=False):
1376    '''Skip Test Decorator
1377       Runs the test if all the required formats are whitelisted'''
1378    def skip_test_decorator(func):
1379        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1380                         **kwargs: Dict[str, Any]) -> None:
1381            if callable(required_formats):
1382                fmts = required_formats(test_case)
1383            else:
1384                fmts = required_formats
1385
1386            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1387            if usf_list:
1388                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1389                test_case.case_skip(msg)
1390            else:
1391                func(test_case, *args, **kwargs)
1392        return func_wrapper
1393    return skip_test_decorator
1394
1395def skip_for_formats(formats: Sequence[str] = ()) \
1396    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1397                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1398    '''Skip Test Decorator
1399       Skips the test for the given formats'''
1400    def skip_test_decorator(func):
1401        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1402                         **kwargs: Dict[str, Any]) -> None:
1403            if imgfmt in formats:
1404                msg = f'{test_case}: Skipped for format {imgfmt}'
1405                test_case.case_skip(msg)
1406            else:
1407                func(test_case, *args, **kwargs)
1408        return func_wrapper
1409    return skip_test_decorator
1410
1411def skip_if_user_is_root(func):
1412    '''Skip Test Decorator
1413       Runs the test only without root permissions'''
1414    def func_wrapper(*args, **kwargs):
1415        if os.getuid() == 0:
1416            case_notrun('{}: cannot be run as root'.format(args[0]))
1417            return None
1418        else:
1419            return func(*args, **kwargs)
1420    return func_wrapper
1421
1422# We need to filter out the time taken from the output so that
1423# qemu-iotest can reliably diff the results against master output,
1424# and hide skipped tests from the reference output.
1425
1426class ReproducibleTestResult(unittest.TextTestResult):
1427    def addSkip(self, test, reason):
1428        # Same as TextTestResult, but print dot instead of "s"
1429        unittest.TestResult.addSkip(self, test, reason)
1430        if self.showAll:
1431            self.stream.writeln("skipped {0!r}".format(reason))
1432        elif self.dots:
1433            self.stream.write(".")
1434            self.stream.flush()
1435
1436class ReproducibleStreamWrapper:
1437    def __init__(self, stream: TextIO):
1438        self.stream = stream
1439
1440    def __getattr__(self, attr):
1441        if attr in ('stream', '__getstate__'):
1442            raise AttributeError(attr)
1443        return getattr(self.stream, attr)
1444
1445    def write(self, arg=None):
1446        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1447        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1448        self.stream.write(arg)
1449
1450class ReproducibleTestRunner(unittest.TextTestRunner):
1451    def __init__(self, stream: Optional[TextIO] = None,
1452                 resultclass: Type[unittest.TestResult] =
1453                 ReproducibleTestResult,
1454                 **kwargs: Any) -> None:
1455        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1456        super().__init__(stream=rstream,           # type: ignore
1457                         descriptions=True,
1458                         resultclass=resultclass,
1459                         **kwargs)
1460
1461def execute_unittest(argv: List[str], debug: bool = False) -> None:
1462    """Executes unittests within the calling module."""
1463
1464    # Some tests have warnings, especially ResourceWarnings for unclosed
1465    # files and sockets.  Ignore them for now to ensure reproducibility of
1466    # the test output.
1467    unittest.main(argv=argv,
1468                  testRunner=ReproducibleTestRunner,
1469                  verbosity=2 if debug else 1,
1470                  warnings=None if sys.warnoptions else 'ignore')
1471
1472def execute_setup_common(supported_fmts: Sequence[str] = (),
1473                         supported_platforms: Sequence[str] = (),
1474                         supported_cache_modes: Sequence[str] = (),
1475                         supported_aio_modes: Sequence[str] = (),
1476                         unsupported_fmts: Sequence[str] = (),
1477                         supported_protocols: Sequence[str] = (),
1478                         unsupported_protocols: Sequence[str] = (),
1479                         required_fmts: Sequence[str] = (),
1480                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1481    """
1482    Perform necessary setup for either script-style or unittest-style tests.
1483
1484    :return: Bool; Whether or not debug mode has been requested via the CLI.
1485    """
1486    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1487
1488    debug = '-d' in sys.argv
1489    if debug:
1490        sys.argv.remove('-d')
1491    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1492
1493    _verify_image_format(supported_fmts, unsupported_fmts)
1494    _verify_protocol(supported_protocols, unsupported_protocols)
1495    _verify_platform(supported=supported_platforms)
1496    _verify_cache_mode(supported_cache_modes)
1497    _verify_aio_mode(supported_aio_modes)
1498    _verify_formats(required_fmts)
1499    _verify_virtio_blk()
1500    _verify_imgopts(unsupported_imgopts)
1501
1502    return debug
1503
1504def execute_test(*args, test_function=None, **kwargs):
1505    """Run either unittest or script-style tests."""
1506
1507    debug = execute_setup_common(*args, **kwargs)
1508    if not test_function:
1509        execute_unittest(sys.argv, debug)
1510    else:
1511        test_function()
1512
1513def activate_logging():
1514    """Activate iotests.log() output to stdout for script-style tests."""
1515    handler = logging.StreamHandler(stream=sys.stdout)
1516    formatter = logging.Formatter('%(message)s')
1517    handler.setFormatter(formatter)
1518    test_logger.addHandler(handler)
1519    test_logger.setLevel(logging.INFO)
1520    test_logger.propagate = False
1521
1522# This is called from script-style iotests without a single point of entry
1523def script_initialize(*args, **kwargs):
1524    """Initialize script-style tests without running any tests."""
1525    activate_logging()
1526    execute_setup_common(*args, **kwargs)
1527
1528# This is called from script-style iotests with a single point of entry
1529def script_main(test_function, *args, **kwargs):
1530    """Run script-style tests outside of the unittest framework"""
1531    activate_logging()
1532    execute_test(*args, test_function=test_function, **kwargs)
1533
1534# This is called from unittest style iotests
1535def main(*args, **kwargs):
1536    """Run tests using the unittest framework"""
1537    execute_test(*args, **kwargs)
1538