xref: /qemu/tests/qemu-iotests/iotests.py (revision 7c0dfcf9)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
78
79gdb_qemu_env = os.environ.get('GDB_OPTIONS')
80qemu_gdb = []
81if gdb_qemu_env:
82    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
83
84qemu_print = os.environ.get('PRINT_QEMU', False)
85
86imgfmt = os.environ.get('IMGFMT', 'raw')
87imgproto = os.environ.get('IMGPROTO', 'file')
88
89try:
90    test_dir = os.environ['TEST_DIR']
91    sock_dir = os.environ['SOCK_DIR']
92    cachemode = os.environ['CACHEMODE']
93    aiomode = os.environ['AIOMODE']
94    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
95except KeyError:
96    # We are using these variables as proxies to indicate that we're
97    # not being run via "check". There may be other things set up by
98    # "check" that individual test cases rely on.
99    sys.stderr.write('Please run this test via the "check" script\n')
100    sys.exit(os.EX_USAGE)
101
102qemu_valgrind = []
103if os.environ.get('VALGRIND_QEMU') == "y" and \
104    os.environ.get('NO_VALGRIND') != "y":
105    valgrind_logfile = "--log-file=" + test_dir
106    # %p allows to put the valgrind process PID, since
107    # we don't know it a priori (subprocess.Popen is
108    # not yet invoked)
109    valgrind_logfile += "/%p.valgrind"
110
111    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112
113luks_default_secret_object = 'secret,id=keysec0,data=' + \
114                             os.environ.get('IMGKEYSECRET', '')
115luks_default_key_secret_opt = 'key-secret=keysec0'
116
117sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
120@contextmanager
121def change_log_level(
122        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
123    """
124    Utility function for temporarily changing the log level of a logger.
125
126    This can be used to silence errors that are expected or uninteresting.
127    """
128    _logger = logging.getLogger(logger_name)
129    current_level = _logger.level
130    _logger.setLevel(level)
131
132    try:
133        yield
134    finally:
135        _logger.setLevel(current_level)
136
137
138def unarchive_sample_image(sample, fname):
139    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
140    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
141        shutil.copyfileobj(f_in, f_out)
142
143
144def qemu_tool_popen(args: Sequence[str],
145                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
146    stderr = subprocess.STDOUT if connect_stderr else None
147    # pylint: disable=consider-using-with
148    return subprocess.Popen(args,
149                            stdout=subprocess.PIPE,
150                            stderr=stderr,
151                            universal_newlines=True)
152
153
154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
155                              connect_stderr: bool = True,
156                              drop_successful_output: bool = False) \
157        -> Tuple[str, int]:
158    """
159    Run a tool and return both its output and its exit code
160    """
161    with qemu_tool_popen(args, connect_stderr) as subp:
162        output = subp.communicate()[0]
163        if subp.returncode < 0:
164            cmd = ' '.join(args)
165            sys.stderr.write(f'{tool} received signal \
166                               {-subp.returncode}: {cmd}\n')
167        if drop_successful_output and subp.returncode == 0:
168            output = ''
169        return (output, subp.returncode)
170
171def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172    if not args or args[0] != 'create':
173        return list(args)
174    args = args[1:]
175
176    p = argparse.ArgumentParser(allow_abbrev=False)
177    # -o option may be specified several times
178    p.add_argument('-o', action='append', default=[])
179    p.add_argument('-f')
180    parsed, remaining = p.parse_known_args(args)
181
182    opts_list = parsed.o
183
184    result = ['create']
185    if parsed.f is not None:
186        result += ['-f', parsed.f]
187
188    # IMGOPTS most probably contain options specific for the selected format,
189    # like extended_l2 or compression_type for qcow2. Test may want to create
190    # additional images in other formats that doesn't support these options.
191    # So, use IMGOPTS only for images created in imgfmt format.
192    imgopts = os.environ.get('IMGOPTS')
193    if imgopts and parsed.f == imgfmt:
194        opts_list.insert(0, imgopts)
195
196    # default luks support
197    if parsed.f == 'luks' and \
198            all('key-secret' not in opts for opts in opts_list):
199        result += ['--object', luks_default_secret_object]
200        opts_list.append(luks_default_key_secret_opt)
201
202    for opts in opts_list:
203        result += ['-o', opts]
204
205    result += remaining
206
207    return result
208
209
210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
211              ) -> 'subprocess.CompletedProcess[str]':
212    """
213    Run a qemu tool and return its status code and console output.
214
215    :param args: full command line to run.
216    :param check: Enforce a return code of zero.
217    :param combine_stdio: set to False to keep stdout/stderr separated.
218
219    :raise VerboseProcessError:
220        When the return code is negative, or on any non-zero exit code
221        when 'check=True' was provided (the default). This exception has
222        'stdout', 'stderr', and 'returncode' properties that may be
223        inspected to show greater detail. If this exception is not
224        handled, the command-line, return code, and all console output
225        will be included at the bottom of the stack trace.
226
227    :return:
228        a CompletedProcess. This object has args, returncode, and stdout
229        properties. If streams are not combined, it will also have a
230        stderr property.
231    """
232    subp = subprocess.run(
233        args,
234        stdout=subprocess.PIPE,
235        stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
236        universal_newlines=True,
237        check=False
238    )
239
240    if check and subp.returncode or (subp.returncode < 0):
241        raise VerboseProcessError(
242            subp.returncode, args,
243            output=subp.stdout,
244            stderr=subp.stderr,
245        )
246
247    return subp
248
249
250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
251             ) -> 'subprocess.CompletedProcess[str]':
252    """
253    Run QEMU_IMG_PROG and return its status code and console output.
254
255    This function always prepends QEMU_IMG_OPTIONS and may further alter
256    the args for 'create' commands.
257
258    See `qemu_tool()` for greater detail.
259    """
260    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
261    return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)
262
263
264def ordered_qmp(qmsg, conv_keys=True):
265    # Dictionaries are not ordered prior to 3.6, therefore:
266    if isinstance(qmsg, list):
267        return [ordered_qmp(atom) for atom in qmsg]
268    if isinstance(qmsg, dict):
269        od = OrderedDict()
270        for k, v in sorted(qmsg.items()):
271            if conv_keys:
272                k = k.replace('_', '-')
273            od[k] = ordered_qmp(v, conv_keys=False)
274        return od
275    return qmsg
276
277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
278    return qemu_img('create', *args)
279
280def qemu_img_json(*args: str) -> Any:
281    """
282    Run qemu-img and return its output as deserialized JSON.
283
284    :raise CalledProcessError:
285        When qemu-img crashes, or returns a non-zero exit code without
286        producing a valid JSON document to stdout.
287    :raise JSONDecoderError:
288        When qemu-img returns 0, but failed to produce a valid JSON document.
289
290    :return: A deserialized JSON object; probably a dict[str, Any].
291    """
292    try:
293        res = qemu_img(*args, combine_stdio=False)
294    except subprocess.CalledProcessError as exc:
295        # Terminated due to signal. Don't bother.
296        if exc.returncode < 0:
297            raise
298
299        # Commands like 'check' can return failure (exit codes 2 and 3)
300        # to indicate command completion, but with errors found. For
301        # multi-command flexibility, ignore the exact error codes and
302        # *try* to load JSON.
303        try:
304            return json.loads(exc.stdout)
305        except json.JSONDecodeError:
306            # Nope. This thing is toast. Raise the /process/ error.
307            pass
308        raise
309
310    return json.loads(res.stdout)
311
312def qemu_img_measure(*args: str) -> Any:
313    return qemu_img_json("measure", "--output", "json", *args)
314
315def qemu_img_check(*args: str) -> Any:
316    return qemu_img_json("check", "--output", "json", *args)
317
318def qemu_img_info(*args: str) -> Any:
319    return qemu_img_json('info', "--output", "json", *args)
320
321def qemu_img_map(*args: str) -> Any:
322    return qemu_img_json('map', "--output", "json", *args)
323
324def qemu_img_log(*args: str, check: bool = True
325                 ) -> 'subprocess.CompletedProcess[str]':
326    result = qemu_img(*args, check=check)
327    log(result.stdout, filters=[filter_testfiles])
328    return result
329
330def img_info_log(filename: str, filter_path: Optional[str] = None,
331                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
332                 check: bool = True, drop_child_info: bool = True,
333                 ) -> None:
334    args = ['info']
335    if use_image_opts:
336        args.append('--image-opts')
337    else:
338        args += ['-f', imgfmt]
339    args += extra_args
340    args.append(filename)
341
342    output = qemu_img(*args, check=check).stdout
343    if not filter_path:
344        filter_path = filename
345    log(filter_img_info(output, filter_path, drop_child_info))
346
347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
348    if '-f' in args or '--image-opts' in args:
349        return qemu_io_args_no_fmt + list(args)
350    else:
351        return qemu_io_args + list(args)
352
353def qemu_io_popen(*args):
354    return qemu_tool_popen(qemu_io_wrap_args(args))
355
356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
357            ) -> 'subprocess.CompletedProcess[str]':
358    """
359    Run QEMU_IO_PROG and return the status code and console output.
360
361    This function always prepends either QEMU_IO_OPTIONS or
362    QEMU_IO_OPTIONS_NO_FMT.
363    """
364    return qemu_tool(*qemu_io_wrap_args(args),
365                     check=check, combine_stdio=combine_stdio)
366
367def qemu_io_log(*args: str, check: bool = True
368                ) -> 'subprocess.CompletedProcess[str]':
369    result = qemu_io(*args, check=check)
370    log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
371    return result
372
373class QemuIoInteractive:
374    def __init__(self, *args):
375        self.args = qemu_io_wrap_args(args)
376        # We need to keep the Popen objext around, and not
377        # close it immediately. Therefore, disable the pylint check:
378        # pylint: disable=consider-using-with
379        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
380                                   stdout=subprocess.PIPE,
381                                   stderr=subprocess.STDOUT,
382                                   universal_newlines=True)
383        out = self._p.stdout.read(9)
384        if out != 'qemu-io> ':
385            # Most probably qemu-io just failed to start.
386            # Let's collect the whole output and exit.
387            out += self._p.stdout.read()
388            self._p.wait(timeout=1)
389            raise ValueError(out)
390
391    def close(self):
392        self._p.communicate('q\n')
393
394    def _read_output(self):
395        pattern = 'qemu-io> '
396        n = len(pattern)
397        pos = 0
398        s = []
399        while pos != n:
400            c = self._p.stdout.read(1)
401            # check unexpected EOF
402            assert c != ''
403            s.append(c)
404            if c == pattern[pos]:
405                pos += 1
406            else:
407                pos = 0
408
409        return ''.join(s[:-n])
410
411    def cmd(self, cmd):
412        # quit command is in close(), '\n' is added automatically
413        assert '\n' not in cmd
414        cmd = cmd.strip()
415        assert cmd not in ('q', 'quit')
416        self._p.stdin.write(cmd + '\n')
417        self._p.stdin.flush()
418        return self._read_output()
419
420
421class QemuStorageDaemon:
422    _qmp: Optional[QEMUMonitorProtocol] = None
423    _qmpsock: Optional[str] = None
424    # Python < 3.8 would complain if this type were not a string literal
425    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426    _p: 'Optional[subprocess.Popen[bytes]]' = None
427
428    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
429        assert '--pidfile' not in args
430        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
431        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
432
433        if qmp:
434            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
435            all_args += ['--chardev',
436                         f'socket,id=qmp-sock,path={self._qmpsock}',
437                         '--monitor', 'qmp-sock']
438
439            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)
440
441        # Cannot use with here, we want the subprocess to stay around
442        # pylint: disable=consider-using-with
443        self._p = subprocess.Popen(all_args)
444        if self._qmp is not None:
445            self._qmp.accept()
446        while not os.path.exists(self.pidfile):
447            if self._p.poll() is not None:
448                cmd = ' '.join(all_args)
449                raise RuntimeError(
450                    'qemu-storage-daemon terminated with exit code ' +
451                    f'{self._p.returncode}: {cmd}')
452
453            time.sleep(0.01)
454
455        with open(self.pidfile, encoding='utf-8') as f:
456            self._pid = int(f.read().strip())
457
458        assert self._pid == self._p.pid
459
460    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
461            -> QMPMessage:
462        assert self._qmp is not None
463        return self._qmp.cmd_raw(cmd, args)
464
465    def get_qmp(self) -> QEMUMonitorProtocol:
466        assert self._qmp is not None
467        return self._qmp
468
469    def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \
470            -> QMPReturnValue:
471        assert self._qmp is not None
472        return self._qmp.cmd(cmd, **(args or {}))
473
474    def stop(self, kill_signal=15):
475        self._p.send_signal(kill_signal)
476        self._p.wait()
477        self._p = None
478
479        if self._qmp:
480            self._qmp.close()
481
482        if self._qmpsock is not None:
483            try:
484                os.remove(self._qmpsock)
485            except OSError:
486                pass
487        try:
488            os.remove(self.pidfile)
489        except OSError:
490            pass
491
492    def __del__(self):
493        if self._p is not None:
494            self.stop(kill_signal=9)
495
496
497def qemu_nbd(*args):
498    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
499    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
500
501def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
502    '''Run qemu-nbd in daemon mode and return both the parent's exit code
503       and its output in case of an error'''
504    full_args = qemu_nbd_args + ['--fork'] + list(args)
505    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
506                                                   connect_stderr=False)
507    return returncode, output if returncode else ''
508
509def qemu_nbd_list_log(*args: str) -> str:
510    '''Run qemu-nbd to list remote exports'''
511    full_args = [qemu_nbd_prog, '-L'] + list(args)
512    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
513    log(output, filters=[filter_testfiles, filter_nbd_exports])
514    return output
515
516@contextmanager
517def qemu_nbd_popen(*args):
518    '''Context manager running qemu-nbd within the context'''
519    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
520
521    assert not os.path.exists(pid_file)
522
523    cmd = list(qemu_nbd_args)
524    cmd.extend(('--persistent', '--pid-file', pid_file))
525    cmd.extend(args)
526
527    log('Start NBD server')
528    with subprocess.Popen(cmd) as p:
529        try:
530            while not os.path.exists(pid_file):
531                if p.poll() is not None:
532                    raise RuntimeError(
533                        "qemu-nbd terminated with exit code {}: {}"
534                        .format(p.returncode, ' '.join(cmd)))
535
536                time.sleep(0.01)
537            yield
538        finally:
539            if os.path.exists(pid_file):
540                os.remove(pid_file)
541            log('Kill NBD server')
542            p.kill()
543            p.wait()
544
545def compare_images(img1: str, img2: str,
546                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
547    """
548    Compare two images with QEMU_IMG; return True if they are identical.
549
550    :raise CalledProcessError:
551        when qemu-img crashes or returns a status code of anything other
552        than 0 (identical) or 1 (different).
553    """
554    try:
555        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
556        return True
557    except subprocess.CalledProcessError as exc:
558        if exc.returncode == 1:
559            return False
560        raise
561
562def create_image(name, size):
563    '''Create a fully-allocated raw image with sector markers'''
564    with open(name, 'wb') as file:
565        i = 0
566        while i < size:
567            sector = struct.pack('>l504xl', i // 512, i // 512)
568            file.write(sector)
569            i = i + 512
570
571def image_size(img: str) -> int:
572    """Return image's virtual size"""
573    value = qemu_img_info('-f', imgfmt, img)['virtual-size']
574    if not isinstance(value, int):
575        type_name = type(value).__name__
576        raise TypeError("Expected 'int' for 'virtual-size', "
577                        f"got '{value}' of type '{type_name}'")
578    return value
579
580def is_str(val):
581    return isinstance(val, str)
582
583test_dir_re = re.compile(r"%s" % test_dir)
584def filter_test_dir(msg):
585    return test_dir_re.sub("TEST_DIR", msg)
586
587win32_re = re.compile(r"\r")
588def filter_win32(msg):
589    return win32_re.sub("", msg)
590
591qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
592                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
593                        r"and [0-9\/.inf]* ops\/sec\)")
594def filter_qemu_io(msg):
595    msg = filter_win32(msg)
596    return qemu_io_re.sub("X ops; XX:XX:XX.X "
597                          "(XXX YYY/sec and XXX ops/sec)", msg)
598
599chown_re = re.compile(r"chown [0-9]+:[0-9]+")
600def filter_chown(msg):
601    return chown_re.sub("chown UID:GID", msg)
602
603def filter_qmp_event(event):
604    '''Filter a QMP event dict'''
605    event = dict(event)
606    if 'timestamp' in event:
607        event['timestamp']['seconds'] = 'SECS'
608        event['timestamp']['microseconds'] = 'USECS'
609    return event
610
611def filter_qmp(qmsg, filter_fn):
612    '''Given a string filter, filter a QMP object's values.
613    filter_fn takes a (key, value) pair.'''
614    # Iterate through either lists or dicts;
615    if isinstance(qmsg, list):
616        items = enumerate(qmsg)
617    elif isinstance(qmsg, dict):
618        items = qmsg.items()
619    else:
620        return filter_fn(None, qmsg)
621
622    for k, v in items:
623        if isinstance(v, (dict, list)):
624            qmsg[k] = filter_qmp(v, filter_fn)
625        else:
626            qmsg[k] = filter_fn(k, v)
627    return qmsg
628
629def filter_testfiles(msg):
630    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
631    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
632    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
633
634def filter_qmp_testfiles(qmsg):
635    def _filter(_key, value):
636        if is_str(value):
637            return filter_testfiles(value)
638        return value
639    return filter_qmp(qmsg, _filter)
640
641def filter_virtio_scsi(output: str) -> str:
642    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
643
644def filter_qmp_virtio_scsi(qmsg):
645    def _filter(_key, value):
646        if is_str(value):
647            return filter_virtio_scsi(value)
648        return value
649    return filter_qmp(qmsg, _filter)
650
651def filter_generated_node_ids(msg):
652    return re.sub("#block[0-9]+", "NODE_NAME", msg)
653
654def filter_qmp_generated_node_ids(qmsg):
655    def _filter(_key, value):
656        if is_str(value):
657            return filter_generated_node_ids(value)
658        return value
659    return filter_qmp(qmsg, _filter)
660
661def filter_img_info(output: str, filename: str,
662                    drop_child_info: bool = True) -> str:
663    lines = []
664    drop_indented = False
665    for line in output.split('\n'):
666        if 'disk size' in line or 'actual-size' in line:
667            continue
668
669        # Drop child node info
670        if drop_indented:
671            if line.startswith(' '):
672                continue
673            drop_indented = False
674        if drop_child_info and "Child node '/" in line:
675            drop_indented = True
676            continue
677
678        line = line.replace(filename, 'TEST_IMG')
679        line = filter_testfiles(line)
680        line = line.replace(imgfmt, 'IMGFMT')
681        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
682        line = re.sub('uuid: [-a-f0-9]+',
683                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
684                      line)
685        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
686        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
687                      line)
688        lines.append(line)
689    return '\n'.join(lines)
690
691def filter_imgfmt(msg):
692    return msg.replace(imgfmt, 'IMGFMT')
693
694def filter_qmp_imgfmt(qmsg):
695    def _filter(_key, value):
696        if is_str(value):
697            return filter_imgfmt(value)
698        return value
699    return filter_qmp(qmsg, _filter)
700
701def filter_nbd_exports(output: str) -> str:
702    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
703
704
705Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
706
707def log(msg: Msg,
708        filters: Iterable[Callable[[Msg], Msg]] = (),
709        indent: Optional[int] = None) -> None:
710    """
711    Logs either a string message or a JSON serializable message (like QMP).
712    If indent is provided, JSON serializable messages are pretty-printed.
713    """
714    for flt in filters:
715        msg = flt(msg)
716    if isinstance(msg, (dict, list)):
717        # Don't sort if it's already sorted
718        do_sort = not isinstance(msg, OrderedDict)
719        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
720    else:
721        test_logger.info(msg)
722
723class Timeout:
724    def __init__(self, seconds, errmsg="Timeout"):
725        self.seconds = seconds
726        self.errmsg = errmsg
727    def __enter__(self):
728        if qemu_gdb or qemu_valgrind:
729            return self
730        signal.signal(signal.SIGALRM, self.timeout)
731        signal.setitimer(signal.ITIMER_REAL, self.seconds)
732        return self
733    def __exit__(self, exc_type, value, traceback):
734        if qemu_gdb or qemu_valgrind:
735            return False
736        signal.setitimer(signal.ITIMER_REAL, 0)
737        return False
738    def timeout(self, signum, frame):
739        raise TimeoutError(self.errmsg)
740
741def file_pattern(name):
742    return "{0}-{1}".format(os.getpid(), name)
743
744class FilePath:
745    """
746    Context manager generating multiple file names. The generated files are
747    removed when exiting the context.
748
749    Example usage:
750
751        with FilePath('a.img', 'b.img') as (img_a, img_b):
752            # Use img_a and img_b here...
753
754        # a.img and b.img are automatically removed here.
755
756    By default images are created in iotests.test_dir. To create sockets use
757    iotests.sock_dir:
758
759       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
760
761    For convenience, calling with one argument yields a single file instead of
762    a tuple with one item.
763
764    """
765    def __init__(self, *names, base_dir=test_dir):
766        self.paths = [os.path.join(base_dir, file_pattern(name))
767                      for name in names]
768
769    def __enter__(self):
770        if len(self.paths) == 1:
771            return self.paths[0]
772        else:
773            return self.paths
774
775    def __exit__(self, exc_type, exc_val, exc_tb):
776        for path in self.paths:
777            try:
778                os.remove(path)
779            except OSError:
780                pass
781        return False
782
783
784def try_remove(img):
785    try:
786        os.remove(img)
787    except OSError:
788        pass
789
790def file_path_remover():
791    for path in reversed(file_path_remover.paths):
792        try_remove(path)
793
794
795def file_path(*names, base_dir=test_dir):
796    ''' Another way to get auto-generated filename that cleans itself up.
797
798    Use is as simple as:
799
800    img_a, img_b = file_path('a.img', 'b.img')
801    sock = file_path('socket')
802    '''
803
804    if not hasattr(file_path_remover, 'paths'):
805        file_path_remover.paths = []
806        atexit.register(file_path_remover)
807
808    paths = []
809    for name in names:
810        filename = file_pattern(name)
811        path = os.path.join(base_dir, filename)
812        file_path_remover.paths.append(path)
813        paths.append(path)
814
815    return paths[0] if len(paths) == 1 else paths
816
817def remote_filename(path):
818    if imgproto == 'file':
819        return path
820    elif imgproto == 'ssh':
821        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
822    else:
823        raise ValueError("Protocol %s not supported" % (imgproto))
824
825class VM(qtest.QEMUQtestMachine):
826    '''A QEMU VM'''
827
828    def __init__(self, path_suffix=''):
829        name = "qemu%s-%d" % (path_suffix, os.getpid())
830        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
831        if qemu_gdb and qemu_valgrind:
832            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
833            sys.exit(1)
834        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
835        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
836                         name=name,
837                         base_temp_dir=test_dir,
838                         qmp_timer=timer)
839        self._num_drives = 0
840
841    def _post_shutdown(self) -> None:
842        super()._post_shutdown()
843        if not qemu_valgrind or not self._popen:
844            return
845        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
846        if self.exitcode() == 99:
847            with open(valgrind_filename, encoding='utf-8') as f:
848                print(f.read())
849        else:
850            os.remove(valgrind_filename)
851
852    def _pre_launch(self) -> None:
853        super()._pre_launch()
854        if qemu_print:
855            # set QEMU binary output to stdout
856            self._close_qemu_log_file()
857
858    def add_object(self, opts):
859        self._args.append('-object')
860        self._args.append(opts)
861        return self
862
863    def add_device(self, opts):
864        self._args.append('-device')
865        self._args.append(opts)
866        return self
867
868    def add_drive_raw(self, opts):
869        self._args.append('-drive')
870        self._args.append(opts)
871        return self
872
873    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
874        '''Add a virtio-blk drive to the VM'''
875        options = ['if=%s' % interface,
876                   'id=drive%d' % self._num_drives]
877
878        if path is not None:
879            options.append('file=%s' % path)
880            options.append('format=%s' % img_format)
881            options.append('cache=%s' % cachemode)
882            options.append('aio=%s' % aiomode)
883
884        if opts:
885            options.append(opts)
886
887        if img_format == 'luks' and 'key-secret' not in opts:
888            # default luks support
889            if luks_default_secret_object not in self._args:
890                self.add_object(luks_default_secret_object)
891
892            options.append(luks_default_key_secret_opt)
893
894        self._args.append('-drive')
895        self._args.append(','.join(options))
896        self._num_drives += 1
897        return self
898
899    def add_blockdev(self, opts):
900        self._args.append('-blockdev')
901        if isinstance(opts, str):
902            self._args.append(opts)
903        else:
904            self._args.append(','.join(opts))
905        return self
906
907    def add_incoming(self, addr):
908        self._args.append('-incoming')
909        self._args.append(addr)
910        return self
911
912    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
913        cmd = 'human-monitor-command'
914        kwargs: Dict[str, Any] = {'command-line': command_line}
915        if use_log:
916            return self.qmp_log(cmd, **kwargs)
917        else:
918            return self.qmp(cmd, **kwargs)
919
920    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
921        """Pause drive r/w operations"""
922        if not event:
923            self.pause_drive(drive, "read_aio")
924            self.pause_drive(drive, "write_aio")
925            return
926        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
927
928    def resume_drive(self, drive: str) -> None:
929        """Resume drive r/w operations"""
930        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
931
932    def hmp_qemu_io(self, drive: str, cmd: str,
933                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
934        """Write to a given drive using an HMP command"""
935        d = '-d ' if qdev else ''
936        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
937
938    def flatten_qmp_object(self, obj, output=None, basestr=''):
939        if output is None:
940            output = {}
941        if isinstance(obj, list):
942            for i, item in enumerate(obj):
943                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
944        elif isinstance(obj, dict):
945            for key in obj:
946                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
947        else:
948            output[basestr[:-1]] = obj # Strip trailing '.'
949        return output
950
951    def qmp_to_opts(self, obj):
952        obj = self.flatten_qmp_object(obj)
953        output_list = []
954        for key in obj:
955            output_list += [key + '=' + obj[key]]
956        return ','.join(output_list)
957
958    def get_qmp_events_filtered(self, wait=60.0):
959        result = []
960        for ev in self.get_qmp_events(wait=wait):
961            result.append(filter_qmp_event(ev))
962        return result
963
964    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
965        full_cmd = OrderedDict((
966            ("execute", cmd),
967            ("arguments", ordered_qmp(kwargs))
968        ))
969        log(full_cmd, filters, indent=indent)
970        result = self.qmp(cmd, **kwargs)
971        log(result, filters, indent=indent)
972        return result
973
974    # Returns None on success, and an error string on failure
975    def run_job(self, job: str, auto_finalize: bool = True,
976                auto_dismiss: bool = False,
977                pre_finalize: Optional[Callable[[], None]] = None,
978                cancel: bool = False, wait: float = 60.0,
979                filters: Iterable[Callable[[Any], Any]] = (),
980                ) -> Optional[str]:
981        """
982        run_job moves a job from creation through to dismissal.
983
984        :param job: String. ID of recently-launched job
985        :param auto_finalize: Bool. True if the job was launched with
986                              auto_finalize. Defaults to True.
987        :param auto_dismiss: Bool. True if the job was launched with
988                             auto_dismiss=True. Defaults to False.
989        :param pre_finalize: Callback. A callable that takes no arguments to be
990                             invoked prior to issuing job-finalize, if any.
991        :param cancel: Bool. When true, cancels the job after the pre_finalize
992                       callback.
993        :param wait: Float. Timeout value specifying how long to wait for any
994                     event, in seconds. Defaults to 60.0.
995        """
996        match_device = {'data': {'device': job}}
997        match_id = {'data': {'id': job}}
998        events = [
999            ('BLOCK_JOB_COMPLETED', match_device),
1000            ('BLOCK_JOB_CANCELLED', match_device),
1001            ('BLOCK_JOB_ERROR', match_device),
1002            ('BLOCK_JOB_READY', match_device),
1003            ('BLOCK_JOB_PENDING', match_id),
1004            ('JOB_STATUS_CHANGE', match_id)
1005        ]
1006        error = None
1007        while True:
1008            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
1009            if ev['event'] != 'JOB_STATUS_CHANGE':
1010                log(ev, filters=filters)
1011                continue
1012            status = ev['data']['status']
1013            if status == 'aborting':
1014                result = self.qmp('query-jobs')
1015                for j in result['return']:
1016                    if j['id'] == job:
1017                        error = j['error']
1018                        log('Job failed: %s' % (j['error']), filters=filters)
1019            elif status == 'ready':
1020                self.qmp_log('job-complete', id=job, filters=filters)
1021            elif status == 'pending' and not auto_finalize:
1022                if pre_finalize:
1023                    pre_finalize()
1024                if cancel:
1025                    self.qmp_log('job-cancel', id=job, filters=filters)
1026                else:
1027                    self.qmp_log('job-finalize', id=job, filters=filters)
1028            elif status == 'concluded' and not auto_dismiss:
1029                self.qmp_log('job-dismiss', id=job, filters=filters)
1030            elif status == 'null':
1031                return error
1032
1033    # Returns None on success, and an error string on failure
1034    def blockdev_create(self, options, job_id='job0', filters=None):
1035        if filters is None:
1036            filters = [filter_qmp_testfiles]
1037        result = self.qmp_log('blockdev-create', filters=filters,
1038                              job_id=job_id, options=options)
1039
1040        if 'return' in result:
1041            assert result['return'] == {}
1042            job_result = self.run_job(job_id, filters=filters)
1043        else:
1044            job_result = result['error']
1045
1046        log("")
1047        return job_result
1048
1049    def enable_migration_events(self, name):
1050        log('Enabling migration QMP events on %s...' % name)
1051        log(self.qmp('migrate-set-capabilities', capabilities=[
1052            {
1053                'capability': 'events',
1054                'state': True
1055            }
1056        ]))
1057
1058    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1059        while True:
1060            event = self.event_wait('MIGRATION')
1061            # We use the default timeout, and with a timeout, event_wait()
1062            # never returns None
1063            assert event
1064
1065            log(event, filters=[filter_qmp_event])
1066            if event['data']['status'] in ('completed', 'failed'):
1067                break
1068
1069        if event['data']['status'] == 'completed':
1070            # The event may occur in finish-migrate, so wait for the expected
1071            # post-migration runstate
1072            runstate = None
1073            while runstate != expect_runstate:
1074                runstate = self.qmp('query-status')['return']['status']
1075            return True
1076        else:
1077            return False
1078
1079    def node_info(self, node_name):
1080        nodes = self.qmp('query-named-block-nodes')
1081        for x in nodes['return']:
1082            if x['node-name'] == node_name:
1083                return x
1084        return None
1085
1086    def query_bitmaps(self):
1087        res = self.qmp("query-named-block-nodes")
1088        return {device['node-name']: device['dirty-bitmaps']
1089                for device in res['return'] if 'dirty-bitmaps' in device}
1090
1091    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1092        """
1093        get a specific bitmap from the object returned by query_bitmaps.
1094        :param recording: If specified, filter results by the specified value.
1095        :param bitmaps: If specified, use it instead of call query_bitmaps()
1096        """
1097        if bitmaps is None:
1098            bitmaps = self.query_bitmaps()
1099
1100        for bitmap in bitmaps[node_name]:
1101            if bitmap.get('name', '') == bitmap_name:
1102                if recording is None or bitmap.get('recording') == recording:
1103                    return bitmap
1104        return None
1105
1106    def check_bitmap_status(self, node_name, bitmap_name, fields):
1107        ret = self.get_bitmap(node_name, bitmap_name)
1108
1109        return fields.items() <= ret.items()
1110
1111    def assert_block_path(self, root, path, expected_node, graph=None):
1112        """
1113        Check whether the node under the given path in the block graph
1114        is @expected_node.
1115
1116        @root is the node name of the node where the @path is rooted.
1117
1118        @path is a string that consists of child names separated by
1119        slashes.  It must begin with a slash.
1120
1121        Examples for @root + @path:
1122          - root="qcow2-node", path="/backing/file"
1123          - root="quorum-node", path="/children.2/file"
1124
1125        Hypothetically, @path could be empty, in which case it would
1126        point to @root.  However, in practice this case is not useful
1127        and hence not allowed.
1128
1129        @expected_node may be None.  (All elements of the path but the
1130        leaf must still exist.)
1131
1132        @graph may be None or the result of an x-debug-query-block-graph
1133        call that has already been performed.
1134        """
1135        if graph is None:
1136            graph = self.qmp('x-debug-query-block-graph')['return']
1137
1138        iter_path = iter(path.split('/'))
1139
1140        # Must start with a /
1141        assert next(iter_path) == ''
1142
1143        node = next((node for node in graph['nodes'] if node['name'] == root),
1144                    None)
1145
1146        # An empty @path is not allowed, so the root node must be present
1147        assert node is not None, 'Root node %s not found' % root
1148
1149        for child_name in iter_path:
1150            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1151
1152            try:
1153                node_id = next(edge['child'] for edge in graph['edges']
1154                               if (edge['parent'] == node['id'] and
1155                                   edge['name'] == child_name))
1156
1157                node = next(node for node in graph['nodes']
1158                            if node['id'] == node_id)
1159
1160            except StopIteration:
1161                node = None
1162
1163        if node is None:
1164            assert expected_node is None, \
1165                   'No node found under %s (but expected %s)' % \
1166                   (path, expected_node)
1167        else:
1168            assert node['name'] == expected_node, \
1169                   'Found node %s under %s (but expected %s)' % \
1170                   (node['name'], path, expected_node)
1171
1172index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1173
1174class QMPTestCase(unittest.TestCase):
1175    '''Abstract base class for QMP test cases'''
1176
1177    def __init__(self, *args, **kwargs):
1178        super().__init__(*args, **kwargs)
1179        # Many users of this class set a VM property we rely on heavily
1180        # in the methods below.
1181        self.vm = None
1182
1183    def dictpath(self, d, path):
1184        '''Traverse a path in a nested dict'''
1185        for component in path.split('/'):
1186            m = index_re.match(component)
1187            if m:
1188                component, idx = m.groups()
1189                idx = int(idx)
1190
1191            if not isinstance(d, dict) or component not in d:
1192                self.fail(f'failed path traversal for "{path}" in "{d}"')
1193            d = d[component]
1194
1195            if m:
1196                if not isinstance(d, list):
1197                    self.fail(f'path component "{component}" in "{path}" '
1198                              f'is not a list in "{d}"')
1199                try:
1200                    d = d[idx]
1201                except IndexError:
1202                    self.fail(f'invalid index "{idx}" in path "{path}" '
1203                              f'in "{d}"')
1204        return d
1205
1206    def assert_qmp_absent(self, d, path):
1207        try:
1208            result = self.dictpath(d, path)
1209        except AssertionError:
1210            return
1211        self.fail('path "%s" has value "%s"' % (path, str(result)))
1212
1213    def assert_qmp(self, d, path, value):
1214        '''Assert that the value for a specific path in a QMP dict
1215           matches.  When given a list of values, assert that any of
1216           them matches.'''
1217
1218        result = self.dictpath(d, path)
1219
1220        # [] makes no sense as a list of valid values, so treat it as
1221        # an actual single value.
1222        if isinstance(value, list) and value != []:
1223            for v in value:
1224                if result == v:
1225                    return
1226            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1227        else:
1228            self.assertEqual(result, value,
1229                             '"%s" is "%s", expected "%s"'
1230                             % (path, str(result), str(value)))
1231
1232    def assert_no_active_block_jobs(self):
1233        result = self.vm.qmp('query-block-jobs')
1234        self.assert_qmp(result, 'return', [])
1235
1236    def assert_has_block_node(self, node_name=None, file_name=None):
1237        """Issue a query-named-block-nodes and assert node_name and/or
1238        file_name is present in the result"""
1239        def check_equal_or_none(a, b):
1240            return a is None or b is None or a == b
1241        assert node_name or file_name
1242        result = self.vm.qmp('query-named-block-nodes')
1243        for x in result["return"]:
1244            if check_equal_or_none(x.get("node-name"), node_name) and \
1245                    check_equal_or_none(x.get("file"), file_name):
1246                return
1247        self.fail("Cannot find %s %s in result:\n%s" %
1248                  (node_name, file_name, result))
1249
1250    def assert_json_filename_equal(self, json_filename, reference):
1251        '''Asserts that the given filename is a json: filename and that its
1252           content is equal to the given reference object'''
1253        self.assertEqual(json_filename[:5], 'json:')
1254        self.assertEqual(
1255            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1256            self.vm.flatten_qmp_object(reference)
1257        )
1258
1259    def cancel_and_wait(self, drive='drive0', force=False,
1260                        resume=False, wait=60.0):
1261        '''Cancel a block job and wait for it to finish, returning the event'''
1262        self.vm.cmd('block-job-cancel', device=drive, force=force)
1263
1264        if resume:
1265            self.vm.resume_drive(drive)
1266
1267        cancelled = False
1268        result = None
1269        while not cancelled:
1270            for event in self.vm.get_qmp_events(wait=wait):
1271                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1272                   event['event'] == 'BLOCK_JOB_CANCELLED':
1273                    self.assert_qmp(event, 'data/device', drive)
1274                    result = event
1275                    cancelled = True
1276                elif event['event'] == 'JOB_STATUS_CHANGE':
1277                    self.assert_qmp(event, 'data/id', drive)
1278
1279
1280        self.assert_no_active_block_jobs()
1281        return result
1282
1283    def wait_until_completed(self, drive='drive0', check_offset=True,
1284                             wait=60.0, error=None):
1285        '''Wait for a block job to finish, returning the event'''
1286        while True:
1287            for event in self.vm.get_qmp_events(wait=wait):
1288                if event['event'] == 'BLOCK_JOB_COMPLETED':
1289                    self.assert_qmp(event, 'data/device', drive)
1290                    if error is None:
1291                        self.assert_qmp_absent(event, 'data/error')
1292                        if check_offset:
1293                            self.assert_qmp(event, 'data/offset',
1294                                            event['data']['len'])
1295                    else:
1296                        self.assert_qmp(event, 'data/error', error)
1297                    self.assert_no_active_block_jobs()
1298                    return event
1299                if event['event'] == 'JOB_STATUS_CHANGE':
1300                    self.assert_qmp(event, 'data/id', drive)
1301
1302    def wait_ready(self, drive='drive0'):
1303        """Wait until a BLOCK_JOB_READY event, and return the event."""
1304        return self.vm.events_wait([
1305            ('BLOCK_JOB_READY',
1306             {'data': {'type': 'mirror', 'device': drive}}),
1307            ('BLOCK_JOB_READY',
1308             {'data': {'type': 'commit', 'device': drive}})
1309        ])
1310
1311    def wait_ready_and_cancel(self, drive='drive0'):
1312        self.wait_ready(drive=drive)
1313        event = self.cancel_and_wait(drive=drive)
1314        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1315        self.assert_qmp(event, 'data/type', 'mirror')
1316        self.assert_qmp(event, 'data/offset', event['data']['len'])
1317
1318    def complete_and_wait(self, drive='drive0', wait_ready=True,
1319                          completion_error=None):
1320        '''Complete a block job and wait for it to finish'''
1321        if wait_ready:
1322            self.wait_ready(drive=drive)
1323
1324        self.vm.cmd('block-job-complete', device=drive)
1325
1326        event = self.wait_until_completed(drive=drive, error=completion_error)
1327        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1328
1329    def pause_wait(self, job_id='job0'):
1330        with Timeout(3, "Timeout waiting for job to pause"):
1331            while True:
1332                result = self.vm.qmp('query-block-jobs')
1333                found = False
1334                for job in result['return']:
1335                    if job['device'] == job_id:
1336                        found = True
1337                        if job['paused'] and not job['busy']:
1338                            return job
1339                        break
1340                assert found
1341
1342    def pause_job(self, job_id='job0', wait=True):
1343        self.vm.cmd('block-job-pause', device=job_id)
1344        if wait:
1345            self.pause_wait(job_id)
1346
1347    def case_skip(self, reason):
1348        '''Skip this test case'''
1349        case_notrun(reason)
1350        self.skipTest(reason)
1351
1352
1353def notrun(reason):
1354    '''Skip this test suite'''
1355    # Each test in qemu-iotests has a number ("seq")
1356    seq = os.path.basename(sys.argv[0])
1357
1358    with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
1359            as outfile:
1360        outfile.write(reason + '\n')
1361    logger.warning("%s not run: %s", seq, reason)
1362    sys.exit(0)
1363
1364def case_notrun(reason):
1365    '''Mark this test case as not having been run (without actually
1366    skipping it, that is left to the caller).  See
1367    QMPTestCase.case_skip() for a variant that actually skips the
1368    current test case.'''
1369
1370    # Each test in qemu-iotests has a number ("seq")
1371    seq = os.path.basename(sys.argv[0])
1372
1373    with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
1374            as outfile:
1375        outfile.write('    [case not run] ' + reason + '\n')
1376
1377def _verify_image_format(supported_fmts: Sequence[str] = (),
1378                         unsupported_fmts: Sequence[str] = ()) -> None:
1379    if 'generic' in supported_fmts and \
1380            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1381        # similar to
1382        #   _supported_fmt generic
1383        # for bash tests
1384        supported_fmts = ()
1385
1386    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1387    if not_sup or (imgfmt in unsupported_fmts):
1388        notrun('not suitable for this image format: %s' % imgfmt)
1389
1390    if imgfmt == 'luks':
1391        verify_working_luks()
1392
1393def _verify_protocol(supported: Sequence[str] = (),
1394                     unsupported: Sequence[str] = ()) -> None:
1395    assert not (supported and unsupported)
1396
1397    if 'generic' in supported:
1398        return
1399
1400    not_sup = supported and (imgproto not in supported)
1401    if not_sup or (imgproto in unsupported):
1402        notrun('not suitable for this protocol: %s' % imgproto)
1403
1404def _verify_platform(supported: Sequence[str] = (),
1405                     unsupported: Sequence[str] = ()) -> None:
1406    if any((sys.platform.startswith(x) for x in unsupported)):
1407        notrun('not suitable for this OS: %s' % sys.platform)
1408
1409    if supported:
1410        if not any((sys.platform.startswith(x) for x in supported)):
1411            notrun('not suitable for this OS: %s' % sys.platform)
1412
1413def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1414    if supported_cache_modes and (cachemode not in supported_cache_modes):
1415        notrun('not suitable for this cache mode: %s' % cachemode)
1416
1417def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1418    if supported_aio_modes and (aiomode not in supported_aio_modes):
1419        notrun('not suitable for this aio mode: %s' % aiomode)
1420
1421def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1422    usf_list = list(set(required_formats) - set(supported_formats()))
1423    if usf_list:
1424        notrun(f'formats {usf_list} are not whitelisted')
1425
1426
1427def _verify_virtio_blk() -> None:
1428    out = qemu_pipe('-M', 'none', '-device', 'help')
1429    if 'virtio-blk' not in out:
1430        notrun('Missing virtio-blk in QEMU binary')
1431
1432def verify_virtio_scsi_pci_or_ccw() -> None:
1433    out = qemu_pipe('-M', 'none', '-device', 'help')
1434    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1435        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1436
1437
1438def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1439    imgopts = os.environ.get('IMGOPTS')
1440    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1441    # but it supported only for bash tests. We don't have a concept of global
1442    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1443    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1444    unsup = list(unsupported) + ['$']
1445    if imgopts and any(x in imgopts for x in unsup):
1446        notrun(f'not suitable for this imgopts: {imgopts}')
1447
1448
1449def supports_quorum() -> bool:
1450    return 'quorum' in qemu_img('--help').stdout
1451
1452def verify_quorum():
1453    '''Skip test suite if quorum support is not available'''
1454    if not supports_quorum():
1455        notrun('quorum support missing')
1456
1457def has_working_luks() -> Tuple[bool, str]:
1458    """
1459    Check whether our LUKS driver can actually create images
1460    (this extends to LUKS encryption for qcow2).
1461
1462    If not, return the reason why.
1463    """
1464
1465    img_file = f'{test_dir}/luks-test.luks'
1466    res = qemu_img('create', '-f', 'luks',
1467                   '--object', luks_default_secret_object,
1468                   '-o', luks_default_key_secret_opt,
1469                   '-o', 'iter-time=10',
1470                   img_file, '1G',
1471                   check=False)
1472    try:
1473        os.remove(img_file)
1474    except OSError:
1475        pass
1476
1477    if res.returncode:
1478        reason = res.stdout
1479        for line in res.stdout.splitlines():
1480            if img_file + ':' in line:
1481                reason = line.split(img_file + ':', 1)[1].strip()
1482                break
1483
1484        return (False, reason)
1485    else:
1486        return (True, '')
1487
1488def verify_working_luks():
1489    """
1490    Skip test suite if LUKS does not work
1491    """
1492    (working, reason) = has_working_luks()
1493    if not working:
1494        notrun(reason)
1495
1496def supports_qcow2_zstd_compression() -> bool:
1497    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
1498    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1499                   img_file, '0',
1500                   check=False)
1501    try:
1502        os.remove(img_file)
1503    except OSError:
1504        pass
1505
1506    if res.returncode == 1 and \
1507            "'compression-type' does not accept value 'zstd'" in res.stdout:
1508        return False
1509    else:
1510        return True
1511
1512def verify_qcow2_zstd_compression():
1513    if not supports_qcow2_zstd_compression():
1514        notrun('zstd compression not supported')
1515
1516def qemu_pipe(*args: str) -> str:
1517    """
1518    Run qemu with an option to print something and exit (e.g. a help option).
1519
1520    :return: QEMU's stdout output.
1521    """
1522    full_args = [qemu_prog] + qemu_opts + list(args)
1523    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1524    return output
1525
1526def supported_formats(read_only=False):
1527    '''Set 'read_only' to True to check ro-whitelist
1528       Otherwise, rw-whitelist is checked'''
1529
1530    if not hasattr(supported_formats, "formats"):
1531        supported_formats.formats = {}
1532
1533    if read_only not in supported_formats.formats:
1534        format_message = qemu_pipe("-drive", "format=help")
1535        line = 1 if read_only else 0
1536        supported_formats.formats[read_only] = \
1537            format_message.splitlines()[line].split(":")[1].split()
1538
1539    return supported_formats.formats[read_only]
1540
1541def skip_if_unsupported(required_formats=(), read_only=False):
1542    '''Skip Test Decorator
1543       Runs the test if all the required formats are whitelisted'''
1544    def skip_test_decorator(func):
1545        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1546                         **kwargs: Dict[str, Any]) -> None:
1547            if callable(required_formats):
1548                fmts = required_formats(test_case)
1549            else:
1550                fmts = required_formats
1551
1552            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1553            if usf_list:
1554                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1555                test_case.case_skip(msg)
1556            else:
1557                func(test_case, *args, **kwargs)
1558        return func_wrapper
1559    return skip_test_decorator
1560
1561def skip_for_formats(formats: Sequence[str] = ()) \
1562    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1563                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1564    '''Skip Test Decorator
1565       Skips the test for the given formats'''
1566    def skip_test_decorator(func):
1567        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1568                         **kwargs: Dict[str, Any]) -> None:
1569            if imgfmt in formats:
1570                msg = f'{test_case}: Skipped for format {imgfmt}'
1571                test_case.case_skip(msg)
1572            else:
1573                func(test_case, *args, **kwargs)
1574        return func_wrapper
1575    return skip_test_decorator
1576
1577def skip_if_user_is_root(func):
1578    '''Skip Test Decorator
1579       Runs the test only without root permissions'''
1580    def func_wrapper(*args, **kwargs):
1581        if os.getuid() == 0:
1582            case_notrun('{}: cannot be run as root'.format(args[0]))
1583            return None
1584        else:
1585            return func(*args, **kwargs)
1586    return func_wrapper
1587
1588# We need to filter out the time taken from the output so that
1589# qemu-iotest can reliably diff the results against master output,
1590# and hide skipped tests from the reference output.
1591
1592class ReproducibleTestResult(unittest.TextTestResult):
1593    def addSkip(self, test, reason):
1594        # Same as TextTestResult, but print dot instead of "s"
1595        unittest.TestResult.addSkip(self, test, reason)
1596        if self.showAll:
1597            self.stream.writeln("skipped {0!r}".format(reason))
1598        elif self.dots:
1599            self.stream.write(".")
1600            self.stream.flush()
1601
1602class ReproducibleStreamWrapper:
1603    def __init__(self, stream: TextIO):
1604        self.stream = stream
1605
1606    def __getattr__(self, attr):
1607        if attr in ('stream', '__getstate__'):
1608            raise AttributeError(attr)
1609        return getattr(self.stream, attr)
1610
1611    def write(self, arg=None):
1612        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1613        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1614        self.stream.write(arg)
1615
1616class ReproducibleTestRunner(unittest.TextTestRunner):
1617    def __init__(self, stream: Optional[TextIO] = None,
1618                 resultclass: Type[unittest.TestResult] =
1619                 ReproducibleTestResult,
1620                 **kwargs: Any) -> None:
1621        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1622        super().__init__(stream=rstream,           # type: ignore
1623                         descriptions=True,
1624                         resultclass=resultclass,
1625                         **kwargs)
1626
1627def execute_unittest(argv: List[str], debug: bool = False) -> None:
1628    """Executes unittests within the calling module."""
1629
1630    # Some tests have warnings, especially ResourceWarnings for unclosed
1631    # files and sockets.  Ignore them for now to ensure reproducibility of
1632    # the test output.
1633    unittest.main(argv=argv,
1634                  testRunner=ReproducibleTestRunner,
1635                  verbosity=2 if debug else 1,
1636                  warnings=None if sys.warnoptions else 'ignore')
1637
1638def execute_setup_common(supported_fmts: Sequence[str] = (),
1639                         supported_platforms: Sequence[str] = (),
1640                         supported_cache_modes: Sequence[str] = (),
1641                         supported_aio_modes: Sequence[str] = (),
1642                         unsupported_fmts: Sequence[str] = (),
1643                         supported_protocols: Sequence[str] = (),
1644                         unsupported_protocols: Sequence[str] = (),
1645                         required_fmts: Sequence[str] = (),
1646                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1647    """
1648    Perform necessary setup for either script-style or unittest-style tests.
1649
1650    :return: Bool; Whether or not debug mode has been requested via the CLI.
1651    """
1652    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1653
1654    debug = '-d' in sys.argv
1655    if debug:
1656        sys.argv.remove('-d')
1657    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1658
1659    _verify_image_format(supported_fmts, unsupported_fmts)
1660    _verify_protocol(supported_protocols, unsupported_protocols)
1661    _verify_platform(supported=supported_platforms)
1662    _verify_cache_mode(supported_cache_modes)
1663    _verify_aio_mode(supported_aio_modes)
1664    _verify_formats(required_fmts)
1665    _verify_virtio_blk()
1666    _verify_imgopts(unsupported_imgopts)
1667
1668    return debug
1669
1670def execute_test(*args, test_function=None, **kwargs):
1671    """Run either unittest or script-style tests."""
1672
1673    debug = execute_setup_common(*args, **kwargs)
1674    if not test_function:
1675        execute_unittest(sys.argv, debug)
1676    else:
1677        test_function()
1678
1679def activate_logging():
1680    """Activate iotests.log() output to stdout for script-style tests."""
1681    handler = logging.StreamHandler(stream=sys.stdout)
1682    formatter = logging.Formatter('%(message)s')
1683    handler.setFormatter(formatter)
1684    test_logger.addHandler(handler)
1685    test_logger.setLevel(logging.INFO)
1686    test_logger.propagate = False
1687
1688# This is called from script-style iotests without a single point of entry
1689def script_initialize(*args, **kwargs):
1690    """Initialize script-style tests without running any tests."""
1691    activate_logging()
1692    execute_setup_common(*args, **kwargs)
1693
1694# This is called from script-style iotests with a single point of entry
1695def script_main(test_function, *args, **kwargs):
1696    """Run script-style tests outside of the unittest framework"""
1697    activate_logging()
1698    execute_test(*args, test_function=test_function, **kwargs)
1699
1700# This is called from unittest style iotests
1701def main(*args, **kwargs):
1702    """Run tests using the unittest framework"""
1703    execute_test(*args, **kwargs)
1704