xref: /qemu/tests/qemu-iotests/iotests.py (revision d201cf7a)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp import QMPMessage
42
43# Use this logger for logging messages directly from the iotests module
44logger = logging.getLogger('qemu.iotests')
45logger.addHandler(logging.NullHandler())
46
47# Use this logger for messages that ought to be used for diff output.
48test_logger = logging.getLogger('qemu.iotests.diff_io')
49
50
51faulthandler.enable()
52
53# This will not work if arguments contain spaces but is necessary if we
54# want to support the override options that ./check supports.
55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
56if os.environ.get('QEMU_IMG_OPTIONS'):
57    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
58
59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
60if os.environ.get('QEMU_IO_OPTIONS'):
61    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
62
63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
65    qemu_io_args_no_fmt += \
66        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
67
68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
69qemu_nbd_args = [qemu_nbd_prog]
70if os.environ.get('QEMU_NBD_OPTIONS'):
71    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
72
73qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
75
76gdb_qemu_env = os.environ.get('GDB_OPTIONS')
77qemu_gdb = []
78if gdb_qemu_env:
79    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
80
81qemu_print = os.environ.get('PRINT_QEMU', False)
82
83imgfmt = os.environ.get('IMGFMT', 'raw')
84imgproto = os.environ.get('IMGPROTO', 'file')
85output_dir = os.environ.get('OUTPUT_DIR', '.')
86
87try:
88    test_dir = os.environ['TEST_DIR']
89    sock_dir = os.environ['SOCK_DIR']
90    cachemode = os.environ['CACHEMODE']
91    aiomode = os.environ['AIOMODE']
92    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
93except KeyError:
94    # We are using these variables as proxies to indicate that we're
95    # not being run via "check". There may be other things set up by
96    # "check" that individual test cases rely on.
97    sys.stderr.write('Please run this test via the "check" script\n')
98    sys.exit(os.EX_USAGE)
99
100qemu_valgrind = []
101if os.environ.get('VALGRIND_QEMU') == "y" and \
102    os.environ.get('NO_VALGRIND') != "y":
103    valgrind_logfile = "--log-file=" + test_dir
104    # %p allows to put the valgrind process PID, since
105    # we don't know it a priori (subprocess.Popen is
106    # not yet invoked)
107    valgrind_logfile += "/%p.valgrind"
108
109    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
110
111luks_default_secret_object = 'secret,id=keysec0,data=' + \
112                             os.environ.get('IMGKEYSECRET', '')
113luks_default_key_secret_opt = 'key-secret=keysec0'
114
115sample_img_dir = os.environ['SAMPLE_IMG_DIR']
116
117
118@contextmanager
119def change_log_level(
120        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
121    """
122    Utility function for temporarily changing the log level of a logger.
123
124    This can be used to silence errors that are expected or uninteresting.
125    """
126    _logger = logging.getLogger(logger_name)
127    current_level = _logger.level
128    _logger.setLevel(level)
129
130    try:
131        yield
132    finally:
133        _logger.setLevel(current_level)
134
135
136def unarchive_sample_image(sample, fname):
137    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
138    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
139        shutil.copyfileobj(f_in, f_out)
140
141
142def qemu_tool_popen(args: Sequence[str],
143                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
144    stderr = subprocess.STDOUT if connect_stderr else None
145    # pylint: disable=consider-using-with
146    return subprocess.Popen(args,
147                            stdout=subprocess.PIPE,
148                            stderr=stderr,
149                            universal_newlines=True)
150
151
152def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
153                              connect_stderr: bool = True,
154                              drop_successful_output: bool = False) \
155        -> Tuple[str, int]:
156    """
157    Run a tool and return both its output and its exit code
158    """
159    with qemu_tool_popen(args, connect_stderr) as subp:
160        output = subp.communicate()[0]
161        if subp.returncode < 0:
162            cmd = ' '.join(args)
163            sys.stderr.write(f'{tool} received signal \
164                               {-subp.returncode}: {cmd}\n')
165        if drop_successful_output and subp.returncode == 0:
166            output = ''
167        return (output, subp.returncode)
168
169def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
170    if not args or args[0] != 'create':
171        return list(args)
172    args = args[1:]
173
174    p = argparse.ArgumentParser(allow_abbrev=False)
175    # -o option may be specified several times
176    p.add_argument('-o', action='append', default=[])
177    p.add_argument('-f')
178    parsed, remaining = p.parse_known_args(args)
179
180    opts_list = parsed.o
181
182    result = ['create']
183    if parsed.f is not None:
184        result += ['-f', parsed.f]
185
186    # IMGOPTS most probably contain options specific for the selected format,
187    # like extended_l2 or compression_type for qcow2. Test may want to create
188    # additional images in other formats that doesn't support these options.
189    # So, use IMGOPTS only for images created in imgfmt format.
190    imgopts = os.environ.get('IMGOPTS')
191    if imgopts and parsed.f == imgfmt:
192        opts_list.insert(0, imgopts)
193
194    # default luks support
195    if parsed.f == 'luks' and \
196            all('key-secret' not in opts for opts in opts_list):
197        result += ['--object', luks_default_secret_object]
198        opts_list.append(luks_default_key_secret_opt)
199
200    for opts in opts_list:
201        result += ['-o', opts]
202
203    result += remaining
204
205    return result
206
207def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
208    """
209    Run qemu-img and return both its output and its exit code
210    """
211    is_create = bool(args and args[0] == 'create')
212    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
213    return qemu_tool_pipe_and_status('qemu-img', full_args,
214                                     drop_successful_output=is_create)
215
216def qemu_img(*args: str) -> int:
217    '''Run qemu-img and return the exit code'''
218    return qemu_img_pipe_and_status(*args)[1]
219
220def ordered_qmp(qmsg, conv_keys=True):
221    # Dictionaries are not ordered prior to 3.6, therefore:
222    if isinstance(qmsg, list):
223        return [ordered_qmp(atom) for atom in qmsg]
224    if isinstance(qmsg, dict):
225        od = OrderedDict()
226        for k, v in sorted(qmsg.items()):
227            if conv_keys:
228                k = k.replace('_', '-')
229            od[k] = ordered_qmp(v, conv_keys=False)
230        return od
231    return qmsg
232
233def qemu_img_create(*args):
234    return qemu_img('create', *args)
235
236def qemu_img_measure(*args):
237    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
238
239def qemu_img_check(*args):
240    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
241
242def qemu_img_pipe(*args: str) -> str:
243    '''Run qemu-img and return its output'''
244    return qemu_img_pipe_and_status(*args)[0]
245
246def qemu_img_log(*args):
247    result = qemu_img_pipe(*args)
248    log(result, filters=[filter_testfiles])
249    return result
250
251def img_info_log(filename, filter_path=None, use_image_opts=False,
252                 extra_args=()):
253    args = ['info']
254    if use_image_opts:
255        args.append('--image-opts')
256    else:
257        args += ['-f', imgfmt]
258    args += extra_args
259    args.append(filename)
260
261    output = qemu_img_pipe(*args)
262    if not filter_path:
263        filter_path = filename
264    log(filter_img_info(output, filter_path))
265
266def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
267    if '-f' in args or '--image-opts' in args:
268        return qemu_io_args_no_fmt + list(args)
269    else:
270        return qemu_io_args + list(args)
271
272def qemu_io_popen(*args):
273    return qemu_tool_popen(qemu_io_wrap_args(args))
274
275def qemu_io(*args):
276    '''Run qemu-io and return the stdout data'''
277    return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0]
278
279def qemu_io_log(*args):
280    result = qemu_io(*args)
281    log(result, filters=[filter_testfiles, filter_qemu_io])
282    return result
283
284def qemu_io_silent(*args):
285    '''Run qemu-io and return the exit code, suppressing stdout'''
286    args = qemu_io_wrap_args(args)
287    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
288    if result.returncode < 0:
289        sys.stderr.write('qemu-io received signal %i: %s\n' %
290                         (-result.returncode, ' '.join(args)))
291    return result.returncode
292
293def qemu_io_silent_check(*args):
294    '''Run qemu-io and return the true if subprocess returned 0'''
295    args = qemu_io_wrap_args(args)
296    result = subprocess.run(args, stdout=subprocess.DEVNULL,
297                            stderr=subprocess.STDOUT, check=False)
298    return result.returncode == 0
299
300class QemuIoInteractive:
301    def __init__(self, *args):
302        self.args = qemu_io_wrap_args(args)
303        # We need to keep the Popen objext around, and not
304        # close it immediately. Therefore, disable the pylint check:
305        # pylint: disable=consider-using-with
306        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
307                                   stdout=subprocess.PIPE,
308                                   stderr=subprocess.STDOUT,
309                                   universal_newlines=True)
310        out = self._p.stdout.read(9)
311        if out != 'qemu-io> ':
312            # Most probably qemu-io just failed to start.
313            # Let's collect the whole output and exit.
314            out += self._p.stdout.read()
315            self._p.wait(timeout=1)
316            raise ValueError(out)
317
318    def close(self):
319        self._p.communicate('q\n')
320
321    def _read_output(self):
322        pattern = 'qemu-io> '
323        n = len(pattern)
324        pos = 0
325        s = []
326        while pos != n:
327            c = self._p.stdout.read(1)
328            # check unexpected EOF
329            assert c != ''
330            s.append(c)
331            if c == pattern[pos]:
332                pos += 1
333            else:
334                pos = 0
335
336        return ''.join(s[:-n])
337
338    def cmd(self, cmd):
339        # quit command is in close(), '\n' is added automatically
340        assert '\n' not in cmd
341        cmd = cmd.strip()
342        assert cmd not in ('q', 'quit')
343        self._p.stdin.write(cmd + '\n')
344        self._p.stdin.flush()
345        return self._read_output()
346
347
348def qemu_nbd(*args):
349    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
350    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
351
352def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
353    '''Run qemu-nbd in daemon mode and return both the parent's exit code
354       and its output in case of an error'''
355    full_args = qemu_nbd_args + ['--fork'] + list(args)
356    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
357                                                   connect_stderr=False)
358    return returncode, output if returncode else ''
359
360def qemu_nbd_list_log(*args: str) -> str:
361    '''Run qemu-nbd to list remote exports'''
362    full_args = [qemu_nbd_prog, '-L'] + list(args)
363    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
364    log(output, filters=[filter_testfiles, filter_nbd_exports])
365    return output
366
367@contextmanager
368def qemu_nbd_popen(*args):
369    '''Context manager running qemu-nbd within the context'''
370    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
371
372    assert not os.path.exists(pid_file)
373
374    cmd = list(qemu_nbd_args)
375    cmd.extend(('--persistent', '--pid-file', pid_file))
376    cmd.extend(args)
377
378    log('Start NBD server')
379    with subprocess.Popen(cmd) as p:
380        try:
381            while not os.path.exists(pid_file):
382                if p.poll() is not None:
383                    raise RuntimeError(
384                        "qemu-nbd terminated with exit code {}: {}"
385                        .format(p.returncode, ' '.join(cmd)))
386
387                time.sleep(0.01)
388            yield
389        finally:
390            if os.path.exists(pid_file):
391                os.remove(pid_file)
392            log('Kill NBD server')
393            p.kill()
394            p.wait()
395
396def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
397    '''Return True if two image files are identical'''
398    return qemu_img('compare', '-f', fmt1,
399                    '-F', fmt2, img1, img2) == 0
400
401def create_image(name, size):
402    '''Create a fully-allocated raw image with sector markers'''
403    with open(name, 'wb') as file:
404        i = 0
405        while i < size:
406            sector = struct.pack('>l504xl', i // 512, i // 512)
407            file.write(sector)
408            i = i + 512
409
410def image_size(img):
411    '''Return image's virtual size'''
412    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
413    return json.loads(r)['virtual-size']
414
415def is_str(val):
416    return isinstance(val, str)
417
418test_dir_re = re.compile(r"%s" % test_dir)
419def filter_test_dir(msg):
420    return test_dir_re.sub("TEST_DIR", msg)
421
422win32_re = re.compile(r"\r")
423def filter_win32(msg):
424    return win32_re.sub("", msg)
425
426qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
427                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
428                        r"and [0-9\/.inf]* ops\/sec\)")
429def filter_qemu_io(msg):
430    msg = filter_win32(msg)
431    return qemu_io_re.sub("X ops; XX:XX:XX.X "
432                          "(XXX YYY/sec and XXX ops/sec)", msg)
433
434chown_re = re.compile(r"chown [0-9]+:[0-9]+")
435def filter_chown(msg):
436    return chown_re.sub("chown UID:GID", msg)
437
438def filter_qmp_event(event):
439    '''Filter a QMP event dict'''
440    event = dict(event)
441    if 'timestamp' in event:
442        event['timestamp']['seconds'] = 'SECS'
443        event['timestamp']['microseconds'] = 'USECS'
444    return event
445
446def filter_qmp(qmsg, filter_fn):
447    '''Given a string filter, filter a QMP object's values.
448    filter_fn takes a (key, value) pair.'''
449    # Iterate through either lists or dicts;
450    if isinstance(qmsg, list):
451        items = enumerate(qmsg)
452    else:
453        items = qmsg.items()
454
455    for k, v in items:
456        if isinstance(v, (dict, list)):
457            qmsg[k] = filter_qmp(v, filter_fn)
458        else:
459            qmsg[k] = filter_fn(k, v)
460    return qmsg
461
462def filter_testfiles(msg):
463    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
464    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
465    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
466
467def filter_qmp_testfiles(qmsg):
468    def _filter(_key, value):
469        if is_str(value):
470            return filter_testfiles(value)
471        return value
472    return filter_qmp(qmsg, _filter)
473
474def filter_virtio_scsi(output: str) -> str:
475    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
476
477def filter_qmp_virtio_scsi(qmsg):
478    def _filter(_key, value):
479        if is_str(value):
480            return filter_virtio_scsi(value)
481        return value
482    return filter_qmp(qmsg, _filter)
483
484def filter_generated_node_ids(msg):
485    return re.sub("#block[0-9]+", "NODE_NAME", msg)
486
487def filter_img_info(output, filename):
488    lines = []
489    for line in output.split('\n'):
490        if 'disk size' in line or 'actual-size' in line:
491            continue
492        line = line.replace(filename, 'TEST_IMG')
493        line = filter_testfiles(line)
494        line = line.replace(imgfmt, 'IMGFMT')
495        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
496        line = re.sub('uuid: [-a-f0-9]+',
497                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
498                      line)
499        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
500        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
501                      line)
502        lines.append(line)
503    return '\n'.join(lines)
504
505def filter_imgfmt(msg):
506    return msg.replace(imgfmt, 'IMGFMT')
507
508def filter_qmp_imgfmt(qmsg):
509    def _filter(_key, value):
510        if is_str(value):
511            return filter_imgfmt(value)
512        return value
513    return filter_qmp(qmsg, _filter)
514
515def filter_nbd_exports(output: str) -> str:
516    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
517
518
519Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
520
521def log(msg: Msg,
522        filters: Iterable[Callable[[Msg], Msg]] = (),
523        indent: Optional[int] = None) -> None:
524    """
525    Logs either a string message or a JSON serializable message (like QMP).
526    If indent is provided, JSON serializable messages are pretty-printed.
527    """
528    for flt in filters:
529        msg = flt(msg)
530    if isinstance(msg, (dict, list)):
531        # Don't sort if it's already sorted
532        do_sort = not isinstance(msg, OrderedDict)
533        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
534    else:
535        test_logger.info(msg)
536
537class Timeout:
538    def __init__(self, seconds, errmsg="Timeout"):
539        self.seconds = seconds
540        self.errmsg = errmsg
541    def __enter__(self):
542        if qemu_gdb or qemu_valgrind:
543            return self
544        signal.signal(signal.SIGALRM, self.timeout)
545        signal.setitimer(signal.ITIMER_REAL, self.seconds)
546        return self
547    def __exit__(self, exc_type, value, traceback):
548        if qemu_gdb or qemu_valgrind:
549            return False
550        signal.setitimer(signal.ITIMER_REAL, 0)
551        return False
552    def timeout(self, signum, frame):
553        raise Exception(self.errmsg)
554
555def file_pattern(name):
556    return "{0}-{1}".format(os.getpid(), name)
557
558class FilePath:
559    """
560    Context manager generating multiple file names. The generated files are
561    removed when exiting the context.
562
563    Example usage:
564
565        with FilePath('a.img', 'b.img') as (img_a, img_b):
566            # Use img_a and img_b here...
567
568        # a.img and b.img are automatically removed here.
569
570    By default images are created in iotests.test_dir. To create sockets use
571    iotests.sock_dir:
572
573       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
574
575    For convenience, calling with one argument yields a single file instead of
576    a tuple with one item.
577
578    """
579    def __init__(self, *names, base_dir=test_dir):
580        self.paths = [os.path.join(base_dir, file_pattern(name))
581                      for name in names]
582
583    def __enter__(self):
584        if len(self.paths) == 1:
585            return self.paths[0]
586        else:
587            return self.paths
588
589    def __exit__(self, exc_type, exc_val, exc_tb):
590        for path in self.paths:
591            try:
592                os.remove(path)
593            except OSError:
594                pass
595        return False
596
597
598def try_remove(img):
599    try:
600        os.remove(img)
601    except OSError:
602        pass
603
604def file_path_remover():
605    for path in reversed(file_path_remover.paths):
606        try_remove(path)
607
608
609def file_path(*names, base_dir=test_dir):
610    ''' Another way to get auto-generated filename that cleans itself up.
611
612    Use is as simple as:
613
614    img_a, img_b = file_path('a.img', 'b.img')
615    sock = file_path('socket')
616    '''
617
618    if not hasattr(file_path_remover, 'paths'):
619        file_path_remover.paths = []
620        atexit.register(file_path_remover)
621
622    paths = []
623    for name in names:
624        filename = file_pattern(name)
625        path = os.path.join(base_dir, filename)
626        file_path_remover.paths.append(path)
627        paths.append(path)
628
629    return paths[0] if len(paths) == 1 else paths
630
631def remote_filename(path):
632    if imgproto == 'file':
633        return path
634    elif imgproto == 'ssh':
635        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
636    else:
637        raise Exception("Protocol %s not supported" % (imgproto))
638
639class VM(qtest.QEMUQtestMachine):
640    '''A QEMU VM'''
641
642    def __init__(self, path_suffix=''):
643        name = "qemu%s-%d" % (path_suffix, os.getpid())
644        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
645        if qemu_gdb and qemu_valgrind:
646            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
647            sys.exit(1)
648        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
649        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
650                         name=name,
651                         base_temp_dir=test_dir,
652                         sock_dir=sock_dir, qmp_timer=timer)
653        self._num_drives = 0
654
655    def _post_shutdown(self) -> None:
656        super()._post_shutdown()
657        if not qemu_valgrind or not self._popen:
658            return
659        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
660        if self.exitcode() == 99:
661            with open(valgrind_filename, encoding='utf-8') as f:
662                print(f.read())
663        else:
664            os.remove(valgrind_filename)
665
666    def _pre_launch(self) -> None:
667        super()._pre_launch()
668        if qemu_print:
669            # set QEMU binary output to stdout
670            self._close_qemu_log_file()
671
672    def add_object(self, opts):
673        self._args.append('-object')
674        self._args.append(opts)
675        return self
676
677    def add_device(self, opts):
678        self._args.append('-device')
679        self._args.append(opts)
680        return self
681
682    def add_drive_raw(self, opts):
683        self._args.append('-drive')
684        self._args.append(opts)
685        return self
686
687    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
688        '''Add a virtio-blk drive to the VM'''
689        options = ['if=%s' % interface,
690                   'id=drive%d' % self._num_drives]
691
692        if path is not None:
693            options.append('file=%s' % path)
694            options.append('format=%s' % img_format)
695            options.append('cache=%s' % cachemode)
696            options.append('aio=%s' % aiomode)
697
698        if opts:
699            options.append(opts)
700
701        if img_format == 'luks' and 'key-secret' not in opts:
702            # default luks support
703            if luks_default_secret_object not in self._args:
704                self.add_object(luks_default_secret_object)
705
706            options.append(luks_default_key_secret_opt)
707
708        self._args.append('-drive')
709        self._args.append(','.join(options))
710        self._num_drives += 1
711        return self
712
713    def add_blockdev(self, opts):
714        self._args.append('-blockdev')
715        if isinstance(opts, str):
716            self._args.append(opts)
717        else:
718            self._args.append(','.join(opts))
719        return self
720
721    def add_incoming(self, addr):
722        self._args.append('-incoming')
723        self._args.append(addr)
724        return self
725
726    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
727        cmd = 'human-monitor-command'
728        kwargs: Dict[str, Any] = {'command-line': command_line}
729        if use_log:
730            return self.qmp_log(cmd, **kwargs)
731        else:
732            return self.qmp(cmd, **kwargs)
733
734    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
735        """Pause drive r/w operations"""
736        if not event:
737            self.pause_drive(drive, "read_aio")
738            self.pause_drive(drive, "write_aio")
739            return
740        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
741
742    def resume_drive(self, drive: str) -> None:
743        """Resume drive r/w operations"""
744        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
745
746    def hmp_qemu_io(self, drive: str, cmd: str,
747                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
748        """Write to a given drive using an HMP command"""
749        d = '-d ' if qdev else ''
750        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
751
752    def flatten_qmp_object(self, obj, output=None, basestr=''):
753        if output is None:
754            output = {}
755        if isinstance(obj, list):
756            for i, item in enumerate(obj):
757                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
758        elif isinstance(obj, dict):
759            for key in obj:
760                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
761        else:
762            output[basestr[:-1]] = obj # Strip trailing '.'
763        return output
764
765    def qmp_to_opts(self, obj):
766        obj = self.flatten_qmp_object(obj)
767        output_list = []
768        for key in obj:
769            output_list += [key + '=' + obj[key]]
770        return ','.join(output_list)
771
772    def get_qmp_events_filtered(self, wait=60.0):
773        result = []
774        for ev in self.get_qmp_events(wait=wait):
775            result.append(filter_qmp_event(ev))
776        return result
777
778    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
779        full_cmd = OrderedDict((
780            ("execute", cmd),
781            ("arguments", ordered_qmp(kwargs))
782        ))
783        log(full_cmd, filters, indent=indent)
784        result = self.qmp(cmd, **kwargs)
785        log(result, filters, indent=indent)
786        return result
787
788    # Returns None on success, and an error string on failure
789    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
790                pre_finalize=None, cancel=False, wait=60.0):
791        """
792        run_job moves a job from creation through to dismissal.
793
794        :param job: String. ID of recently-launched job
795        :param auto_finalize: Bool. True if the job was launched with
796                              auto_finalize. Defaults to True.
797        :param auto_dismiss: Bool. True if the job was launched with
798                             auto_dismiss=True. Defaults to False.
799        :param pre_finalize: Callback. A callable that takes no arguments to be
800                             invoked prior to issuing job-finalize, if any.
801        :param cancel: Bool. When true, cancels the job after the pre_finalize
802                       callback.
803        :param wait: Float. Timeout value specifying how long to wait for any
804                     event, in seconds. Defaults to 60.0.
805        """
806        match_device = {'data': {'device': job}}
807        match_id = {'data': {'id': job}}
808        events = [
809            ('BLOCK_JOB_COMPLETED', match_device),
810            ('BLOCK_JOB_CANCELLED', match_device),
811            ('BLOCK_JOB_ERROR', match_device),
812            ('BLOCK_JOB_READY', match_device),
813            ('BLOCK_JOB_PENDING', match_id),
814            ('JOB_STATUS_CHANGE', match_id)
815        ]
816        error = None
817        while True:
818            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
819            if ev['event'] != 'JOB_STATUS_CHANGE':
820                log(ev)
821                continue
822            status = ev['data']['status']
823            if status == 'aborting':
824                result = self.qmp('query-jobs')
825                for j in result['return']:
826                    if j['id'] == job:
827                        error = j['error']
828                        log('Job failed: %s' % (j['error']))
829            elif status == 'ready':
830                self.qmp_log('job-complete', id=job)
831            elif status == 'pending' and not auto_finalize:
832                if pre_finalize:
833                    pre_finalize()
834                if cancel:
835                    self.qmp_log('job-cancel', id=job)
836                else:
837                    self.qmp_log('job-finalize', id=job)
838            elif status == 'concluded' and not auto_dismiss:
839                self.qmp_log('job-dismiss', id=job)
840            elif status == 'null':
841                return error
842
843    # Returns None on success, and an error string on failure
844    def blockdev_create(self, options, job_id='job0', filters=None):
845        if filters is None:
846            filters = [filter_qmp_testfiles]
847        result = self.qmp_log('blockdev-create', filters=filters,
848                              job_id=job_id, options=options)
849
850        if 'return' in result:
851            assert result['return'] == {}
852            job_result = self.run_job(job_id)
853        else:
854            job_result = result['error']
855
856        log("")
857        return job_result
858
859    def enable_migration_events(self, name):
860        log('Enabling migration QMP events on %s...' % name)
861        log(self.qmp('migrate-set-capabilities', capabilities=[
862            {
863                'capability': 'events',
864                'state': True
865            }
866        ]))
867
868    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
869        while True:
870            event = self.event_wait('MIGRATION')
871            # We use the default timeout, and with a timeout, event_wait()
872            # never returns None
873            assert event
874
875            log(event, filters=[filter_qmp_event])
876            if event['data']['status'] in ('completed', 'failed'):
877                break
878
879        if event['data']['status'] == 'completed':
880            # The event may occur in finish-migrate, so wait for the expected
881            # post-migration runstate
882            runstate = None
883            while runstate != expect_runstate:
884                runstate = self.qmp('query-status')['return']['status']
885            return True
886        else:
887            return False
888
889    def node_info(self, node_name):
890        nodes = self.qmp('query-named-block-nodes')
891        for x in nodes['return']:
892            if x['node-name'] == node_name:
893                return x
894        return None
895
896    def query_bitmaps(self):
897        res = self.qmp("query-named-block-nodes")
898        return {device['node-name']: device['dirty-bitmaps']
899                for device in res['return'] if 'dirty-bitmaps' in device}
900
901    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
902        """
903        get a specific bitmap from the object returned by query_bitmaps.
904        :param recording: If specified, filter results by the specified value.
905        :param bitmaps: If specified, use it instead of call query_bitmaps()
906        """
907        if bitmaps is None:
908            bitmaps = self.query_bitmaps()
909
910        for bitmap in bitmaps[node_name]:
911            if bitmap.get('name', '') == bitmap_name:
912                if recording is None or bitmap.get('recording') == recording:
913                    return bitmap
914        return None
915
916    def check_bitmap_status(self, node_name, bitmap_name, fields):
917        ret = self.get_bitmap(node_name, bitmap_name)
918
919        return fields.items() <= ret.items()
920
921    def assert_block_path(self, root, path, expected_node, graph=None):
922        """
923        Check whether the node under the given path in the block graph
924        is @expected_node.
925
926        @root is the node name of the node where the @path is rooted.
927
928        @path is a string that consists of child names separated by
929        slashes.  It must begin with a slash.
930
931        Examples for @root + @path:
932          - root="qcow2-node", path="/backing/file"
933          - root="quorum-node", path="/children.2/file"
934
935        Hypothetically, @path could be empty, in which case it would
936        point to @root.  However, in practice this case is not useful
937        and hence not allowed.
938
939        @expected_node may be None.  (All elements of the path but the
940        leaf must still exist.)
941
942        @graph may be None or the result of an x-debug-query-block-graph
943        call that has already been performed.
944        """
945        if graph is None:
946            graph = self.qmp('x-debug-query-block-graph')['return']
947
948        iter_path = iter(path.split('/'))
949
950        # Must start with a /
951        assert next(iter_path) == ''
952
953        node = next((node for node in graph['nodes'] if node['name'] == root),
954                    None)
955
956        # An empty @path is not allowed, so the root node must be present
957        assert node is not None, 'Root node %s not found' % root
958
959        for child_name in iter_path:
960            assert node is not None, 'Cannot follow path %s%s' % (root, path)
961
962            try:
963                node_id = next(edge['child'] for edge in graph['edges']
964                               if (edge['parent'] == node['id'] and
965                                   edge['name'] == child_name))
966
967                node = next(node for node in graph['nodes']
968                            if node['id'] == node_id)
969
970            except StopIteration:
971                node = None
972
973        if node is None:
974            assert expected_node is None, \
975                   'No node found under %s (but expected %s)' % \
976                   (path, expected_node)
977        else:
978            assert node['name'] == expected_node, \
979                   'Found node %s under %s (but expected %s)' % \
980                   (node['name'], path, expected_node)
981
982index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
983
984class QMPTestCase(unittest.TestCase):
985    '''Abstract base class for QMP test cases'''
986
987    def __init__(self, *args, **kwargs):
988        super().__init__(*args, **kwargs)
989        # Many users of this class set a VM property we rely on heavily
990        # in the methods below.
991        self.vm = None
992
993    def dictpath(self, d, path):
994        '''Traverse a path in a nested dict'''
995        for component in path.split('/'):
996            m = index_re.match(component)
997            if m:
998                component, idx = m.groups()
999                idx = int(idx)
1000
1001            if not isinstance(d, dict) or component not in d:
1002                self.fail(f'failed path traversal for "{path}" in "{d}"')
1003            d = d[component]
1004
1005            if m:
1006                if not isinstance(d, list):
1007                    self.fail(f'path component "{component}" in "{path}" '
1008                              f'is not a list in "{d}"')
1009                try:
1010                    d = d[idx]
1011                except IndexError:
1012                    self.fail(f'invalid index "{idx}" in path "{path}" '
1013                              f'in "{d}"')
1014        return d
1015
1016    def assert_qmp_absent(self, d, path):
1017        try:
1018            result = self.dictpath(d, path)
1019        except AssertionError:
1020            return
1021        self.fail('path "%s" has value "%s"' % (path, str(result)))
1022
1023    def assert_qmp(self, d, path, value):
1024        '''Assert that the value for a specific path in a QMP dict
1025           matches.  When given a list of values, assert that any of
1026           them matches.'''
1027
1028        result = self.dictpath(d, path)
1029
1030        # [] makes no sense as a list of valid values, so treat it as
1031        # an actual single value.
1032        if isinstance(value, list) and value != []:
1033            for v in value:
1034                if result == v:
1035                    return
1036            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1037        else:
1038            self.assertEqual(result, value,
1039                             '"%s" is "%s", expected "%s"'
1040                             % (path, str(result), str(value)))
1041
1042    def assert_no_active_block_jobs(self):
1043        result = self.vm.qmp('query-block-jobs')
1044        self.assert_qmp(result, 'return', [])
1045
1046    def assert_has_block_node(self, node_name=None, file_name=None):
1047        """Issue a query-named-block-nodes and assert node_name and/or
1048        file_name is present in the result"""
1049        def check_equal_or_none(a, b):
1050            return a is None or b is None or a == b
1051        assert node_name or file_name
1052        result = self.vm.qmp('query-named-block-nodes')
1053        for x in result["return"]:
1054            if check_equal_or_none(x.get("node-name"), node_name) and \
1055                    check_equal_or_none(x.get("file"), file_name):
1056                return
1057        self.fail("Cannot find %s %s in result:\n%s" %
1058                  (node_name, file_name, result))
1059
1060    def assert_json_filename_equal(self, json_filename, reference):
1061        '''Asserts that the given filename is a json: filename and that its
1062           content is equal to the given reference object'''
1063        self.assertEqual(json_filename[:5], 'json:')
1064        self.assertEqual(
1065            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1066            self.vm.flatten_qmp_object(reference)
1067        )
1068
1069    def cancel_and_wait(self, drive='drive0', force=False,
1070                        resume=False, wait=60.0):
1071        '''Cancel a block job and wait for it to finish, returning the event'''
1072        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1073        self.assert_qmp(result, 'return', {})
1074
1075        if resume:
1076            self.vm.resume_drive(drive)
1077
1078        cancelled = False
1079        result = None
1080        while not cancelled:
1081            for event in self.vm.get_qmp_events(wait=wait):
1082                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1083                   event['event'] == 'BLOCK_JOB_CANCELLED':
1084                    self.assert_qmp(event, 'data/device', drive)
1085                    result = event
1086                    cancelled = True
1087                elif event['event'] == 'JOB_STATUS_CHANGE':
1088                    self.assert_qmp(event, 'data/id', drive)
1089
1090
1091        self.assert_no_active_block_jobs()
1092        return result
1093
1094    def wait_until_completed(self, drive='drive0', check_offset=True,
1095                             wait=60.0, error=None):
1096        '''Wait for a block job to finish, returning the event'''
1097        while True:
1098            for event in self.vm.get_qmp_events(wait=wait):
1099                if event['event'] == 'BLOCK_JOB_COMPLETED':
1100                    self.assert_qmp(event, 'data/device', drive)
1101                    if error is None:
1102                        self.assert_qmp_absent(event, 'data/error')
1103                        if check_offset:
1104                            self.assert_qmp(event, 'data/offset',
1105                                            event['data']['len'])
1106                    else:
1107                        self.assert_qmp(event, 'data/error', error)
1108                    self.assert_no_active_block_jobs()
1109                    return event
1110                if event['event'] == 'JOB_STATUS_CHANGE':
1111                    self.assert_qmp(event, 'data/id', drive)
1112
1113    def wait_ready(self, drive='drive0'):
1114        """Wait until a BLOCK_JOB_READY event, and return the event."""
1115        return self.vm.events_wait([
1116            ('BLOCK_JOB_READY',
1117             {'data': {'type': 'mirror', 'device': drive}}),
1118            ('BLOCK_JOB_READY',
1119             {'data': {'type': 'commit', 'device': drive}})
1120        ])
1121
1122    def wait_ready_and_cancel(self, drive='drive0'):
1123        self.wait_ready(drive=drive)
1124        event = self.cancel_and_wait(drive=drive)
1125        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1126        self.assert_qmp(event, 'data/type', 'mirror')
1127        self.assert_qmp(event, 'data/offset', event['data']['len'])
1128
1129    def complete_and_wait(self, drive='drive0', wait_ready=True,
1130                          completion_error=None):
1131        '''Complete a block job and wait for it to finish'''
1132        if wait_ready:
1133            self.wait_ready(drive=drive)
1134
1135        result = self.vm.qmp('block-job-complete', device=drive)
1136        self.assert_qmp(result, 'return', {})
1137
1138        event = self.wait_until_completed(drive=drive, error=completion_error)
1139        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1140
1141    def pause_wait(self, job_id='job0'):
1142        with Timeout(3, "Timeout waiting for job to pause"):
1143            while True:
1144                result = self.vm.qmp('query-block-jobs')
1145                found = False
1146                for job in result['return']:
1147                    if job['device'] == job_id:
1148                        found = True
1149                        if job['paused'] and not job['busy']:
1150                            return job
1151                        break
1152                assert found
1153
1154    def pause_job(self, job_id='job0', wait=True):
1155        result = self.vm.qmp('block-job-pause', device=job_id)
1156        self.assert_qmp(result, 'return', {})
1157        if wait:
1158            return self.pause_wait(job_id)
1159        return result
1160
1161    def case_skip(self, reason):
1162        '''Skip this test case'''
1163        case_notrun(reason)
1164        self.skipTest(reason)
1165
1166
1167def notrun(reason):
1168    '''Skip this test suite'''
1169    # Each test in qemu-iotests has a number ("seq")
1170    seq = os.path.basename(sys.argv[0])
1171
1172    with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
1173            as outfile:
1174        outfile.write(reason + '\n')
1175    logger.warning("%s not run: %s", seq, reason)
1176    sys.exit(0)
1177
1178def case_notrun(reason):
1179    '''Mark this test case as not having been run (without actually
1180    skipping it, that is left to the caller).  See
1181    QMPTestCase.case_skip() for a variant that actually skips the
1182    current test case.'''
1183
1184    # Each test in qemu-iotests has a number ("seq")
1185    seq = os.path.basename(sys.argv[0])
1186
1187    with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
1188            as outfile:
1189        outfile.write('    [case not run] ' + reason + '\n')
1190
1191def _verify_image_format(supported_fmts: Sequence[str] = (),
1192                         unsupported_fmts: Sequence[str] = ()) -> None:
1193    if 'generic' in supported_fmts and \
1194            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1195        # similar to
1196        #   _supported_fmt generic
1197        # for bash tests
1198        supported_fmts = ()
1199
1200    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1201    if not_sup or (imgfmt in unsupported_fmts):
1202        notrun('not suitable for this image format: %s' % imgfmt)
1203
1204    if imgfmt == 'luks':
1205        verify_working_luks()
1206
1207def _verify_protocol(supported: Sequence[str] = (),
1208                     unsupported: Sequence[str] = ()) -> None:
1209    assert not (supported and unsupported)
1210
1211    if 'generic' in supported:
1212        return
1213
1214    not_sup = supported and (imgproto not in supported)
1215    if not_sup or (imgproto in unsupported):
1216        notrun('not suitable for this protocol: %s' % imgproto)
1217
1218def _verify_platform(supported: Sequence[str] = (),
1219                     unsupported: Sequence[str] = ()) -> None:
1220    if any((sys.platform.startswith(x) for x in unsupported)):
1221        notrun('not suitable for this OS: %s' % sys.platform)
1222
1223    if supported:
1224        if not any((sys.platform.startswith(x) for x in supported)):
1225            notrun('not suitable for this OS: %s' % sys.platform)
1226
1227def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1228    if supported_cache_modes and (cachemode not in supported_cache_modes):
1229        notrun('not suitable for this cache mode: %s' % cachemode)
1230
1231def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1232    if supported_aio_modes and (aiomode not in supported_aio_modes):
1233        notrun('not suitable for this aio mode: %s' % aiomode)
1234
1235def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1236    usf_list = list(set(required_formats) - set(supported_formats()))
1237    if usf_list:
1238        notrun(f'formats {usf_list} are not whitelisted')
1239
1240
1241def _verify_virtio_blk() -> None:
1242    out = qemu_pipe('-M', 'none', '-device', 'help')
1243    if 'virtio-blk' not in out:
1244        notrun('Missing virtio-blk in QEMU binary')
1245
1246def _verify_virtio_scsi_pci_or_ccw() -> None:
1247    out = qemu_pipe('-M', 'none', '-device', 'help')
1248    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1249        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1250
1251
1252def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1253    imgopts = os.environ.get('IMGOPTS')
1254    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1255    # but it supported only for bash tests. We don't have a concept of global
1256    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1257    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1258    unsup = list(unsupported) + ['$']
1259    if imgopts and any(x in imgopts for x in unsup):
1260        notrun(f'not suitable for this imgopts: {imgopts}')
1261
1262
1263def supports_quorum():
1264    return 'quorum' in qemu_img_pipe('--help')
1265
1266def verify_quorum():
1267    '''Skip test suite if quorum support is not available'''
1268    if not supports_quorum():
1269        notrun('quorum support missing')
1270
1271def has_working_luks() -> Tuple[bool, str]:
1272    """
1273    Check whether our LUKS driver can actually create images
1274    (this extends to LUKS encryption for qcow2).
1275
1276    If not, return the reason why.
1277    """
1278
1279    img_file = f'{test_dir}/luks-test.luks'
1280    (output, status) = \
1281        qemu_img_pipe_and_status('create', '-f', 'luks',
1282                                 '--object', luks_default_secret_object,
1283                                 '-o', luks_default_key_secret_opt,
1284                                 '-o', 'iter-time=10',
1285                                 img_file, '1G')
1286    try:
1287        os.remove(img_file)
1288    except OSError:
1289        pass
1290
1291    if status != 0:
1292        reason = output
1293        for line in output.splitlines():
1294            if img_file + ':' in line:
1295                reason = line.split(img_file + ':', 1)[1].strip()
1296                break
1297
1298        return (False, reason)
1299    else:
1300        return (True, '')
1301
1302def verify_working_luks():
1303    """
1304    Skip test suite if LUKS does not work
1305    """
1306    (working, reason) = has_working_luks()
1307    if not working:
1308        notrun(reason)
1309
1310def qemu_pipe(*args: str) -> str:
1311    """
1312    Run qemu with an option to print something and exit (e.g. a help option).
1313
1314    :return: QEMU's stdout output.
1315    """
1316    full_args = [qemu_prog] + qemu_opts + list(args)
1317    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1318    return output
1319
1320def supported_formats(read_only=False):
1321    '''Set 'read_only' to True to check ro-whitelist
1322       Otherwise, rw-whitelist is checked'''
1323
1324    if not hasattr(supported_formats, "formats"):
1325        supported_formats.formats = {}
1326
1327    if read_only not in supported_formats.formats:
1328        format_message = qemu_pipe("-drive", "format=help")
1329        line = 1 if read_only else 0
1330        supported_formats.formats[read_only] = \
1331            format_message.splitlines()[line].split(":")[1].split()
1332
1333    return supported_formats.formats[read_only]
1334
1335def skip_if_unsupported(required_formats=(), read_only=False):
1336    '''Skip Test Decorator
1337       Runs the test if all the required formats are whitelisted'''
1338    def skip_test_decorator(func):
1339        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1340                         **kwargs: Dict[str, Any]) -> None:
1341            if callable(required_formats):
1342                fmts = required_formats(test_case)
1343            else:
1344                fmts = required_formats
1345
1346            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1347            if usf_list:
1348                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1349                test_case.case_skip(msg)
1350            else:
1351                func(test_case, *args, **kwargs)
1352        return func_wrapper
1353    return skip_test_decorator
1354
1355def skip_for_formats(formats: Sequence[str] = ()) \
1356    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1357                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1358    '''Skip Test Decorator
1359       Skips the test for the given formats'''
1360    def skip_test_decorator(func):
1361        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1362                         **kwargs: Dict[str, Any]) -> None:
1363            if imgfmt in formats:
1364                msg = f'{test_case}: Skipped for format {imgfmt}'
1365                test_case.case_skip(msg)
1366            else:
1367                func(test_case, *args, **kwargs)
1368        return func_wrapper
1369    return skip_test_decorator
1370
1371def skip_if_user_is_root(func):
1372    '''Skip Test Decorator
1373       Runs the test only without root permissions'''
1374    def func_wrapper(*args, **kwargs):
1375        if os.getuid() == 0:
1376            case_notrun('{}: cannot be run as root'.format(args[0]))
1377            return None
1378        else:
1379            return func(*args, **kwargs)
1380    return func_wrapper
1381
1382# We need to filter out the time taken from the output so that
1383# qemu-iotest can reliably diff the results against master output,
1384# and hide skipped tests from the reference output.
1385
1386class ReproducibleTestResult(unittest.TextTestResult):
1387    def addSkip(self, test, reason):
1388        # Same as TextTestResult, but print dot instead of "s"
1389        unittest.TestResult.addSkip(self, test, reason)
1390        if self.showAll:
1391            self.stream.writeln("skipped {0!r}".format(reason))
1392        elif self.dots:
1393            self.stream.write(".")
1394            self.stream.flush()
1395
1396class ReproducibleStreamWrapper:
1397    def __init__(self, stream: TextIO):
1398        self.stream = stream
1399
1400    def __getattr__(self, attr):
1401        if attr in ('stream', '__getstate__'):
1402            raise AttributeError(attr)
1403        return getattr(self.stream, attr)
1404
1405    def write(self, arg=None):
1406        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1407        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1408        self.stream.write(arg)
1409
1410class ReproducibleTestRunner(unittest.TextTestRunner):
1411    def __init__(self, stream: Optional[TextIO] = None,
1412                 resultclass: Type[unittest.TestResult] =
1413                 ReproducibleTestResult,
1414                 **kwargs: Any) -> None:
1415        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1416        super().__init__(stream=rstream,           # type: ignore
1417                         descriptions=True,
1418                         resultclass=resultclass,
1419                         **kwargs)
1420
1421def execute_unittest(argv: List[str], debug: bool = False) -> None:
1422    """Executes unittests within the calling module."""
1423
1424    # Some tests have warnings, especially ResourceWarnings for unclosed
1425    # files and sockets.  Ignore them for now to ensure reproducibility of
1426    # the test output.
1427    unittest.main(argv=argv,
1428                  testRunner=ReproducibleTestRunner,
1429                  verbosity=2 if debug else 1,
1430                  warnings=None if sys.warnoptions else 'ignore')
1431
1432def execute_setup_common(supported_fmts: Sequence[str] = (),
1433                         supported_platforms: Sequence[str] = (),
1434                         supported_cache_modes: Sequence[str] = (),
1435                         supported_aio_modes: Sequence[str] = (),
1436                         unsupported_fmts: Sequence[str] = (),
1437                         supported_protocols: Sequence[str] = (),
1438                         unsupported_protocols: Sequence[str] = (),
1439                         required_fmts: Sequence[str] = (),
1440                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1441    """
1442    Perform necessary setup for either script-style or unittest-style tests.
1443
1444    :return: Bool; Whether or not debug mode has been requested via the CLI.
1445    """
1446    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1447
1448    debug = '-d' in sys.argv
1449    if debug:
1450        sys.argv.remove('-d')
1451    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1452
1453    _verify_image_format(supported_fmts, unsupported_fmts)
1454    _verify_protocol(supported_protocols, unsupported_protocols)
1455    _verify_platform(supported=supported_platforms)
1456    _verify_cache_mode(supported_cache_modes)
1457    _verify_aio_mode(supported_aio_modes)
1458    _verify_formats(required_fmts)
1459    _verify_virtio_blk()
1460    _verify_imgopts(unsupported_imgopts)
1461
1462    return debug
1463
1464def execute_test(*args, test_function=None, **kwargs):
1465    """Run either unittest or script-style tests."""
1466
1467    debug = execute_setup_common(*args, **kwargs)
1468    if not test_function:
1469        execute_unittest(sys.argv, debug)
1470    else:
1471        test_function()
1472
1473def activate_logging():
1474    """Activate iotests.log() output to stdout for script-style tests."""
1475    handler = logging.StreamHandler(stream=sys.stdout)
1476    formatter = logging.Formatter('%(message)s')
1477    handler.setFormatter(formatter)
1478    test_logger.addHandler(handler)
1479    test_logger.setLevel(logging.INFO)
1480    test_logger.propagate = False
1481
1482# This is called from script-style iotests without a single point of entry
1483def script_initialize(*args, **kwargs):
1484    """Initialize script-style tests without running any tests."""
1485    activate_logging()
1486    execute_setup_common(*args, **kwargs)
1487
1488# This is called from script-style iotests with a single point of entry
1489def script_main(test_function, *args, **kwargs):
1490    """Run script-style tests outside of the unittest framework"""
1491    activate_logging()
1492    execute_test(*args, test_function=test_function, **kwargs)
1493
1494# This is called from unittest style iotests
1495def main(*args, **kwargs):
1496    """Run tests using the unittest framework"""
1497    execute_test(*args, **kwargs)
1498