xref: /qemu/tests/qemu-iotests/iotests.py (revision b355f08a)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39# pylint: disable=import-error, wrong-import-position
40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41from qemu.machine import qtest
42from qemu.qmp import QMPMessage
43
# Use this logger for logging messages directly from the iotests module.
# Only a NullHandler is attached here, so nothing is emitted through this
# logger unless another handler is configured elsewhere.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
# log() below sends all of its output through this logger.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Enable the standard faulthandler module at import time.
faulthandler.enable()
53
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
#
# Each *_args list below is a command line prefix: the program (taken
# from the environment, with a default) followed by the optional extra
# options from the matching *_OPTIONS variable, split on single spaces.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''],
# because ''.split(' ') returns a list holding one empty string.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# When GDB_OPTIONS is set, qemu_gdb becomes a 'gdbserver ...' command
# prefix; otherwise it stays an empty (falsy) list.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Raw value of PRINT_QEMU when set (any non-empty string is truthy),
# False otherwise.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')
87
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
    # Required just like the variables above.  Reading it inside the
    # guarded block makes a missing value produce the friendly message
    # below rather than an unhandled KeyError traceback.
    sample_img_dir = os.environ['SAMPLE_IMG_DIR']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Command prefix used to run QEMU under valgrind; empty (falsy) when
# valgrind is not requested or explicitly disabled via NO_VALGRIND.
qemu_valgrind = []
if (os.environ.get('VALGRIND_QEMU') == "y" and
        os.environ.get('NO_VALGRIND') != "y"):
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used to supply a key secret automatically for luks images.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
119
120
def unarchive_sample_image(sample, fname):
    """Decompress the bzip2 sample image named sample into the file fname."""
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
125
126
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic that is
                 written when the process was terminated by a signal.
    :param args: Complete command line, program included.
    :param connect_stderr: When true, the tool's stderr is merged into
                           the captured output; otherwise stderr is not
                           redirected.
    :return: Tuple of (captured output, return code).
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Two adjacent f-strings: a backslash continuation inside a
            # single literal would embed the next line's indentation in
            # the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
141
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the given arguments.

    Returns a tuple of the captured output and the exit code.
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
148
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img and hand back only its exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
152
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return qmsg with every dict replaced by a key-sorted OrderedDict.

    When conv_keys is true, underscores in the keys of a dict passed
    directly are replaced by dashes.  Values nested in a dict are
    processed with conv_keys=False, while list elements are processed
    with the default (conv_keys=True).
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            name = key.replace('_', '-') if conv_keys else key
            ordered[name] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
165
def qemu_img_create(*args):
    """
    Run "qemu-img create" with the given arguments and return its exit code.

    For luks images ('-f luks') a default key secret is supplied
    automatically: if '-o' options are present and do not mention
    'key-secret', the default option is added to them together with the
    matching '--object'; if there is no '-o' at all, both are prepended.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is one comma-separated string, so extend
                # it by concatenation (str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
184
def qemu_img_measure(*args):
    """Run "qemu-img measure" with JSON output and return the parsed result."""
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
187
def qemu_img_check(*args):
    """Run "qemu-img check" with JSON output and return the parsed result."""
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
190
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code'''
    cmdline = qemu_img_args + list(args)
    status = subprocess.call(cmdline)
    if status < 0:
        # A negative status means the process was killed by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(cmdline)))
    return status
198
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img and hand back only its captured output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
202
def qemu_img_log(*args):
    """Run qemu-img, log its output through filter_testfiles, and return
    the unfiltered output."""
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
207
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of "qemu-img info" for filename.

    With imgopts, filename is passed as --image-opts; otherwise the
    image format imgfmt is given explicitly.  filter_path is the string
    filtered out of the output and defaults to filename.
    """
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*cmd)
    log(filter_img_info(output, filter_path or filename))
221
def qemu_io(*args):
    '''Invoke qemu-io and hand back its captured output'''
    cmdline = qemu_io_args + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', cmdline)
    return output
226
def qemu_io_log(*args):
    """Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output."""
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
231
def qemu_io_silent(*args):
    '''Invoke qemu-io with stdout discarded and return the exit code'''
    # If the caller picks the format itself, use the option set
    # qemu_io_args_no_fmt instead of qemu_io_args.
    explicit_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if explicit_format else qemu_io_args

    cmdline = base + list(args)
    proc = subprocess.run(cmdline, stdout=subprocess.DEVNULL, check=False)
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(cmdline)))
    return status
245
def qemu_io_silent_check(*args):
    '''Invoke qemu-io with all output discarded; True if it exited with 0'''
    cmdline = qemu_io_args + list(args)
    proc = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                          stderr=subprocess.STDOUT, check=False)
    succeeded = proc.returncode == 0
    return succeeded
252
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout.

    The process is started in the constructor; cmd() sends one command
    and returns the output printed before the next prompt; close() sends
    the quit command.
    """
    def __init__(self, *args):
        """
        Start qemu-io with the given extra arguments.

        Raises ValueError carrying the collected output if the first
        characters read are not the 'qemu-io> ' prompt.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """Read up to the next prompt and return the text preceding it."""
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        # Compare the last n characters read against the prompt.  A plain
        # position counter that resets to 0 on mismatch would miss the
        # prompt when the output ends with a prefix of it (e.g. 'q').
        tail = ''
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            tail = (tail + c)[-n:]

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """Run one qemu-io command and return its output.

        The command must not contain a newline and must not be the quit
        command (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
299
300
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the call'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
304
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork and return (exit code, output); the
       output is replaced by an empty string when the exit code is 0'''
    full_args = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
312
def qemu_nbd_list_log(*args: str) -> str:
    '''List remote exports with "qemu-nbd -L", log the filtered output and
       return the unfiltered output'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output, _status = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
319
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    qemu-nbd is started with --persistent and a --pid-file; the context
    is entered once that pid file exists.  On exit the pid file is
    removed (if present) and the process is killed and waited for.

    Raises RuntimeError if qemu-nbd terminates before creating the pid
    file.
    '''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    # The pid file serves as the readiness marker below, so it must not
    # exist before the server is started.
    assert not os.path.exists(pid_file)

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # Poll until the pid file shows up, bailing out if the
            # process has already exited.
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            # Runs both after the context body and on startup failure.
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()
348
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
353
def create_image(name, size):
    '''Write a raw image file made of 512-byte sectors, each carrying its
    sector number at the start and at the end'''
    with open(name, 'wb') as image:
        sector_no = 0
        # One sector per iteration; the byte offset is sector_no * 512
        while sector_no * 512 < size:
            image.write(struct.pack('>l504xl', sector_no, sector_no))
            sector_no += 1
362
def image_size(img):
    '''Return the 'virtual-size' field reported by "qemu-img info"'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
367
def is_str(val: Any) -> bool:
    """Tell whether val is a str instance."""
    return isinstance(val, str)
370
# The directory is meant to be matched literally, so escape it: regex
# metacharacters in the path (such as '+' or '.') must not change what
# the pattern matches.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in msg by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
374
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Drop every carriage return character from msg."""
    return re.sub(win32_re, "", msg)
378
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """Strip carriage returns and replace qemu-io's operation statistics
    by a fixed placeholder."""
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
386
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown uid:gid' occurrences by 'chown UID:GID'."""
    return re.sub(chown_re, "chown UID:GID", msg)
390
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of 'timestamp' (if present) are replaced by the placeholders
    'SECS' and 'USECS'.  The event passed in is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: build a new timestamp dict
        # rather than writing into the one shared with the caller.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
398
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn is called with a (key, value) pair, where key is the dict
    key or the list index, and its result replaces the value.  Nested
    dicts and lists are processed recursively.  Returns qmsg.'''
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    else:
        entries = qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
414
def filter_testfiles(msg):
    """Replace the '<pid>-' file prefixes under test_dir and sock_dir by
    'TEST_DIR/PID-' and 'SOCK_DIR/PID-'."""
    pid_tag = "%s-" % (os.getpid())
    msg = msg.replace(os.path.join(test_dir, pid_tag), 'TEST_DIR/PID-')
    return msg.replace(os.path.join(sock_dir, pid_tag), 'SOCK_DIR/PID-')
419
def filter_qmp_testfiles(qmsg):
    """Run filter_testfiles over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
426
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' in output."""
    pattern = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return pattern.sub(r'\1', output)
429
def filter_qmp_virtio_scsi(qmsg):
    """Run filter_virtio_scsi over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
436
def filter_generated_node_ids(msg):
    """Replace every '#block<digits>' occurrence in msg by NODE_NAME."""
    pattern = re.compile("#block[0-9]+")
    return pattern.sub("NODE_NAME", msg)
439
def filter_img_info(output, filename):
    """
    Filter "qemu-img info" output for logging.

    Lines mentioning 'disk size' or 'actual-size' are dropped; in the
    remaining lines filename, test file prefixes, the image format and
    the values of 'iters', 'uuid' and 'cid' are replaced by placeholders.
    """
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for raw in output.split('\n'):
        if 'disk size' in raw or 'actual-size' in raw:
            continue
        text = filter_testfiles(raw.replace(filename, 'TEST_IMG'))
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            text = re.sub(pattern, placeholder, text)
        kept.append(text)
    return '\n'.join(kept)
455
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name by IMGFMT."""
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
458
def filter_qmp_imgfmt(qmsg):
    """Run filter_imgfmt over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
465
def filter_nbd_exports(output: str) -> str:
    """Replace the numbers after 'min block', 'opt block' and 'max block'
    by XXX."""
    pattern = re.compile(r'((min|opt|max) block): [0-9]+')
    return pattern.sub(r'\1: XXX', output)
468
469
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Send a message to the diff-output logger after applying filters.

    Dicts and lists are serialized as JSON (pretty-printed when indent
    is given); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # Don't sort if it's already sorted
    sort_keys = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=sort_keys, indent=indent)
    test_logger.info(serialized)
487
class Timeout:
    """
    Context manager raising an Exception with errmsg if its body takes
    longer than the given number of seconds (SIGALRM based).

    No timer is armed when qemu_gdb or qemu_valgrind is set.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer again
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False
    def timeout(self, signum, frame):
        raise Exception(self.errmsg)
505
def file_pattern(name):
    """Prefix name with the current process ID and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
508
class FilePath:
    """
    Context manager that hands out file names and removes the files of
    those names when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir says otherwise;
    for sockets pass iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When called with exactly one name, the context yields that single
    path rather than a sequence of paths.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort: the file may never have been created
                continue
        return False
547
548
def try_remove(img):
    """Remove the file img, ignoring any OSError."""
    try:
        os.unlink(img)
    except OSError:
        return
554
def file_path_remover():
    """Remove all paths recorded by file_path(), newest first."""
    recorded = file_path_remover.paths
    for path in recorded[::-1]:
        try_remove(path)
558
559
def file_path(*names, base_dir=test_dir):
    ''' Return auto-generated file names that are removed at exit.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # On first use, create the path registry and hook up the remover
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
581
def remote_filename(path):
    """
    Return the name under which path is reached with the protocol imgproto.

    'file' returns path unchanged and 'ssh' builds an ssh:// URL for
    127.0.0.1; any other protocol raises an Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
589
590class VM(qtest.QEMUQtestMachine):
591    '''A QEMU VM'''
592
593    def __init__(self, path_suffix=''):
594        name = "qemu%s-%d" % (path_suffix, os.getpid())
595        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
596        if qemu_gdb and qemu_valgrind:
597            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
598            sys.exit(1)
599        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
600        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
601                         name=name,
602                         base_temp_dir=test_dir,
603                         socket_scm_helper=socket_scm_helper,
604                         sock_dir=sock_dir, qmp_timer=timer)
605        self._num_drives = 0
606
607    def _post_shutdown(self) -> None:
608        super()._post_shutdown()
609        if not qemu_valgrind or not self._popen:
610            return
611        valgrind_filename =  f"{test_dir}/{self._popen.pid}.valgrind"
612        if self.exitcode() == 99:
613            with open(valgrind_filename, encoding='utf-8') as f:
614                print(f.read())
615        else:
616            os.remove(valgrind_filename)
617
618    def _pre_launch(self) -> None:
619        super()._pre_launch()
620        if qemu_print:
621            # set QEMU binary output to stdout
622            self._close_qemu_log_file()
623
624    def add_object(self, opts):
625        self._args.append('-object')
626        self._args.append(opts)
627        return self
628
629    def add_device(self, opts):
630        self._args.append('-device')
631        self._args.append(opts)
632        return self
633
634    def add_drive_raw(self, opts):
635        self._args.append('-drive')
636        self._args.append(opts)
637        return self
638
639    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
640        '''Add a virtio-blk drive to the VM'''
641        options = ['if=%s' % interface,
642                   'id=drive%d' % self._num_drives]
643
644        if path is not None:
645            options.append('file=%s' % path)
646            options.append('format=%s' % img_format)
647            options.append('cache=%s' % cachemode)
648            options.append('aio=%s' % aiomode)
649
650        if opts:
651            options.append(opts)
652
653        if img_format == 'luks' and 'key-secret' not in opts:
654            # default luks support
655            if luks_default_secret_object not in self._args:
656                self.add_object(luks_default_secret_object)
657
658            options.append(luks_default_key_secret_opt)
659
660        self._args.append('-drive')
661        self._args.append(','.join(options))
662        self._num_drives += 1
663        return self
664
665    def add_blockdev(self, opts):
666        self._args.append('-blockdev')
667        if isinstance(opts, str):
668            self._args.append(opts)
669        else:
670            self._args.append(','.join(opts))
671        return self
672
673    def add_incoming(self, addr):
674        self._args.append('-incoming')
675        self._args.append(addr)
676        return self
677
678    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
679        cmd = 'human-monitor-command'
680        kwargs: Dict[str, Any] = {'command-line': command_line}
681        if use_log:
682            return self.qmp_log(cmd, **kwargs)
683        else:
684            return self.qmp(cmd, **kwargs)
685
686    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
687        """Pause drive r/w operations"""
688        if not event:
689            self.pause_drive(drive, "read_aio")
690            self.pause_drive(drive, "write_aio")
691            return
692        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
693
694    def resume_drive(self, drive: str) -> None:
695        """Resume drive r/w operations"""
696        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
697
698    def hmp_qemu_io(self, drive: str, cmd: str,
699                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
700        """Write to a given drive using an HMP command"""
701        d = '-d ' if qdev else ''
702        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
703
704    def flatten_qmp_object(self, obj, output=None, basestr=''):
705        if output is None:
706            output = {}
707        if isinstance(obj, list):
708            for i, item in enumerate(obj):
709                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
710        elif isinstance(obj, dict):
711            for key in obj:
712                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
713        else:
714            output[basestr[:-1]] = obj # Strip trailing '.'
715        return output
716
717    def qmp_to_opts(self, obj):
718        obj = self.flatten_qmp_object(obj)
719        output_list = []
720        for key in obj:
721            output_list += [key + '=' + obj[key]]
722        return ','.join(output_list)
723
724    def get_qmp_events_filtered(self, wait=60.0):
725        result = []
726        for ev in self.get_qmp_events(wait=wait):
727            result.append(filter_qmp_event(ev))
728        return result
729
730    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
731        full_cmd = OrderedDict((
732            ("execute", cmd),
733            ("arguments", ordered_qmp(kwargs))
734        ))
735        log(full_cmd, filters, indent=indent)
736        result = self.qmp(cmd, **kwargs)
737        log(result, filters, indent=indent)
738        return result
739
740    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None on success, or the job's error string as reported
                 by query-jobs when the job entered the 'aborting' state.
        """
        # Block job events are matched on 'device', job events on 'id'.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but status changes is only logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the error so it can be returned once the job
                # has reached the 'null' state.
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The loop ends only when the 'null' status is seen.
                return error
794
795    # Returns None on success, and an error string on failure
796    def blockdev_create(self, options, job_id='job0', filters=None):
797        if filters is None:
798            filters = [filter_qmp_testfiles]
799        result = self.qmp_log('blockdev-create', filters=filters,
800                              job_id=job_id, options=options)
801
802        if 'return' in result:
803            assert result['return'] == {}
804            job_result = self.run_job(job_id)
805        else:
806            job_result = result['error']
807
808        log("")
809        return job_result
810
811    def enable_migration_events(self, name):
812        log('Enabling migration QMP events on %s...' % name)
813        log(self.qmp('migrate-set-capabilities', capabilities=[
814            {
815                'capability': 'events',
816                'state': True
817            }
818        ]))
819
820    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
821        while True:
822            event = self.event_wait('MIGRATION')
823            # We use the default timeout, and with a timeout, event_wait()
824            # never returns None
825            assert event
826
827            log(event, filters=[filter_qmp_event])
828            if event['data']['status'] in ('completed', 'failed'):
829                break
830
831        if event['data']['status'] == 'completed':
832            # The event may occur in finish-migrate, so wait for the expected
833            # post-migration runstate
834            runstate = None
835            while runstate != expect_runstate:
836                runstate = self.qmp('query-status')['return']['status']
837            return True
838        else:
839            return False
840
841    def node_info(self, node_name):
842        nodes = self.qmp('query-named-block-nodes')
843        for x in nodes['return']:
844            if x['node-name'] == node_name:
845                return x
846        return None
847
848    def query_bitmaps(self):
849        res = self.qmp("query-named-block-nodes")
850        return {device['node-name']: device['dirty-bitmaps']
851                for device in res['return'] if 'dirty-bitmaps' in device}
852
853    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
854        """
855        get a specific bitmap from the object returned by query_bitmaps.
856        :param recording: If specified, filter results by the specified value.
857        :param bitmaps: If specified, use it instead of call query_bitmaps()
858        """
859        if bitmaps is None:
860            bitmaps = self.query_bitmaps()
861
862        for bitmap in bitmaps[node_name]:
863            if bitmap.get('name', '') == bitmap_name:
864                if recording is None or bitmap.get('recording') == recording:
865                    return bitmap
866        return None
867
868    def check_bitmap_status(self, node_name, bitmap_name, fields):
869        ret = self.get_bitmap(node_name, bitmap_name)
870
871        return fields.items() <= ret.items()
872
873    def assert_block_path(self, root, path, expected_node, graph=None):
874        """
875        Check whether the node under the given path in the block graph
876        is @expected_node.
877
878        @root is the node name of the node where the @path is rooted.
879
880        @path is a string that consists of child names separated by
881        slashes.  It must begin with a slash.
882
883        Examples for @root + @path:
884          - root="qcow2-node", path="/backing/file"
885          - root="quorum-node", path="/children.2/file"
886
887        Hypothetically, @path could be empty, in which case it would
888        point to @root.  However, in practice this case is not useful
889        and hence not allowed.
890
891        @expected_node may be None.  (All elements of the path but the
892        leaf must still exist.)
893
894        @graph may be None or the result of an x-debug-query-block-graph
895        call that has already been performed.
896        """
897        if graph is None:
898            graph = self.qmp('x-debug-query-block-graph')['return']
899
900        iter_path = iter(path.split('/'))
901
902        # Must start with a /
903        assert next(iter_path) == ''
904
905        node = next((node for node in graph['nodes'] if node['name'] == root),
906                    None)
907
908        # An empty @path is not allowed, so the root node must be present
909        assert node is not None, 'Root node %s not found' % root
910
911        for child_name in iter_path:
912            assert node is not None, 'Cannot follow path %s%s' % (root, path)
913
914            try:
915                node_id = next(edge['child'] for edge in graph['edges']
916                               if (edge['parent'] == node['id'] and
917                                   edge['name'] == child_name))
918
919                node = next(node for node in graph['nodes']
920                            if node['id'] == node_id)
921
922            except StopIteration:
923                node = None
924
925        if node is None:
926            assert expected_node is None, \
927                   'No node found under %s (but expected %s)' % \
928                   (path, expected_node)
929        else:
930            assert node['name'] == expected_node, \
931                   'Found node %s under %s (but expected %s)' % \
932                   (node['name'], path, expected_node)
933
# Matches a path component of the form "name[index]" for dictpath()
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow a '/'-separated path through nested dicts and return the
           value found.  A component written as "name[N]" additionally
           indexes the list stored under "name".  Any step that cannot be
           taken fails the test.'''
        current = d
        for part in path.split('/'):
            indexed = index_re.match(part)
            key, idx = part, None
            if indexed is not None:
                key = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(current, dict) and key in current):
                self.fail(f'failed path traversal for "{path}" in "{current}"')
            current = current[key]

            if indexed is None:
                continue

            if not isinstance(current, list):
                self.fail(f'path component "{key}" in "{path}" '
                          f'is not a list in "{current}"')
            try:
                current = current[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{current}"')
        return current

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be resolved in @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed to traverse the path: it is absent
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value found under @path in a QMP dict equals
           @value.  If @value is a non-empty list, it is taken as a set
           of alternatives of which any one may match.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        has_alternatives = isinstance(value, list) and value != []
        if not has_alternatives:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if not any(actual == candidate for candidate in value):
            self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')

        def compatible(actual, wanted):
            # A missing value on either side counts as a match
            return actual is None or wanted is None or actual == wanted

        present = any(compatible(node.get("node-name"), node_name) and
                      compatible(node.get("file"), file_name)
                      for node in result["return"])
        if not present:
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON object following that prefix equals @reference once both
           have gone through the VM's flatten_qmp_object()'''
        self.assertEqual(json_filename[:5], 'json:')
        parsed = json.loads(json_filename[5:])
        self.assertEqual(self.vm.flatten_qmp_object(parsed),
                         self.vm.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        finished = False
        while not finished:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                    finished = True
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        wanted = [('BLOCK_JOB_READY',
                   {'data': {'type': job_type, 'device': drive}})
                  for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(wanted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that it
           ended with a BLOCK_JOB_COMPLETED event of type mirror whose
           offset equals its length'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
           then return its entry.  The job must be listed on every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((entry for entry in jobs
                            if entry['device'] == job_id), None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause; return the paused job's entry if @wait
           is set, else the QMP reply'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1117
1118
def notrun(reason):
    '''Skip this test suite: write @reason to the test's .notrun file in
    output_dir, log a warning, and exit with status 0'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_file = '%s/%s.notrun' % (output_dir, seq)

    with open(notrun_file, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1129
def case_notrun(reason):
    '''Append @reason to the test's .casenotrun file in output_dir,
    marking a test case as not having been run.  Nothing is actually
    skipped here; that is left to the caller.  See
    QMPTestCase.case_skip() for a variant that also skips the current
    test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_file = '%s/%s.casenotrun' % (output_dir, seq)

    with open(casenotrun_file, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1142
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Call notrun() if the current image format (imgfmt) is excluded.

    An empty @supported_fmts places no restriction.  Listing 'generic'
    lifts the restriction as well, unless the IMGFMT_GENERIC environment
    variable is set to something other than 'true'.  For 'luks', working
    LUKS support is verified in addition.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_listed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1158
1159def _verify_protocol(supported: Sequence[str] = (),
1160                     unsupported: Sequence[str] = ()) -> None:
1161    assert not (supported and unsupported)
1162
1163    if 'generic' in supported:
1164        return
1165
1166    not_sup = supported and (imgproto not in supported)
1167    if not_sup or (imgproto in unsupported):
1168        notrun('not suitable for this protocol: %s' % imgproto)
1169
1170def _verify_platform(supported: Sequence[str] = (),
1171                     unsupported: Sequence[str] = ()) -> None:
1172    if any((sys.platform.startswith(x) for x in unsupported)):
1173        notrun('not suitable for this OS: %s' % sys.platform)
1174
1175    if supported:
1176        if not any((sys.platform.startswith(x) for x in supported)):
1177            notrun('not suitable for this OS: %s' % sys.platform)
1178
1179def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1180    if supported_cache_modes and (cachemode not in supported_cache_modes):
1181        notrun('not suitable for this cache mode: %s' % cachemode)
1182
1183def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1184    if supported_aio_modes and (aiomode not in supported_aio_modes):
1185        notrun('not suitable for this aio mode: %s' % aiomode)
1186
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Call notrun() if any of @required_formats is missing from
    supported_formats().
    """
    usf_list = list(set(required_formats) - set(supported_formats()))
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1191
1192
def _verify_virtio_blk() -> None:
    """
    Call notrun() if 'virtio-blk' does not appear in the output of
    qemu's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1197
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Call notrun() if neither 'virtio-scsi-pci' nor 'virtio-scsi-ccw'
    appears in the output of qemu's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1202
1203
def supports_quorum():
    '''Return whether 'quorum' appears in the qemu-img --help output'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1206
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1211
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A 1G LUKS image is created under test_dir with qemu-img and removed
    again right away.

    :return: (True, '') if creation succeeded; otherwise (False, reason),
             where reason is the text after '<image file>:' on the first
             output line containing it, or the whole output if there is
             no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    (output, status) = qemu_img_pipe_and_status(*create_args)

    # The image may never have been created, so a failed removal is fine
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines() if marker in line),
                  output)
    return (False, reason)
1242
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks()
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1250
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, then qemu_opts, then @args.

    :return: The output part of qemu_tool_pipe_and_status()'s result;
             the status part is discarded.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1260
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats parsed from the output of
       qemu's "-drive format=help".

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function attribute
       supported_formats.formats, so qemu runs at most once per variant.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # Line 0 is read for the rw-whitelist, line 1 for the ro-whitelist
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
1275
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted.

       @required_formats is either a collection of format names or a
       callable that is given the test case and returns one.
       @read_only is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1295
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test (via the test case's case_skip()) if the current
       image format (imgfmt) is one of @formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1311
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions.  When running as
       root (uid 0), the case is recorded through case_notrun() and the
       wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1322
1323# We need to filter out the time taken from the output so that
1324# qemu-iotest can reliably diff the results against master output,
1325# and hide skipped tests from the reference output.
1326
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult variant whose dot-mode output shows a skipped test
    as "." instead of "s"."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1336
class ReproducibleStreamWrapper:
    """Stream proxy that strips the elapsed time from "Ran N tests in
    X.XXXs" and removes " (skipped=N)" from everything written; all
    other attributes are forwarded to the wrapped stream."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  Refusing 'stream' here
        # avoids recursing when the instance attribute is not set.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        text = arg
        for pattern, replacement in substitutions:
            text = re.sub(pattern, replacement, text)
        self.stream.write(text)
1350
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    (around @stream, or sys.stdout if none is given), always enables
    descriptions, and defaults to ReproducibleTestResult."""

    def __init__(self, stream: Optional[TextIO] = None,
             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
             **kwargs: Any) -> None:
        if not stream:
            stream = sys.stdout
        wrapped = ReproducibleStreamWrapper(stream)
        super().__init__(stream=wrapped,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1360
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """

    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_setting = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_setting)
1371
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv and turns on DEBUG-level
    logging (WARN otherwise).  Afterwards the image format, protocol,
    platform, cache mode, aio mode, required formats and virtio-blk
    checks are run in that order.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
        debug = True
    except ValueError:
        debug = False

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1401
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    The remaining arguments go to execute_setup_common().  Afterwards
    @test_function is called if one was given; otherwise the unittest
    machinery is run on sys.argv.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1410
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without level or logger name
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1419
1420# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates log output to stdout, then passes all arguments on to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1425
1426# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework.

    Activates log output to stdout, then calls execute_test() with
    @test_function as the test to run; the remaining arguments are
    passed through unchanged.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1431
1432# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework.

    All arguments are passed through to execute_test().
    """
    execute_test(*args, **kwargs)
1436