xref: /qemu/tests/qemu-iotests/iotests.py (revision 704a256d)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43# Use this logger for logging messages directly from the iotests module
44logger = logging.getLogger('qemu.iotests')
45logger.addHandler(logging.NullHandler())
46
47# Use this logger for messages that ought to be used for diff output.
48test_logger = logging.getLogger('qemu.iotests.diff_io')
49
50
51faulthandler.enable()
52
53# This will not work if arguments contain spaces but is necessary if we
54# want to support the override options that ./check supports.
55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
56if os.environ.get('QEMU_IMG_OPTIONS'):
57    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
58
59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
60if os.environ.get('QEMU_IO_OPTIONS'):
61    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
62
63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
65    qemu_io_args_no_fmt += \
66        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
67
68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
69qemu_nbd_args = [qemu_nbd_prog]
70if os.environ.get('QEMU_NBD_OPTIONS'):
71    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
72
73qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
75
76imgfmt = os.environ.get('IMGFMT', 'raw')
77imgproto = os.environ.get('IMGPROTO', 'file')
78test_dir = os.environ.get('TEST_DIR')
79sock_dir = os.environ.get('SOCK_DIR')
80output_dir = os.environ.get('OUTPUT_DIR', '.')
81cachemode = os.environ.get('CACHEMODE')
82aiomode = os.environ.get('AIOMODE')
83qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
84
85socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
86
87luks_default_secret_object = 'secret,id=keysec0,data=' + \
88                             os.environ.get('IMGKEYSECRET', '')
89luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
92def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
93                              connect_stderr: bool = True) -> Tuple[str, int]:
94    """
95    Run a tool and return both its output and its exit code
96    """
97    stderr = subprocess.STDOUT if connect_stderr else None
98    subp = subprocess.Popen(args,
99                            stdout=subprocess.PIPE,
100                            stderr=stderr,
101                            universal_newlines=True)
102    output = subp.communicate()[0]
103    if subp.returncode < 0:
104        sys.stderr.write('%s received signal %i: %s\n'
105                         % (tool, -subp.returncode,
106                            ' '.join(qemu_img_args + list(args))))
107    return (output, subp.returncode)
108
109def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
110    """
111    Run qemu-img and return both its output and its exit code
112    """
113    full_args = qemu_img_args + list(args)
114    return qemu_tool_pipe_and_status('qemu-img', full_args)
115
116def qemu_img(*args: str) -> int:
117    '''Run qemu-img and return the exit code'''
118    return qemu_img_pipe_and_status(*args)[1]
119
120def ordered_qmp(qmsg, conv_keys=True):
121    # Dictionaries are not ordered prior to 3.6, therefore:
122    if isinstance(qmsg, list):
123        return [ordered_qmp(atom) for atom in qmsg]
124    if isinstance(qmsg, dict):
125        od = OrderedDict()
126        for k, v in sorted(qmsg.items()):
127            if conv_keys:
128                k = k.replace('_', '-')
129            od[k] = ordered_qmp(v, conv_keys=False)
130        return od
131    return qmsg
132
133def qemu_img_create(*args):
134    args = list(args)
135
136    # default luks support
137    if '-f' in args and args[args.index('-f') + 1] == 'luks':
138        if '-o' in args:
139            i = args.index('-o')
140            if 'key-secret' not in args[i + 1]:
141                args[i + 1].append(luks_default_key_secret_opt)
142                args.insert(i + 2, '--object')
143                args.insert(i + 3, luks_default_secret_object)
144        else:
145            args = ['-o', luks_default_key_secret_opt,
146                    '--object', luks_default_secret_object] + args
147
148    args.insert(0, 'create')
149
150    return qemu_img(*args)
151
152def qemu_img_measure(*args):
153    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
154
155def qemu_img_check(*args):
156    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
157
158def qemu_img_verbose(*args):
159    '''Run qemu-img without suppressing its output and return the exit code'''
160    exitcode = subprocess.call(qemu_img_args + list(args))
161    if exitcode < 0:
162        sys.stderr.write('qemu-img received signal %i: %s\n'
163                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
164    return exitcode
165
166def qemu_img_pipe(*args: str) -> str:
167    '''Run qemu-img and return its output'''
168    return qemu_img_pipe_and_status(*args)[0]
169
170def qemu_img_log(*args):
171    result = qemu_img_pipe(*args)
172    log(result, filters=[filter_testfiles])
173    return result
174
175def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
176    args = ['info']
177    if imgopts:
178        args.append('--image-opts')
179    else:
180        args += ['-f', imgfmt]
181    args += extra_args
182    args.append(filename)
183
184    output = qemu_img_pipe(*args)
185    if not filter_path:
186        filter_path = filename
187    log(filter_img_info(output, filter_path))
188
189def qemu_io(*args):
190    '''Run qemu-io and return the stdout data'''
191    args = qemu_io_args + list(args)
192    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
193                            stderr=subprocess.STDOUT,
194                            universal_newlines=True)
195    output = subp.communicate()[0]
196    if subp.returncode < 0:
197        sys.stderr.write('qemu-io received signal %i: %s\n'
198                         % (-subp.returncode, ' '.join(args)))
199    return output
200
201def qemu_io_log(*args):
202    result = qemu_io(*args)
203    log(result, filters=[filter_testfiles, filter_qemu_io])
204    return result
205
206def qemu_io_silent(*args):
207    '''Run qemu-io and return the exit code, suppressing stdout'''
208    args = qemu_io_args + list(args)
209    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
210    if exitcode < 0:
211        sys.stderr.write('qemu-io received signal %i: %s\n' %
212                         (-exitcode, ' '.join(args)))
213    return exitcode
214
215def qemu_io_silent_check(*args):
216    '''Run qemu-io and return the true if subprocess returned 0'''
217    args = qemu_io_args + list(args)
218    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
219                               stderr=subprocess.STDOUT)
220    return exitcode == 0
221
222def get_virtio_scsi_device():
223    if qemu_default_machine == 's390-ccw-virtio':
224        return 'virtio-scsi-ccw'
225    return 'virtio-scsi-pci'
226
227class QemuIoInteractive:
228    def __init__(self, *args):
229        self.args = qemu_io_args_no_fmt + list(args)
230        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
231                                   stdout=subprocess.PIPE,
232                                   stderr=subprocess.STDOUT,
233                                   universal_newlines=True)
234        out = self._p.stdout.read(9)
235        if out != 'qemu-io> ':
236            # Most probably qemu-io just failed to start.
237            # Let's collect the whole output and exit.
238            out += self._p.stdout.read()
239            self._p.wait(timeout=1)
240            raise ValueError(out)
241
242    def close(self):
243        self._p.communicate('q\n')
244
245    def _read_output(self):
246        pattern = 'qemu-io> '
247        n = len(pattern)
248        pos = 0
249        s = []
250        while pos != n:
251            c = self._p.stdout.read(1)
252            # check unexpected EOF
253            assert c != ''
254            s.append(c)
255            if c == pattern[pos]:
256                pos += 1
257            else:
258                pos = 0
259
260        return ''.join(s[:-n])
261
262    def cmd(self, cmd):
263        # quit command is in close(), '\n' is added automatically
264        assert '\n' not in cmd
265        cmd = cmd.strip()
266        assert cmd not in ('q', 'quit')
267        self._p.stdin.write(cmd + '\n')
268        self._p.stdin.flush()
269        return self._read_output()
270
271
272def qemu_nbd(*args):
273    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
274    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
275
276def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
277    '''Run qemu-nbd in daemon mode and return both the parent's exit code
278       and its output in case of an error'''
279    full_args = qemu_nbd_args + ['--fork'] + list(args)
280    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
281                                                   connect_stderr=False)
282    return returncode, output if returncode else ''
283
284def qemu_nbd_list_log(*args: str) -> str:
285    '''Run qemu-nbd to list remote exports'''
286    full_args = [qemu_nbd_prog, '-L'] + list(args)
287    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
288    log(output, filters=[filter_testfiles, filter_nbd_exports])
289    return output
290
291@contextmanager
292def qemu_nbd_popen(*args):
293    '''Context manager running qemu-nbd within the context'''
294    pid_file = file_path("pid")
295
296    cmd = list(qemu_nbd_args)
297    cmd.extend(('--persistent', '--pid-file', pid_file))
298    cmd.extend(args)
299
300    log('Start NBD server')
301    p = subprocess.Popen(cmd)
302    try:
303        while not os.path.exists(pid_file):
304            if p.poll() is not None:
305                raise RuntimeError(
306                    "qemu-nbd terminated with exit code {}: {}"
307                    .format(p.returncode, ' '.join(cmd)))
308
309            time.sleep(0.01)
310        yield
311    finally:
312        log('Kill NBD server')
313        p.kill()
314        p.wait()
315
316def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
317    '''Return True if two image files are identical'''
318    return qemu_img('compare', '-f', fmt1,
319                    '-F', fmt2, img1, img2) == 0
320
321def create_image(name, size):
322    '''Create a fully-allocated raw image with sector markers'''
323    file = open(name, 'wb')
324    i = 0
325    while i < size:
326        sector = struct.pack('>l504xl', i // 512, i // 512)
327        file.write(sector)
328        i = i + 512
329    file.close()
330
331def image_size(img):
332    '''Return image's virtual size'''
333    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
334    return json.loads(r)['virtual-size']
335
336def is_str(val):
337    return isinstance(val, str)
338
339test_dir_re = re.compile(r"%s" % test_dir)
340def filter_test_dir(msg):
341    return test_dir_re.sub("TEST_DIR", msg)
342
343win32_re = re.compile(r"\r")
344def filter_win32(msg):
345    return win32_re.sub("", msg)
346
347qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
348                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
349                        r"and [0-9\/.inf]* ops\/sec\)")
350def filter_qemu_io(msg):
351    msg = filter_win32(msg)
352    return qemu_io_re.sub("X ops; XX:XX:XX.X "
353                          "(XXX YYY/sec and XXX ops/sec)", msg)
354
355chown_re = re.compile(r"chown [0-9]+:[0-9]+")
356def filter_chown(msg):
357    return chown_re.sub("chown UID:GID", msg)
358
359def filter_qmp_event(event):
360    '''Filter a QMP event dict'''
361    event = dict(event)
362    if 'timestamp' in event:
363        event['timestamp']['seconds'] = 'SECS'
364        event['timestamp']['microseconds'] = 'USECS'
365    return event
366
367def filter_qmp(qmsg, filter_fn):
368    '''Given a string filter, filter a QMP object's values.
369    filter_fn takes a (key, value) pair.'''
370    # Iterate through either lists or dicts;
371    if isinstance(qmsg, list):
372        items = enumerate(qmsg)
373    else:
374        items = qmsg.items()
375
376    for k, v in items:
377        if isinstance(v, (dict, list)):
378            qmsg[k] = filter_qmp(v, filter_fn)
379        else:
380            qmsg[k] = filter_fn(k, v)
381    return qmsg
382
383def filter_testfiles(msg):
384    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
385    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
386    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
387
388def filter_qmp_testfiles(qmsg):
389    def _filter(_key, value):
390        if is_str(value):
391            return filter_testfiles(value)
392        return value
393    return filter_qmp(qmsg, _filter)
394
395def filter_generated_node_ids(msg):
396    return re.sub("#block[0-9]+", "NODE_NAME", msg)
397
398def filter_img_info(output, filename):
399    lines = []
400    for line in output.split('\n'):
401        if 'disk size' in line or 'actual-size' in line:
402            continue
403        line = line.replace(filename, 'TEST_IMG')
404        line = filter_testfiles(line)
405        line = line.replace(imgfmt, 'IMGFMT')
406        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
407        line = re.sub('uuid: [-a-f0-9]+',
408                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
409                      line)
410        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
411        lines.append(line)
412    return '\n'.join(lines)
413
414def filter_imgfmt(msg):
415    return msg.replace(imgfmt, 'IMGFMT')
416
417def filter_qmp_imgfmt(qmsg):
418    def _filter(_key, value):
419        if is_str(value):
420            return filter_imgfmt(value)
421        return value
422    return filter_qmp(qmsg, _filter)
423
424def filter_nbd_exports(output: str) -> str:
425    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
426
427
428Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
429
430def log(msg: Msg,
431        filters: Iterable[Callable[[Msg], Msg]] = (),
432        indent: Optional[int] = None) -> None:
433    """
434    Logs either a string message or a JSON serializable message (like QMP).
435    If indent is provided, JSON serializable messages are pretty-printed.
436    """
437    for flt in filters:
438        msg = flt(msg)
439    if isinstance(msg, (dict, list)):
440        # Don't sort if it's already sorted
441        do_sort = not isinstance(msg, OrderedDict)
442        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
443    else:
444        test_logger.info(msg)
445
446class Timeout:
447    def __init__(self, seconds, errmsg="Timeout"):
448        self.seconds = seconds
449        self.errmsg = errmsg
450    def __enter__(self):
451        signal.signal(signal.SIGALRM, self.timeout)
452        signal.setitimer(signal.ITIMER_REAL, self.seconds)
453        return self
454    def __exit__(self, exc_type, value, traceback):
455        signal.setitimer(signal.ITIMER_REAL, 0)
456        return False
457    def timeout(self, signum, frame):
458        raise Exception(self.errmsg)
459
460def file_pattern(name):
461    return "{0}-{1}".format(os.getpid(), name)
462
463class FilePath:
464    """
465    Context manager generating multiple file names. The generated files are
466    removed when exiting the context.
467
468    Example usage:
469
470        with FilePath('a.img', 'b.img') as (img_a, img_b):
471            # Use img_a and img_b here...
472
473        # a.img and b.img are automatically removed here.
474
475    By default images are created in iotests.test_dir. To create sockets use
476    iotests.sock_dir:
477
478       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
479
480    For convenience, calling with one argument yields a single file instead of
481    a tuple with one item.
482
483    """
484    def __init__(self, *names, base_dir=test_dir):
485        self.paths = [os.path.join(base_dir, file_pattern(name))
486                      for name in names]
487
488    def __enter__(self):
489        if len(self.paths) == 1:
490            return self.paths[0]
491        else:
492            return self.paths
493
494    def __exit__(self, exc_type, exc_val, exc_tb):
495        for path in self.paths:
496            try:
497                os.remove(path)
498            except OSError:
499                pass
500        return False
501
502
503def file_path_remover():
504    for path in reversed(file_path_remover.paths):
505        try:
506            os.remove(path)
507        except OSError:
508            pass
509
510
511def file_path(*names, base_dir=test_dir):
512    ''' Another way to get auto-generated filename that cleans itself up.
513
514    Use is as simple as:
515
516    img_a, img_b = file_path('a.img', 'b.img')
517    sock = file_path('socket')
518    '''
519
520    if not hasattr(file_path_remover, 'paths'):
521        file_path_remover.paths = []
522        atexit.register(file_path_remover)
523
524    paths = []
525    for name in names:
526        filename = file_pattern(name)
527        path = os.path.join(base_dir, filename)
528        file_path_remover.paths.append(path)
529        paths.append(path)
530
531    return paths[0] if len(paths) == 1 else paths
532
533def remote_filename(path):
534    if imgproto == 'file':
535        return path
536    elif imgproto == 'ssh':
537        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
538    else:
539        raise Exception("Protocol %s not supported" % (imgproto))
540
541class VM(qtest.QEMUQtestMachine):
542    '''A QEMU VM'''
543
544    def __init__(self, path_suffix=''):
545        name = "qemu%s-%d" % (path_suffix, os.getpid())
546        super().__init__(qemu_prog, qemu_opts, name=name,
547                         test_dir=test_dir,
548                         socket_scm_helper=socket_scm_helper,
549                         sock_dir=sock_dir)
550        self._num_drives = 0
551
552    def add_object(self, opts):
553        self._args.append('-object')
554        self._args.append(opts)
555        return self
556
557    def add_device(self, opts):
558        self._args.append('-device')
559        self._args.append(opts)
560        return self
561
562    def add_drive_raw(self, opts):
563        self._args.append('-drive')
564        self._args.append(opts)
565        return self
566
567    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
568        '''Add a virtio-blk drive to the VM'''
569        options = ['if=%s' % interface,
570                   'id=drive%d' % self._num_drives]
571
572        if path is not None:
573            options.append('file=%s' % path)
574            options.append('format=%s' % img_format)
575            options.append('cache=%s' % cachemode)
576            options.append('aio=%s' % aiomode)
577
578        if opts:
579            options.append(opts)
580
581        if img_format == 'luks' and 'key-secret' not in opts:
582            # default luks support
583            if luks_default_secret_object not in self._args:
584                self.add_object(luks_default_secret_object)
585
586            options.append(luks_default_key_secret_opt)
587
588        self._args.append('-drive')
589        self._args.append(','.join(options))
590        self._num_drives += 1
591        return self
592
593    def add_blockdev(self, opts):
594        self._args.append('-blockdev')
595        if isinstance(opts, str):
596            self._args.append(opts)
597        else:
598            self._args.append(','.join(opts))
599        return self
600
601    def add_incoming(self, addr):
602        self._args.append('-incoming')
603        self._args.append(addr)
604        return self
605
606    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
607        cmd = 'human-monitor-command'
608        kwargs: Dict[str, Any] = {'command-line': command_line}
609        if use_log:
610            return self.qmp_log(cmd, **kwargs)
611        else:
612            return self.qmp(cmd, **kwargs)
613
614    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
615        """Pause drive r/w operations"""
616        if not event:
617            self.pause_drive(drive, "read_aio")
618            self.pause_drive(drive, "write_aio")
619            return
620        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
621
622    def resume_drive(self, drive: str) -> None:
623        """Resume drive r/w operations"""
624        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
625
626    def hmp_qemu_io(self, drive: str, cmd: str,
627                    use_log: bool = False) -> QMPMessage:
628        """Write to a given drive using an HMP command"""
629        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
630
631    def flatten_qmp_object(self, obj, output=None, basestr=''):
632        if output is None:
633            output = dict()
634        if isinstance(obj, list):
635            for i, item in enumerate(obj):
636                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
637        elif isinstance(obj, dict):
638            for key in obj:
639                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
640        else:
641            output[basestr[:-1]] = obj # Strip trailing '.'
642        return output
643
644    def qmp_to_opts(self, obj):
645        obj = self.flatten_qmp_object(obj)
646        output_list = list()
647        for key in obj:
648            output_list += [key + '=' + obj[key]]
649        return ','.join(output_list)
650
651    def get_qmp_events_filtered(self, wait=60.0):
652        result = []
653        for ev in self.get_qmp_events(wait=wait):
654            result.append(filter_qmp_event(ev))
655        return result
656
657    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
658        full_cmd = OrderedDict((
659            ("execute", cmd),
660            ("arguments", ordered_qmp(kwargs))
661        ))
662        log(full_cmd, filters, indent=indent)
663        result = self.qmp(cmd, **kwargs)
664        log(result, filters, indent=indent)
665        return result
666
667    # Returns None on success, and an error string on failure
668    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
669                pre_finalize=None, cancel=False, wait=60.0):
670        """
671        run_job moves a job from creation through to dismissal.
672
673        :param job: String. ID of recently-launched job
674        :param auto_finalize: Bool. True if the job was launched with
675                              auto_finalize. Defaults to True.
676        :param auto_dismiss: Bool. True if the job was launched with
677                             auto_dismiss=True. Defaults to False.
678        :param pre_finalize: Callback. A callable that takes no arguments to be
679                             invoked prior to issuing job-finalize, if any.
680        :param cancel: Bool. When true, cancels the job after the pre_finalize
681                       callback.
682        :param wait: Float. Timeout value specifying how long to wait for any
683                     event, in seconds. Defaults to 60.0.
684        """
685        match_device = {'data': {'device': job}}
686        match_id = {'data': {'id': job}}
687        events = [
688            ('BLOCK_JOB_COMPLETED', match_device),
689            ('BLOCK_JOB_CANCELLED', match_device),
690            ('BLOCK_JOB_ERROR', match_device),
691            ('BLOCK_JOB_READY', match_device),
692            ('BLOCK_JOB_PENDING', match_id),
693            ('JOB_STATUS_CHANGE', match_id)
694        ]
695        error = None
696        while True:
697            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
698            if ev['event'] != 'JOB_STATUS_CHANGE':
699                log(ev)
700                continue
701            status = ev['data']['status']
702            if status == 'aborting':
703                result = self.qmp('query-jobs')
704                for j in result['return']:
705                    if j['id'] == job:
706                        error = j['error']
707                        log('Job failed: %s' % (j['error']))
708            elif status == 'ready':
709                self.qmp_log('job-complete', id=job)
710            elif status == 'pending' and not auto_finalize:
711                if pre_finalize:
712                    pre_finalize()
713                if cancel:
714                    self.qmp_log('job-cancel', id=job)
715                else:
716                    self.qmp_log('job-finalize', id=job)
717            elif status == 'concluded' and not auto_dismiss:
718                self.qmp_log('job-dismiss', id=job)
719            elif status == 'null':
720                return error
721
722    # Returns None on success, and an error string on failure
723    def blockdev_create(self, options, job_id='job0', filters=None):
724        if filters is None:
725            filters = [filter_qmp_testfiles]
726        result = self.qmp_log('blockdev-create', filters=filters,
727                              job_id=job_id, options=options)
728
729        if 'return' in result:
730            assert result['return'] == {}
731            job_result = self.run_job(job_id)
732        else:
733            job_result = result['error']
734
735        log("")
736        return job_result
737
738    def enable_migration_events(self, name):
739        log('Enabling migration QMP events on %s...' % name)
740        log(self.qmp('migrate-set-capabilities', capabilities=[
741            {
742                'capability': 'events',
743                'state': True
744            }
745        ]))
746
747    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
748        while True:
749            event = self.event_wait('MIGRATION')
750            # We use the default timeout, and with a timeout, event_wait()
751            # never returns None
752            assert event
753
754            log(event, filters=[filter_qmp_event])
755            if event['data']['status'] in ('completed', 'failed'):
756                break
757
758        if event['data']['status'] == 'completed':
759            # The event may occur in finish-migrate, so wait for the expected
760            # post-migration runstate
761            runstate = None
762            while runstate != expect_runstate:
763                runstate = self.qmp('query-status')['return']['status']
764            return True
765        else:
766            return False
767
768    def node_info(self, node_name):
769        nodes = self.qmp('query-named-block-nodes')
770        for x in nodes['return']:
771            if x['node-name'] == node_name:
772                return x
773        return None
774
775    def query_bitmaps(self):
776        res = self.qmp("query-named-block-nodes")
777        return {device['node-name']: device['dirty-bitmaps']
778                for device in res['return'] if 'dirty-bitmaps' in device}
779
780    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
781        """
782        get a specific bitmap from the object returned by query_bitmaps.
783        :param recording: If specified, filter results by the specified value.
784        :param bitmaps: If specified, use it instead of call query_bitmaps()
785        """
786        if bitmaps is None:
787            bitmaps = self.query_bitmaps()
788
789        for bitmap in bitmaps[node_name]:
790            if bitmap.get('name', '') == bitmap_name:
791                if recording is None or bitmap.get('recording') == recording:
792                    return bitmap
793        return None
794
795    def check_bitmap_status(self, node_name, bitmap_name, fields):
796        ret = self.get_bitmap(node_name, bitmap_name)
797
798        return fields.items() <= ret.items()
799
800    def assert_block_path(self, root, path, expected_node, graph=None):
801        """
802        Check whether the node under the given path in the block graph
803        is @expected_node.
804
805        @root is the node name of the node where the @path is rooted.
806
807        @path is a string that consists of child names separated by
808        slashes.  It must begin with a slash.
809
810        Examples for @root + @path:
811          - root="qcow2-node", path="/backing/file"
812          - root="quorum-node", path="/children.2/file"
813
814        Hypothetically, @path could be empty, in which case it would
815        point to @root.  However, in practice this case is not useful
816        and hence not allowed.
817
818        @expected_node may be None.  (All elements of the path but the
819        leaf must still exist.)
820
821        @graph may be None or the result of an x-debug-query-block-graph
822        call that has already been performed.
823        """
824        if graph is None:
825            graph = self.qmp('x-debug-query-block-graph')['return']
826
827        iter_path = iter(path.split('/'))
828
829        # Must start with a /
830        assert next(iter_path) == ''
831
832        node = next((node for node in graph['nodes'] if node['name'] == root),
833                    None)
834
835        # An empty @path is not allowed, so the root node must be present
836        assert node is not None, 'Root node %s not found' % root
837
838        for child_name in iter_path:
839            assert node is not None, 'Cannot follow path %s%s' % (root, path)
840
841            try:
842                node_id = next(edge['child'] for edge in graph['edges']
843                               if (edge['parent'] == node['id'] and
844                                   edge['name'] == child_name))
845
846                node = next(node for node in graph['nodes']
847                            if node['id'] == node_id)
848
849            except StopIteration:
850                node = None
851
852        if node is None:
853            assert expected_node is None, \
854                   'No node found under %s (but expected %s)' % \
855                   (path, expected_node)
856        else:
857            assert node['name'] == expected_node, \
858                   'Found node %s under %s (but expected %s)' % \
859                   (node['name'], path, expected_node)
860
861index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
862
863class QMPTestCase(unittest.TestCase):
864    '''Abstract base class for QMP test cases'''
865
866    def __init__(self, *args, **kwargs):
867        super().__init__(*args, **kwargs)
868        # Many users of this class set a VM property we rely on heavily
869        # in the methods below.
870        self.vm = None
871
872    def dictpath(self, d, path):
873        '''Traverse a path in a nested dict'''
874        for component in path.split('/'):
875            m = index_re.match(component)
876            if m:
877                component, idx = m.groups()
878                idx = int(idx)
879
880            if not isinstance(d, dict) or component not in d:
881                self.fail(f'failed path traversal for "{path}" in "{d}"')
882            d = d[component]
883
884            if m:
885                if not isinstance(d, list):
886                    self.fail(f'path component "{component}" in "{path}" '
887                              f'is not a list in "{d}"')
888                try:
889                    d = d[idx]
890                except IndexError:
891                    self.fail(f'invalid index "{idx}" in path "{path}" '
892                              f'in "{d}"')
893        return d
894
895    def assert_qmp_absent(self, d, path):
896        try:
897            result = self.dictpath(d, path)
898        except AssertionError:
899            return
900        self.fail('path "%s" has value "%s"' % (path, str(result)))
901
902    def assert_qmp(self, d, path, value):
903        '''Assert that the value for a specific path in a QMP dict
904           matches.  When given a list of values, assert that any of
905           them matches.'''
906
907        result = self.dictpath(d, path)
908
909        # [] makes no sense as a list of valid values, so treat it as
910        # an actual single value.
911        if isinstance(value, list) and value != []:
912            for v in value:
913                if result == v:
914                    return
915            self.fail('no match for "%s" in %s' % (str(result), str(value)))
916        else:
917            self.assertEqual(result, value,
918                             '"%s" is "%s", expected "%s"'
919                             % (path, str(result), str(value)))
920
921    def assert_no_active_block_jobs(self):
922        result = self.vm.qmp('query-block-jobs')
923        self.assert_qmp(result, 'return', [])
924
925    def assert_has_block_node(self, node_name=None, file_name=None):
926        """Issue a query-named-block-nodes and assert node_name and/or
927        file_name is present in the result"""
928        def check_equal_or_none(a, b):
929            return a is None or b is None or a == b
930        assert node_name or file_name
931        result = self.vm.qmp('query-named-block-nodes')
932        for x in result["return"]:
933            if check_equal_or_none(x.get("node-name"), node_name) and \
934                    check_equal_or_none(x.get("file"), file_name):
935                return
936        self.fail("Cannot find %s %s in result:\n%s" %
937                  (node_name, file_name, result))
938
939    def assert_json_filename_equal(self, json_filename, reference):
940        '''Asserts that the given filename is a json: filename and that its
941           content is equal to the given reference object'''
942        self.assertEqual(json_filename[:5], 'json:')
943        self.assertEqual(
944            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
945            self.vm.flatten_qmp_object(reference)
946        )
947
948    def cancel_and_wait(self, drive='drive0', force=False,
949                        resume=False, wait=60.0):
950        '''Cancel a block job and wait for it to finish, returning the event'''
951        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
952        self.assert_qmp(result, 'return', {})
953
954        if resume:
955            self.vm.resume_drive(drive)
956
957        cancelled = False
958        result = None
959        while not cancelled:
960            for event in self.vm.get_qmp_events(wait=wait):
961                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
962                   event['event'] == 'BLOCK_JOB_CANCELLED':
963                    self.assert_qmp(event, 'data/device', drive)
964                    result = event
965                    cancelled = True
966                elif event['event'] == 'JOB_STATUS_CHANGE':
967                    self.assert_qmp(event, 'data/id', drive)
968
969
970        self.assert_no_active_block_jobs()
971        return result
972
973    def wait_until_completed(self, drive='drive0', check_offset=True,
974                             wait=60.0, error=None):
975        '''Wait for a block job to finish, returning the event'''
976        while True:
977            for event in self.vm.get_qmp_events(wait=wait):
978                if event['event'] == 'BLOCK_JOB_COMPLETED':
979                    self.assert_qmp(event, 'data/device', drive)
980                    if error is None:
981                        self.assert_qmp_absent(event, 'data/error')
982                        if check_offset:
983                            self.assert_qmp(event, 'data/offset',
984                                            event['data']['len'])
985                    else:
986                        self.assert_qmp(event, 'data/error', error)
987                    self.assert_no_active_block_jobs()
988                    return event
989                if event['event'] == 'JOB_STATUS_CHANGE':
990                    self.assert_qmp(event, 'data/id', drive)
991
992    def wait_ready(self, drive='drive0'):
993        """Wait until a BLOCK_JOB_READY event, and return the event."""
994        return self.vm.events_wait([
995            ('BLOCK_JOB_READY',
996             {'data': {'type': 'mirror', 'device': drive}}),
997            ('BLOCK_JOB_READY',
998             {'data': {'type': 'commit', 'device': drive}})
999        ])
1000
1001    def wait_ready_and_cancel(self, drive='drive0'):
1002        self.wait_ready(drive=drive)
1003        event = self.cancel_and_wait(drive=drive)
1004        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1005        self.assert_qmp(event, 'data/type', 'mirror')
1006        self.assert_qmp(event, 'data/offset', event['data']['len'])
1007
1008    def complete_and_wait(self, drive='drive0', wait_ready=True,
1009                          completion_error=None):
1010        '''Complete a block job and wait for it to finish'''
1011        if wait_ready:
1012            self.wait_ready(drive=drive)
1013
1014        result = self.vm.qmp('block-job-complete', device=drive)
1015        self.assert_qmp(result, 'return', {})
1016
1017        event = self.wait_until_completed(drive=drive, error=completion_error)
1018        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1019
1020    def pause_wait(self, job_id='job0'):
1021        with Timeout(3, "Timeout waiting for job to pause"):
1022            while True:
1023                result = self.vm.qmp('query-block-jobs')
1024                found = False
1025                for job in result['return']:
1026                    if job['device'] == job_id:
1027                        found = True
1028                        if job['paused'] and not job['busy']:
1029                            return job
1030                        break
1031                assert found
1032
1033    def pause_job(self, job_id='job0', wait=True):
1034        result = self.vm.qmp('block-job-pause', device=job_id)
1035        self.assert_qmp(result, 'return', {})
1036        if wait:
1037            return self.pause_wait(job_id)
1038        return result
1039
1040    def case_skip(self, reason):
1041        '''Skip this test case'''
1042        case_notrun(reason)
1043        self.skipTest(reason)
1044
1045
1046def notrun(reason):
1047    '''Skip this test suite'''
1048    # Each test in qemu-iotests has a number ("seq")
1049    seq = os.path.basename(sys.argv[0])
1050
1051    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1052    logger.warning("%s not run: %s", seq, reason)
1053    sys.exit(0)
1054
1055def case_notrun(reason):
1056    '''Mark this test case as not having been run (without actually
1057    skipping it, that is left to the caller).  See
1058    QMPTestCase.case_skip() for a variant that actually skips the
1059    current test case.'''
1060
1061    # Each test in qemu-iotests has a number ("seq")
1062    seq = os.path.basename(sys.argv[0])
1063
1064    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1065        '    [case not run] ' + reason + '\n')
1066
1067def _verify_image_format(supported_fmts: Sequence[str] = (),
1068                         unsupported_fmts: Sequence[str] = ()) -> None:
1069    if 'generic' in supported_fmts and \
1070            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1071        # similar to
1072        #   _supported_fmt generic
1073        # for bash tests
1074        supported_fmts = ()
1075
1076    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1077    if not_sup or (imgfmt in unsupported_fmts):
1078        notrun('not suitable for this image format: %s' % imgfmt)
1079
1080    if imgfmt == 'luks':
1081        verify_working_luks()
1082
1083def _verify_protocol(supported: Sequence[str] = (),
1084                     unsupported: Sequence[str] = ()) -> None:
1085    assert not (supported and unsupported)
1086
1087    if 'generic' in supported:
1088        return
1089
1090    not_sup = supported and (imgproto not in supported)
1091    if not_sup or (imgproto in unsupported):
1092        notrun('not suitable for this protocol: %s' % imgproto)
1093
1094def _verify_platform(supported: Sequence[str] = (),
1095                     unsupported: Sequence[str] = ()) -> None:
1096    if any((sys.platform.startswith(x) for x in unsupported)):
1097        notrun('not suitable for this OS: %s' % sys.platform)
1098
1099    if supported:
1100        if not any((sys.platform.startswith(x) for x in supported)):
1101            notrun('not suitable for this OS: %s' % sys.platform)
1102
1103def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1104    if supported_cache_modes and (cachemode not in supported_cache_modes):
1105        notrun('not suitable for this cache mode: %s' % cachemode)
1106
1107def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1108    if supported_aio_modes and (aiomode not in supported_aio_modes):
1109        notrun('not suitable for this aio mode: %s' % aiomode)
1110
1111def supports_quorum():
1112    return 'quorum' in qemu_img_pipe('--help')
1113
1114def verify_quorum():
1115    '''Skip test suite if quorum support is not available'''
1116    if not supports_quorum():
1117        notrun('quorum support missing')
1118
1119def has_working_luks() -> Tuple[bool, str]:
1120    """
1121    Check whether our LUKS driver can actually create images
1122    (this extends to LUKS encryption for qcow2).
1123
1124    If not, return the reason why.
1125    """
1126
1127    img_file = f'{test_dir}/luks-test.luks'
1128    (output, status) = \
1129        qemu_img_pipe_and_status('create', '-f', 'luks',
1130                                 '--object', luks_default_secret_object,
1131                                 '-o', luks_default_key_secret_opt,
1132                                 '-o', 'iter-time=10',
1133                                 img_file, '1G')
1134    try:
1135        os.remove(img_file)
1136    except OSError:
1137        pass
1138
1139    if status != 0:
1140        reason = output
1141        for line in output.splitlines():
1142            if img_file + ':' in line:
1143                reason = line.split(img_file + ':', 1)[1].strip()
1144                break
1145
1146        return (False, reason)
1147    else:
1148        return (True, '')
1149
1150def verify_working_luks():
1151    """
1152    Skip test suite if LUKS does not work
1153    """
1154    (working, reason) = has_working_luks()
1155    if not working:
1156        notrun(reason)
1157
1158def qemu_pipe(*args: str) -> str:
1159    """
1160    Run qemu with an option to print something and exit (e.g. a help option).
1161
1162    :return: QEMU's stdout output.
1163    """
1164    full_args = [qemu_prog] + qemu_opts + list(args)
1165    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1166    return output
1167
1168def supported_formats(read_only=False):
1169    '''Set 'read_only' to True to check ro-whitelist
1170       Otherwise, rw-whitelist is checked'''
1171
1172    if not hasattr(supported_formats, "formats"):
1173        supported_formats.formats = {}
1174
1175    if read_only not in supported_formats.formats:
1176        format_message = qemu_pipe("-drive", "format=help")
1177        line = 1 if read_only else 0
1178        supported_formats.formats[read_only] = \
1179            format_message.splitlines()[line].split(":")[1].split()
1180
1181    return supported_formats.formats[read_only]
1182
1183def skip_if_unsupported(required_formats=(), read_only=False):
1184    '''Skip Test Decorator
1185       Runs the test if all the required formats are whitelisted'''
1186    def skip_test_decorator(func):
1187        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1188                         **kwargs: Dict[str, Any]) -> None:
1189            if callable(required_formats):
1190                fmts = required_formats(test_case)
1191            else:
1192                fmts = required_formats
1193
1194            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1195            if usf_list:
1196                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1197                test_case.case_skip(msg)
1198            else:
1199                func(test_case, *args, **kwargs)
1200        return func_wrapper
1201    return skip_test_decorator
1202
1203def skip_for_formats(formats: Sequence[str] = ()) \
1204    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1205                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1206    '''Skip Test Decorator
1207       Skips the test for the given formats'''
1208    def skip_test_decorator(func):
1209        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1210                         **kwargs: Dict[str, Any]) -> None:
1211            if imgfmt in formats:
1212                msg = f'{test_case}: Skipped for format {imgfmt}'
1213                test_case.case_skip(msg)
1214            else:
1215                func(test_case, *args, **kwargs)
1216        return func_wrapper
1217    return skip_test_decorator
1218
1219def skip_if_user_is_root(func):
1220    '''Skip Test Decorator
1221       Runs the test only without root permissions'''
1222    def func_wrapper(*args, **kwargs):
1223        if os.getuid() == 0:
1224            case_notrun('{}: cannot be run as root'.format(args[0]))
1225            return None
1226        else:
1227            return func(*args, **kwargs)
1228    return func_wrapper
1229
1230def execute_unittest(debug=False):
1231    """Executes unittests within the calling module."""
1232
1233    verbosity = 2 if debug else 1
1234
1235    if debug:
1236        output = sys.stdout
1237    else:
1238        # We need to filter out the time taken from the output so that
1239        # qemu-iotest can reliably diff the results against master output.
1240        output = io.StringIO()
1241
1242    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1243                                     verbosity=verbosity)
1244    try:
1245        # unittest.main() will use sys.exit(); so expect a SystemExit
1246        # exception
1247        unittest.main(testRunner=runner)
1248    finally:
1249        # We need to filter out the time taken from the output so that
1250        # qemu-iotest can reliably diff the results against master output.
1251        if not debug:
1252            out = output.getvalue()
1253            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1254
1255            # Hide skipped tests from the reference output
1256            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1257            out_first_line, out_rest = out.split('\n', 1)
1258            out = out_first_line.replace('s', '.') + '\n' + out_rest
1259
1260            sys.stderr.write(out)
1261
1262def execute_setup_common(supported_fmts: Sequence[str] = (),
1263                         supported_platforms: Sequence[str] = (),
1264                         supported_cache_modes: Sequence[str] = (),
1265                         supported_aio_modes: Sequence[str] = (),
1266                         unsupported_fmts: Sequence[str] = (),
1267                         supported_protocols: Sequence[str] = (),
1268                         unsupported_protocols: Sequence[str] = ()) -> bool:
1269    """
1270    Perform necessary setup for either script-style or unittest-style tests.
1271
1272    :return: Bool; Whether or not debug mode has been requested via the CLI.
1273    """
1274    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1275
1276    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1277    # indicate that we're not being run via "check". There may be
1278    # other things set up by "check" that individual test cases rely
1279    # on.
1280    if test_dir is None or qemu_default_machine is None:
1281        sys.stderr.write('Please run this test via the "check" script\n')
1282        sys.exit(os.EX_USAGE)
1283
1284    debug = '-d' in sys.argv
1285    if debug:
1286        sys.argv.remove('-d')
1287    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1288
1289    _verify_image_format(supported_fmts, unsupported_fmts)
1290    _verify_protocol(supported_protocols, unsupported_protocols)
1291    _verify_platform(supported=supported_platforms)
1292    _verify_cache_mode(supported_cache_modes)
1293    _verify_aio_mode(supported_aio_modes)
1294
1295    return debug
1296
1297def execute_test(*args, test_function=None, **kwargs):
1298    """Run either unittest or script-style tests."""
1299
1300    debug = execute_setup_common(*args, **kwargs)
1301    if not test_function:
1302        execute_unittest(debug)
1303    else:
1304        test_function()
1305
1306def activate_logging():
1307    """Activate iotests.log() output to stdout for script-style tests."""
1308    handler = logging.StreamHandler(stream=sys.stdout)
1309    formatter = logging.Formatter('%(message)s')
1310    handler.setFormatter(formatter)
1311    test_logger.addHandler(handler)
1312    test_logger.setLevel(logging.INFO)
1313    test_logger.propagate = False
1314
1315# This is called from script-style iotests without a single point of entry
1316def script_initialize(*args, **kwargs):
1317    """Initialize script-style tests without running any tests."""
1318    activate_logging()
1319    execute_setup_common(*args, **kwargs)
1320
1321# This is called from script-style iotests with a single point of entry
1322def script_main(test_function, *args, **kwargs):
1323    """Run script-style tests outside of the unittest framework"""
1324    activate_logging()
1325    execute_test(*args, test_function=test_function, **kwargs)
1326
1327# This is called from unittest style iotests
1328def main(*args, **kwargs):
1329    """Run tests using the unittest framework"""
1330    execute_test(*args, **kwargs)
1331