xref: /qemu/tests/qemu-iotests/iotests.py (revision b49f4755)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump a traceback if the interpreter receives a fatal signal.
faulthandler.enable()

# Command lines for the external tools. Each list starts with the program
# (overridable through the environment) followed by optional extra options.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the QEMU_IO_OPTIONS_NO_FMT option
# set instead; qemu_io_wrap_args() picks between the two lists.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: this evaluates to [''] (not []) when QEMU_OPTIONS is unset or empty.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# Non-empty when GDB_OPTIONS is set: a 'gdbserver ...' command prefix.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Either the PRINT_QEMU string or False; only its truthiness is used here.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Non-empty when valgrind is requested (VALGRIND_QEMU=y) and not vetoed
# (NO_VALGRIND=y): a 'valgrind ...' command prefix.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p makes valgrind insert the PID of the process it runs, which
    # we cannot know in advance (subprocess.Popen has not been
    # invoked yet)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Defaults used for 'luks' images when the caller supplies no key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Unlike the variables above, a missing SAMPLE_IMG_DIR raises KeyError here.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Run the enclosed block with a logger's level temporarily overridden.

    The logger called `logger_name` is switched to `level` on entry and
    gets its previous level back on exit, even if the block raises. This
    is handy for muting errors that are expected or uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level

    target.setLevel(level)
    try:
        yield
    finally:
        # Always restore, whatever happened inside the block.
        target.setLevel(saved_level)
136
137
def unarchive_sample_image(sample, fname):
    """
    Decompress the bzip2 sample image `sample` into the file `fname`.

    The archive is looked up as `<sample>.bz2` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start a tool and return its Popen object with stdout piped as text.

    :param args: full command line to run.
    :param connect_stderr: when True, stderr is merged into the stdout
                           pipe; otherwise stderr is not redirected.
    """
    popen_kwargs: Dict[str, Any] = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.STDOUT if connect_stderr else None,
        'universal_newlines': True,
    }
    # The caller owns the process, so it must not be closed here.
    # pylint: disable=consider-using-with
    return subprocess.Popen(args, **popen_kwargs)
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, only used in the diagnostic
                 printed when the process was terminated by a signal.
    :param args: full command line to run.
    :param connect_stderr: merge stderr into the captured output.
    :param drop_successful_output: return an empty string instead of the
                                   output when the tool exits with 0.

    :return: a tuple (output, returncode).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code is the negated signal number.
            cmd = ' '.join(args)
            # Keep the message on one logical line: a backslash
            # continuation inside the literal would embed the source
            # indentation in the middle of the message.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Adjust a qemu-img argument list when it is a 'create' command.

    Anything that is not a 'create' command is returned as an unchanged
    copy. For 'create', the -f and -o options are pulled to the front,
    IMGOPTS is prepended to the -o options when the format is imgfmt,
    and the default secret is added for 'luks' images that do not name
    a key-secret themselves.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    create_opts = known.o

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. A test may
    # want to create additional images in other formats that do not
    # support these options, so IMGOPTS is only used for images created
    # in the imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        create_opts.insert(0, imgopts)

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(['-f', fmt])

    # default luks support
    if fmt == 'luks' and not any('key-secret' in o for o in create_opts):
        cmdline.extend(['--object', luks_default_secret_object])
        create_opts.append(luks_default_key_secret_opt)

    for opt in create_opts:
        cmdline.extend(['-o', opt])

    cmdline.extend(passthrough)
    return cmdline
208
209
def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Run a qemu tool to completion and hand back the finished process.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). This exception has
        'stdout', 'stderr', and 'returncode' properties that may be
        inspected to show greater detail. If this exception is not
        handled, the command-line, return code, and all console output
        will be included at the bottom of the stack trace.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    stderr_dest = subprocess.STDOUT if combine_stdio else subprocess.PIPE
    proc = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_dest,
        universal_newlines=True,
        check=False
    )

    # A negative code means death by signal: always an error.
    killed = proc.returncode < 0
    failed = check and proc.returncode != 0
    if killed or failed:
        raise VerboseProcessError(
            proc.returncode, args,
            output=proc.stdout,
            stderr=proc.stderr,
        )

    return proc
248
249
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IMG_PROG and return the completed process.

    QEMU_IMG_OPTIONS are always placed in front of the given arguments,
    and 'create' commands may have their arguments altered further.

    See `qemu_tool()` for greater detail.
    """
    cmdline = list(qemu_img_args)
    cmdline.extend(qemu_img_create_prepare_args(list(args)))
    return qemu_tool(*cmdline, check=check, combine_stdio=combine_stdio)
262
263
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with dictionaries sorted by key.

    Lists are processed element by element (each element starts again
    with key conversion enabled). Dictionaries become OrderedDicts; when
    `conv_keys` is set, underscores in the keys of that dictionary are
    replaced by dashes, while dictionaries directly nested in it keep
    their keys. Any other value is returned unchanged.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    def _name(key):
        return key.replace('_', '-') if conv_keys else key

    return OrderedDict(
        (_name(key), ordered_qmp(qmsg[key], conv_keys=False))
        for key in sorted(qmsg)
    )
276
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run the qemu-img 'create' command with `args`; see `qemu_img()`."""
    return qemu_img('create', *args)
279
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its output as deserialized JSON.

    :raise CalledProcessError:
        When qemu-img crashes, or returns a non-zero exit code without
        producing a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        completed = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative code means termination by signal: don't bother
        # looking at the output in that case.
        if exc.returncode >= 0:
            # Commands like 'check' can return failure (exit codes 2
            # and 3) to indicate command completion, but with errors
            # found. For multi-command flexibility, ignore the exact
            # error codes and *try* to load JSON.
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(completed.stdout)
311
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    return qemu_img_json("measure", "--output", "json", *args)
314
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    return qemu_img_json("check", "--output", "json", *args)
317
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    return qemu_img_json('info', "--output", "json", *args)
320
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    return qemu_img_json('map', "--output", "json", *args)
323
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img and log its output, filtered through filter_testfiles.

    :param args: arguments passed on to `qemu_img()`.
    :param check: passed on to `qemu_img()`.

    :return: the CompletedProcess returned by `qemu_img()`.
    """
    result = qemu_img(*args, check=check)
    log(result.stdout, filters=[filter_testfiles])
    return result
329
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True, drop_child_info: bool = True,
                 ) -> None:
    """
    Run 'qemu-img info' on `filename` and log the filtered output.

    :param filename: image (or image options string) to inspect.
    :param filter_path: path replaced by the filter; defaults to
                        `filename` when empty or None.
    :param use_image_opts: pass --image-opts instead of '-f imgfmt'.
    :param extra_args: additional arguments placed before `filename`.
    :param check: passed on to `qemu_img()`.
    :param drop_child_info: passed on to `filter_img_info()`.
    """
    format_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    info = qemu_img(*cmdline, check=check).stdout
    log(filter_img_info(info, filter_path or filename, drop_child_info))
346
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Build a full qemu-io command line for `args`.

    When the arguments already select a format ('-f') or use
    '--image-opts', the QEMU_IO_OPTIONS_NO_FMT variant of the base
    command is used; otherwise the QEMU_IO_OPTIONS one.
    """
    explicit_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if explicit_format else qemu_io_args
    return base + list(args)
352
def qemu_io_popen(*args):
    """Start qemu-io with `args` and return the Popen object.

    The command line is built by qemu_io_wrap_args() and the process is
    started through qemu_tool_popen() with its default settings.
    """
    return qemu_tool_popen(qemu_io_wrap_args(args))
355
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IO_PROG and return the status code and console output.

    This function always prepends either QEMU_IO_OPTIONS or
    QEMU_IO_OPTIONS_NO_FMT.

    :param args: qemu-io arguments; see `qemu_io_wrap_args()`.
    :param check: passed on to `qemu_tool()`.
    :param combine_stdio: passed on to `qemu_tool()`.

    :return: the CompletedProcess returned by `qemu_tool()`.
    """
    return qemu_tool(*qemu_io_wrap_args(args),
                     check=check, combine_stdio=combine_stdio)
366
def qemu_io_log(*args: str, check: bool = True
                ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-io and log its output.

    The output is passed through filter_testfiles and filter_qemu_io
    before being logged.

    :return: the CompletedProcess returned by `qemu_io()`.
    """
    result = qemu_io(*args, check=check)
    log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
    return result
372
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout pipes.

    The constructor starts qemu-io and waits for its first prompt;
    `cmd()` sends one command and returns the output printed before the
    next prompt; `close()` sends the quit command.
    """

    # The prompt qemu-io prints whenever it is ready for a command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` (wrapped by qemu_io_wrap_args()).

        :raise ValueError: when the process does not start with the
                           prompt; the message is everything it printed.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read from qemu-io until the next prompt and return what came
        before it (the prompt itself is not included).

        The last len(prompt) characters are compared against the prompt
        after every character read. Unlike a match counter that is simply
        reset on a mismatch, this also recognises a prompt that directly
        follows a character which is a prefix of the prompt (for example
        output ending in 'q' with no newline).
        """
        prompt = self._PROMPT
        n = len(prompt)
        received = []
        tail = ''
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            received.append(c)
            tail = (tail + c)[-n:]

        return ''.join(received[:-n])

    def cmd(self, cmd):
        """
        Send a single-line command and return its output.

        `cmd` must not contain a newline and must not be the quit
        command; use `close()` for that.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
419
420
class QemuStorageDaemon:
    """
    Run a qemu-storage-daemon (qsd_prog) process, optionally with QMP.

    The constructor launches the daemon and waits until its pidfile
    exists; `stop()` signals the process, waits for it and removes the
    QMP socket and the pidfile.
    """
    # QMP connection and socket path; both stay None unless qmp=True.
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Launch the daemon and wait for it to write its pidfile.

        :param args: daemon arguments; must not contain '--pidfile',
                     which is added here.
        :param instance_id: used in the pidfile and QMP socket names.
        :param qmp: when True, add a QMP monitor on a UNIX socket in
                    sock_dir and accept the daemon's connection.

        :raise RuntimeError: when the process exits before the pidfile
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            # Created before the daemon is started (server=True).
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the process has exited.
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        # The pidfile must name the process that was started here.
        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send a QMP command via cmd_raw(); requires qmp=True."""
        assert self._qmp is not None
        return self._qmp.cmd_raw(cmd, args)

    def get_qmp(self) -> QEMUMonitorProtocol:
        """Return the QMP connection object; requires qmp=True."""
        assert self._qmp is not None
        return self._qmp

    def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPReturnValue:
        """Send a QMP command via the connection's cmd(); requires qmp=True.

        `args` is expanded into keyword arguments (none when it is None).
        """
        assert self._qmp is not None
        return self._qmp.cmd(cmd, **(args or {}))

    def stop(self, kill_signal=15):
        """
        Send `kill_signal` to the daemon, wait for it and clean up.

        Closes the QMP connection (if any) and removes the QMP socket and
        the pidfile, ignoring OSError from the removals.
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        # Mark the process as gone so that __del__ does not stop it again.
        self._p = None

        if self._qmp:
            self._qmp.close()

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        """Stop a still-registered daemon with signal 9 on destruction."""
        if self._p is not None:
            self.stop(kill_signal=9)
495
496
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code

    The command is qemu_nbd_args followed by --fork and `args`.'''
    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
500
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode; return the parent's exit code and,
       only when that code is non-zero, its output (otherwise an empty
       string)'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if not returncode:
        # Success: the output is of no interest.
        output = ''
    return returncode, output
508
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports

    Runs qemu_nbd_prog with -L and `args` (QEMU_NBD_OPTIONS are not
    added), logs the output filtered through filter_testfiles and
    filter_nbd_exports, and returns the unfiltered output.'''
    full_args = [qemu_nbd_prog, '-L'] + list(args)
    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
515
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running for the
    duration of the context.

    The server is started with --persistent and a pid file; the context
    is entered once the pid file exists. On exit the pid file is removed
    and the server is killed.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the pid file, bailing out if the server exits.
            while not os.path.exists(pid_file):
                if server.poll() is None:
                    time.sleep(0.01)
                    continue
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
544
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Run 'qemu-img compare' on two images; True means they are identical.

    :param img1: first image, opened with format `fmt1`.
    :param img2: second image, opened with format `fmt2`.

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        # Exit status 1 is the regular "images differ" answer.
        if exc.returncode != 1:
            raise
        return False
    return True
561
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors; each sector starts and
    ends with its own index as a big-endian 32-bit integer, with zero
    padding in between.'''
    sector_layout = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            index = offset // 512
            image.write(sector_layout.pack(index, index))
            offset += 512
570
def image_size(img: str) -> int:
    """
    Return the virtual size of `img`, as reported by 'qemu-img info'.

    :raise TypeError: when the reported 'virtual-size' is not an int.
    """
    virtual_size = qemu_img_info('-f', imgfmt, img)['virtual-size']
    if isinstance(virtual_size, int):
        return virtual_size

    type_name = type(virtual_size).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{virtual_size}' of type '{type_name}'")
579
def is_str(val):
    """Return True if `val` is a str instance."""
    return isinstance(val, str)
582
# TEST_DIR is a filesystem path, not a pattern: escape it so that regex
# metacharacters in the path (e.g. '.', '+', '(') are matched literally.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory path by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
586
# Matches the carriage returns found in Windows-style line endings.
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return `msg` with every carriage return removed."""
    return re.sub(win32_re, "", msg)
590
# Matches the performance summary that qemu-io prints after an operation.
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Make qemu-io output reproducible.

    Carriage returns are removed and the performance summary is
    replaced by a fixed placeholder.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    without_cr = filter_win32(msg)
    return qemu_io_re.sub(placeholder, without_cr)
598
# Matches a chown invocation with numeric user and group IDs.
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown UID:GID' pairs by a fixed placeholder."""
    return re.sub(chown_re, "chown UID:GID", msg)
602
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of `event` whose timestamp (if present) has its
    'seconds' and 'microseconds' replaced by the placeholders 'SECS'
    and 'USECS'. The event passed in is not modified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) above is only a shallow copy: copy the nested
        # timestamp as well, so the caller's event keeps its real values.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
610
def filter_qmp(qmsg, filter_fn):
    '''Apply a filter to every leaf value of a QMP object.

    filter_fn takes a (key, value) pair and returns the new value; for
    list elements the key is the index, and for a bare value it is None.
    Lists and dicts are modified in place and returned.'''
    if not isinstance(qmsg, (list, dict)):
        return filter_fn(None, qmsg)

    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()
    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
628
def filter_testfiles(msg):
    """
    Replace the per-process file name prefixes in `msg`.

    '<test_dir>/<pid>-' becomes 'TEST_DIR/PID-' and '<sock_dir>/<pid>-'
    becomes 'SOCK_DIR/PID-', where <pid> is this process's ID.
    """
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)

    filtered = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return filtered.replace(sock_prefix, 'SOCK_DIR/PID-')
633
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value

    return filter_qmp(qmsg, _scrub)
640
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' device names."""
    generic_name = r'\1'
    return re.sub(r'(virtio-scsi)-(ccw|pci)', generic_name, output)
643
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi() to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value

    return filter_qmp(qmsg, _scrub)
650
def filter_generated_node_ids(msg):
    """Replace names of the form '#block<digits>' by NODE_NAME."""
    placeholder = "NODE_NAME"
    return re.sub("#block[0-9]+", placeholder, msg)
653
def filter_img_info(output: str, filename: str,
                    drop_child_info: bool = True) -> str:
    """
    Make human-readable 'qemu-img info' output reproducible.

    Lines containing 'disk size' or 'actual-size' are dropped, and so
    are (when `drop_child_info` is set) "Child node" sections including
    their indented lines. In the remaining lines `filename` becomes
    TEST_IMG, the image format becomes IMGFMT, and variable values
    (iters, uuid, cid, compression type) become fixed placeholders.
    """
    # Applied in this order to every line that is kept.
    regex_subs = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    in_child_section = False
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue

        # Drop child node info
        if in_child_section and line.startswith(' '):
            continue
        in_child_section = False
        if drop_child_info and "Child node '/" in line:
            in_child_section = True
            continue

        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in regex_subs:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
683
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name by IMGFMT."""
    return msg.replace(imgfmt, 'IMGFMT')
686
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value

    return filter_qmp(qmsg, _scrub)
693
def filter_nbd_exports(output: str) -> str:
    """Replace the values of 'min/opt/max block' lines by XXX."""
    block_size = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size, r'\1: XXX', output)
696
697
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the diff-output logger.

    The filters are applied in order first. A dict or list is then
    serialized as JSON (pretty-printed if `indent` is given); anything
    else is logged as it is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    sort_keys = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=sort_keys, indent=indent))
715
class Timeout:
    """
    Context manager that raises TimeoutError after `seconds` seconds.

    It arms a SIGALRM-based interval timer on entry and disarms it on
    exit. When QEMU runs under gdb or valgrind, no timer is set at all.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
        # Never swallow an exception raised inside the context.
        return False

    def timeout(self, signum, frame):
        """Signal handler: turn the alarm into a TimeoutError."""
        raise TimeoutError(self.errmsg)
733
def file_pattern(name):
    """Return `name` prefixed with this process's ID and a dash."""
    pid = os.getpid()
    return f"{pid}-{name}"
736
class FilePath:
    """
    Context manager that hands out file names and removes the files
    again when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    The names live in iotests.test_dir unless told otherwise. For
    sockets, pass iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    As a convenience, a single name yields a single path rather than a
    one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                # The file may never have been created.
                continue
        return False
775
776
def try_remove(img):
    """Remove the file `img`, silently ignoring any OSError."""
    try:
        os.unlink(img)
    except OSError:
        return
782
def file_path_remover():
    """Remove the files registered by file_path(), newest first.

    The list of paths is kept in the `paths` attribute of this function,
    which file_path() creates before registering this function with
    atexit.
    """
    for path in reversed(file_path_remover.paths):
        try_remove(path)
786
787
def file_path(*names, base_dir=test_dir):
    ''' Generate file names that are cleaned up automatically at exit.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # First use: set up the registry and the exit-time cleanup.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
809
def remote_filename(path):
    """
    Return the name under which `path` is reached through imgproto.

    For 'file' this is the path itself; for 'ssh' it is an ssh:// URL
    to 127.0.0.1 for the user named by $USER.

    :raise ValueError: for any other protocol.
    """
    if imgproto == 'file':
        return path
    if imgproto != 'ssh':
        raise ValueError("Protocol %s not supported" % (imgproto))
    return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
817
818class VM(qtest.QEMUQtestMachine):
819    '''A QEMU VM'''
820
821    def __init__(self, path_suffix=''):
822        name = "qemu%s-%d" % (path_suffix, os.getpid())
823        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
824        if qemu_gdb and qemu_valgrind:
825            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
826            sys.exit(1)
827        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
828        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
829                         name=name,
830                         base_temp_dir=test_dir,
831                         qmp_timer=timer)
832        self._num_drives = 0
833
834    def _post_shutdown(self) -> None:
835        super()._post_shutdown()
836        if not qemu_valgrind or not self._popen:
837            return
838        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
839        if self.exitcode() == 99:
840            with open(valgrind_filename, encoding='utf-8') as f:
841                print(f.read())
842        else:
843            os.remove(valgrind_filename)
844
    def _pre_launch(self) -> None:
        """Run the base-class pre-launch steps; close the QEMU log file
        again when PRINT_QEMU is set."""
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()
850
851    def add_object(self, opts):
852        self._args.append('-object')
853        self._args.append(opts)
854        return self
855
856    def add_device(self, opts):
857        self._args.append('-device')
858        self._args.append(opts)
859        return self
860
861    def add_drive_raw(self, opts):
862        self._args.append('-drive')
863        self._args.append(opts)
864        return self
865
866    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
867        '''Add a virtio-blk drive to the VM'''
868        options = ['if=%s' % interface,
869                   'id=drive%d' % self._num_drives]
870
871        if path is not None:
872            options.append('file=%s' % path)
873            options.append('format=%s' % img_format)
874            options.append('cache=%s' % cachemode)
875            options.append('aio=%s' % aiomode)
876
877        if opts:
878            options.append(opts)
879
880        if img_format == 'luks' and 'key-secret' not in opts:
881            # default luks support
882            if luks_default_secret_object not in self._args:
883                self.add_object(luks_default_secret_object)
884
885            options.append(luks_default_key_secret_opt)
886
887        self._args.append('-drive')
888        self._args.append(','.join(options))
889        self._num_drives += 1
890        return self
891
892    def add_blockdev(self, opts):
893        self._args.append('-blockdev')
894        if isinstance(opts, str):
895            self._args.append(opts)
896        else:
897            self._args.append(','.join(opts))
898        return self
899
900    def add_incoming(self, addr):
901        self._args.append('-incoming')
902        self._args.append(addr)
903        return self
904
905    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
906        cmd = 'human-monitor-command'
907        kwargs: Dict[str, Any] = {'command-line': command_line}
908        if use_log:
909            return self.qmp_log(cmd, **kwargs)
910        else:
911            return self.qmp(cmd, **kwargs)
912
913    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
914        """Pause drive r/w operations"""
915        if not event:
916            self.pause_drive(drive, "read_aio")
917            self.pause_drive(drive, "write_aio")
918            return
919        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
920
921    def resume_drive(self, drive: str) -> None:
922        """Resume drive r/w operations"""
923        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
924
925    def hmp_qemu_io(self, drive: str, cmd: str,
926                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
927        """Write to a given drive using an HMP command"""
928        d = '-d ' if qdev else ''
929        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
930
931    def flatten_qmp_object(self, obj, output=None, basestr=''):
932        if output is None:
933            output = {}
934        if isinstance(obj, list):
935            for i, item in enumerate(obj):
936                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
937        elif isinstance(obj, dict):
938            for key in obj:
939                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
940        else:
941            output[basestr[:-1]] = obj # Strip trailing '.'
942        return output
943
944    def qmp_to_opts(self, obj):
945        obj = self.flatten_qmp_object(obj)
946        output_list = []
947        for key in obj:
948            output_list += [key + '=' + obj[key]]
949        return ','.join(output_list)
950
951    def get_qmp_events_filtered(self, wait=60.0):
952        result = []
953        for ev in self.get_qmp_events(wait=wait):
954            result.append(filter_qmp_event(ev))
955        return result
956
957    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
958        full_cmd = OrderedDict((
959            ("execute", cmd),
960            ("arguments", ordered_qmp(kwargs))
961        ))
962        log(full_cmd, filters, indent=indent)
963        result = self.qmp(cmd, **kwargs)
964        log(result, filters, indent=indent)
965        return result
966
967    # Returns None on success, and an error string on failure
968    def run_job(self, job: str, auto_finalize: bool = True,
969                auto_dismiss: bool = False,
970                pre_finalize: Optional[Callable[[], None]] = None,
971                cancel: bool = False, wait: float = 60.0,
972                filters: Iterable[Callable[[Any], Any]] = (),
973                ) -> Optional[str]:
974        """
975        run_job moves a job from creation through to dismissal.
976
977        :param job: String. ID of recently-launched job
978        :param auto_finalize: Bool. True if the job was launched with
979                              auto_finalize. Defaults to True.
980        :param auto_dismiss: Bool. True if the job was launched with
981                             auto_dismiss=True. Defaults to False.
982        :param pre_finalize: Callback. A callable that takes no arguments to be
983                             invoked prior to issuing job-finalize, if any.
984        :param cancel: Bool. When true, cancels the job after the pre_finalize
985                       callback.
986        :param wait: Float. Timeout value specifying how long to wait for any
987                     event, in seconds. Defaults to 60.0.
988        """
989        match_device = {'data': {'device': job}}
990        match_id = {'data': {'id': job}}
991        events = [
992            ('BLOCK_JOB_COMPLETED', match_device),
993            ('BLOCK_JOB_CANCELLED', match_device),
994            ('BLOCK_JOB_ERROR', match_device),
995            ('BLOCK_JOB_READY', match_device),
996            ('BLOCK_JOB_PENDING', match_id),
997            ('JOB_STATUS_CHANGE', match_id)
998        ]
999        error = None
1000        while True:
1001            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
1002            if ev['event'] != 'JOB_STATUS_CHANGE':
1003                log(ev, filters=filters)
1004                continue
1005            status = ev['data']['status']
1006            if status == 'aborting':
1007                result = self.qmp('query-jobs')
1008                for j in result['return']:
1009                    if j['id'] == job:
1010                        error = j['error']
1011                        log('Job failed: %s' % (j['error']), filters=filters)
1012            elif status == 'ready':
1013                self.qmp_log('job-complete', id=job, filters=filters)
1014            elif status == 'pending' and not auto_finalize:
1015                if pre_finalize:
1016                    pre_finalize()
1017                if cancel:
1018                    self.qmp_log('job-cancel', id=job, filters=filters)
1019                else:
1020                    self.qmp_log('job-finalize', id=job, filters=filters)
1021            elif status == 'concluded' and not auto_dismiss:
1022                self.qmp_log('job-dismiss', id=job, filters=filters)
1023            elif status == 'null':
1024                return error
1025
1026    # Returns None on success, and an error string on failure
1027    def blockdev_create(self, options, job_id='job0', filters=None):
1028        if filters is None:
1029            filters = [filter_qmp_testfiles]
1030        result = self.qmp_log('blockdev-create', filters=filters,
1031                              job_id=job_id, options=options)
1032
1033        if 'return' in result:
1034            assert result['return'] == {}
1035            job_result = self.run_job(job_id, filters=filters)
1036        else:
1037            job_result = result['error']
1038
1039        log("")
1040        return job_result
1041
1042    def enable_migration_events(self, name):
1043        log('Enabling migration QMP events on %s...' % name)
1044        log(self.qmp('migrate-set-capabilities', capabilities=[
1045            {
1046                'capability': 'events',
1047                'state': True
1048            }
1049        ]))
1050
1051    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1052        while True:
1053            event = self.event_wait('MIGRATION')
1054            # We use the default timeout, and with a timeout, event_wait()
1055            # never returns None
1056            assert event
1057
1058            log(event, filters=[filter_qmp_event])
1059            if event['data']['status'] in ('completed', 'failed'):
1060                break
1061
1062        if event['data']['status'] == 'completed':
1063            # The event may occur in finish-migrate, so wait for the expected
1064            # post-migration runstate
1065            runstate = None
1066            while runstate != expect_runstate:
1067                runstate = self.qmp('query-status')['return']['status']
1068            return True
1069        else:
1070            return False
1071
1072    def node_info(self, node_name):
1073        nodes = self.qmp('query-named-block-nodes')
1074        for x in nodes['return']:
1075            if x['node-name'] == node_name:
1076                return x
1077        return None
1078
1079    def query_bitmaps(self):
1080        res = self.qmp("query-named-block-nodes")
1081        return {device['node-name']: device['dirty-bitmaps']
1082                for device in res['return'] if 'dirty-bitmaps' in device}
1083
1084    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1085        """
1086        get a specific bitmap from the object returned by query_bitmaps.
1087        :param recording: If specified, filter results by the specified value.
1088        :param bitmaps: If specified, use it instead of call query_bitmaps()
1089        """
1090        if bitmaps is None:
1091            bitmaps = self.query_bitmaps()
1092
1093        for bitmap in bitmaps[node_name]:
1094            if bitmap.get('name', '') == bitmap_name:
1095                if recording is None or bitmap.get('recording') == recording:
1096                    return bitmap
1097        return None
1098
1099    def check_bitmap_status(self, node_name, bitmap_name, fields):
1100        ret = self.get_bitmap(node_name, bitmap_name)
1101
1102        return fields.items() <= ret.items()
1103
1104    def assert_block_path(self, root, path, expected_node, graph=None):
1105        """
1106        Check whether the node under the given path in the block graph
1107        is @expected_node.
1108
1109        @root is the node name of the node where the @path is rooted.
1110
1111        @path is a string that consists of child names separated by
1112        slashes.  It must begin with a slash.
1113
1114        Examples for @root + @path:
1115          - root="qcow2-node", path="/backing/file"
1116          - root="quorum-node", path="/children.2/file"
1117
1118        Hypothetically, @path could be empty, in which case it would
1119        point to @root.  However, in practice this case is not useful
1120        and hence not allowed.
1121
1122        @expected_node may be None.  (All elements of the path but the
1123        leaf must still exist.)
1124
1125        @graph may be None or the result of an x-debug-query-block-graph
1126        call that has already been performed.
1127        """
1128        if graph is None:
1129            graph = self.qmp('x-debug-query-block-graph')['return']
1130
1131        iter_path = iter(path.split('/'))
1132
1133        # Must start with a /
1134        assert next(iter_path) == ''
1135
1136        node = next((node for node in graph['nodes'] if node['name'] == root),
1137                    None)
1138
1139        # An empty @path is not allowed, so the root node must be present
1140        assert node is not None, 'Root node %s not found' % root
1141
1142        for child_name in iter_path:
1143            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1144
1145            try:
1146                node_id = next(edge['child'] for edge in graph['edges']
1147                               if (edge['parent'] == node['id'] and
1148                                   edge['name'] == child_name))
1149
1150                node = next(node for node in graph['nodes']
1151                            if node['id'] == node_id)
1152
1153            except StopIteration:
1154                node = None
1155
1156        if node is None:
1157            assert expected_node is None, \
1158                   'No node found under %s (but expected %s)' % \
1159                   (path, expected_node)
1160        else:
1161            assert node['name'] == expected_node, \
1162                   'Found node %s under %s (but expected %s)' % \
1163                   (node['name'], path, expected_node)
1164
# Matches a path component of the form "name[index]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path is a '/'-separated list of dict keys.  A component written
        as "key[N]" additionally selects element N of the list stored
        under "key".  Every traversal problem (container is not a dict,
        missing key, value is not a list, index is not an integer or is
        out of range) is reported as a test failure.
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                try:
                    idx = int(idx)
                except ValueError:
                    # Report a malformed index like any other traversal
                    # problem instead of leaking ValueError to the caller
                    self.fail(f'invalid index "{idx}" in path "{path}"')

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a failed traversal through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        self.vm.cmd('block-job-cancel', device=drive, force=force)

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        With @error None, the completion event must carry no error and,
        if @check_offset is set, its offset must equal its len.
        Otherwise the event's error must match @error.
        '''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that it
           ended as a completed mirror job with offset equal to len'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        self.vm.cmd('block-job-complete', device=drive)

        event = self.wait_until_completed(drive=drive, error=completion_error)
        # assertIn reports the offending type on failure
        self.assertIn(event['data']['type'], ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy, and
           return its entry.  The job must be listed on every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause and, if @wait is set, wait for the pause
           to take effect'''
        self.vm.cmd('block-job-pause', device=job_id)
        if wait:
            self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1344
1345
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<test_dir>/<seq>.notrun", logs a warning and exits
    the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_path = '%s/%s.notrun' % (test_dir, seq)

    with open(notrun_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1356
def case_notrun(reason):
    '''Record that a single test case was not run by appending @reason to
    "<test_dir>/<seq>.casenotrun".  The case itself is not skipped here;
    that is left to the caller (QMPTestCase.case_skip() is the variant
    that also skips the current test case).'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = '%s/%s.casenotrun' % (test_dir, seq)

    with open(casenotrun_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1369
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Call notrun() unless the current image format is acceptable.

    'generic' in @supported_fmts lifts the supported-format restriction
    as long as the IMGFMT_GENERIC environment variable is 'true' (its
    default).  For luks, a working LUKS driver is verified as well.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_enabled:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    if (supported_fmts and imgfmt not in supported_fmts) \
            or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1385
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Call notrun() unless the current protocol is acceptable.

    At most one of @supported and @unsupported may be non-empty.
    'generic' in @supported accepts every protocol.
    """
    assert not supported or not unsupported

    if 'generic' in supported:
        return

    acceptable = ((not supported or imgproto in supported)
                  and imgproto not in unsupported)
    if not acceptable:
        notrun('not suitable for this protocol: %s' % imgproto)
1396
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Call notrun() if sys.platform starts with any @unsupported prefix,
    or if @supported is non-empty and none of its prefixes match."""
    platform = sys.platform

    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
1405
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Call notrun() if a non-empty @supported_cache_modes does not
    contain the current cache mode."""
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1409
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Call notrun() if a non-empty @supported_aio_modes does not contain
    the current aio mode."""
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1413
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Call notrun() if any of @required_formats is missing from
    supported_formats()."""
    usf_list = list(set(required_formats).difference(supported_formats()))
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1418
1419
def _verify_virtio_blk() -> None:
    """Call notrun() if 'virtio-blk' does not show up in the output of
    "-M none -device help"."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1424
def verify_virtio_scsi_pci_or_ccw() -> None:
    """Call notrun() if neither 'virtio-scsi-pci' nor 'virtio-scsi-ccw'
    shows up in the output of "-M none -device help"."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1429
1430
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    """Call notrun() if the IMGOPTS environment variable is set and
    contains any of the @unsupported substrings, or a '$'."""
    imgopts = os.environ.get('IMGOPTS')
    # One usage example for IMGOPTS is "data_file=$TEST_IMG.ext_data_file",
    # but that is supported only for bash tests.  iotests.py has no concept
    # of a global TEST_IMG, let alone any parsing of $variables.  So, for
    # simplicity, just do not support any IMGOPTS containing '$'.
    rejected = [*list(unsupported), '$']
    if not imgopts:
        return
    if any(fragment in imgopts for fragment in rejected):
        notrun(f'not suitable for this imgopts: {imgopts}')
1440
1441
def supports_quorum() -> bool:
    """Return whether 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
1444
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1449
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') on success.  Otherwise (False, reason), where
             reason is the text following "<image file>:" on the first
             qemu-img output line containing it, or the whole output if
             no line does.
    """

    img_file = f'{test_dir}/luks-test.luks'
    res = qemu_img('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G',
                   check=False)
    # Best-effort cleanup; the image may never have been created
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in res.stdout.splitlines()
                   if marker in line),
                  res.stdout)
    return (False, reason)
1480
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1488
def supports_qcow2_zstd_compression() -> bool:
    """Probe zstd support by creating a qcow2 image with
    compression_type=zstd.

    Returns False only if qemu-img exits with status 1 and reports that
    'compression-type' does not accept 'zstd'; True in every other case.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)
    # Best-effort cleanup; the image may never have been created
    try:
        os.remove(img_file)
    except OSError:
        pass

    zstd_rejected = (
        res.returncode == 1 and
        "'compression-type' does not accept value 'zstd'" in res.stdout
    )
    return not zstd_rejected
1504
def verify_qcow2_zstd_compression():
    """Skip test suite if qcow2 zstd compression is not supported"""
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1508
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1518
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from the output of "-drive format=help" (first
       line for read-write, second for read-only) and cached on the
       function object in supported_formats.formats.'''

    cache = getattr(supported_formats, "formats", None)
    if cache is None:
        cache = {}
        supported_formats.formats = cache

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted = help_lines[1 if read_only else 0]
        # The format names follow the first ':' on the line
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
1533
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a collection of format names or a
       callable that receives the test case and returns one.  @read_only
       is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            missing = list(set(fmts) - set(supported_formats(read_only)))
            if not missing:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: formats {missing} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1553
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats, i.e. when the current image
       format is one of @formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1569
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       As root, the case is recorded through case_notrun() and the
       wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1580
1581# We need to filter out the time taken from the output so that
1582# qemu-iotest can reliably diff the results against master output,
1583# and hide skipped tests from the reference output.
1584
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports a skipped test as "." in dots mode."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1594
class ReproducibleStreamWrapper:
    """Stream proxy that strips the run time and the skipped-test count
    from written text; other attributes are forwarded to the wrapped
    stream."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached for attributes not found normally; never forward
        # these two to the wrapped stream.
        if attr not in ('stream', '__getstate__'):
            return getattr(self.stream, attr)
        raise AttributeError(attr)

    def write(self, arg=None):
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        for pattern, replacement in substitutions:
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
1608
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult as its default result class."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given
        wrapped = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=wrapped,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1619
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    Verbosity is 2 when @debug is set and 1 otherwise.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warning_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_mode)
1630
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv and selects DEBUG logging;
    the logging level is WARN otherwise.  The environment checks are then
    run in a fixed order.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1662
def execute_test(*args, test_function=None, **kwargs):
    """
    Perform the common setup, then run the tests.

    If test_function is given (truthy) it is called with no arguments as
    a script-style test; otherwise the unittest runner is started with
    sys.argv.  All other arguments go to execute_setup_common().
    """
    debug = execute_setup_common(*args, **kwargs)

    if test_function:
        test_function()
        return

    execute_unittest(sys.argv, debug)
1671
def activate_logging():
    """
    Send test_logger output to stdout for script-style tests.

    Records are printed as the bare message text, at INFO level and
    above, and are not propagated to ancestor loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1680
1681# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Set up a script-style test; no test is run from here.

    Turns on stdout logging and forwards every argument to
    execute_setup_common(), whose return value is discarded.
    """
    activate_logging()
    _ = execute_setup_common(*args, **kwargs)
1686
1687# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Entry point for script-style tests, outside the unittest framework.

    Turns on stdout logging, then hands test_function and all remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(
        *args,
        test_function=test_function,
        **kwargs,
    )
1692
1693# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Entry point for unittest-style tests.

    All arguments are forwarded unchanged to execute_test().
    """
    execute_test(
        *args,
        **kwargs,
    )
1697