xref: /qemu/tests/qemu-iotests/iotests.py (revision 8063396b)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43assert sys.version_info >= (3, 6)
44
45# Use this logger for logging messages directly from the iotests module
46logger = logging.getLogger('qemu.iotests')
47logger.addHandler(logging.NullHandler())
48
49# Use this logger for messages that ought to be used for diff output.
50test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53faulthandler.enable()
54
55# This will not work if arguments contain spaces but is necessary if we
56# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58if os.environ.get('QEMU_IMG_OPTIONS'):
59    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62if os.environ.get('QEMU_IO_OPTIONS'):
63    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67    qemu_io_args_no_fmt += \
68        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79test_dir = os.environ.get('TEST_DIR')
80sock_dir = os.environ.get('SOCK_DIR')
81output_dir = os.environ.get('OUTPUT_DIR', '.')
82cachemode = os.environ.get('CACHEMODE')
83aiomode = os.environ.get('AIOMODE')
84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88luks_default_secret_object = 'secret,id=keysec0,data=' + \
89                             os.environ.get('IMGKEYSECRET', '')
90luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94    """
95    Run qemu-img and return both its output and its exit code
96    """
97    subp = subprocess.Popen(qemu_img_args + list(args),
98                            stdout=subprocess.PIPE,
99                            stderr=subprocess.STDOUT,
100                            universal_newlines=True)
101    output = subp.communicate()[0]
102    if subp.returncode < 0:
103        sys.stderr.write('qemu-img received signal %i: %s\n'
104                         % (-subp.returncode,
105                            ' '.join(qemu_img_args + list(args))))
106    return (output, subp.returncode)
107
108def qemu_img(*args: str) -> int:
109    '''Run qemu-img and return the exit code'''
110    return qemu_img_pipe_and_status(*args)[1]
111
112def ordered_qmp(qmsg, conv_keys=True):
113    # Dictionaries are not ordered prior to 3.6, therefore:
114    if isinstance(qmsg, list):
115        return [ordered_qmp(atom) for atom in qmsg]
116    if isinstance(qmsg, dict):
117        od = OrderedDict()
118        for k, v in sorted(qmsg.items()):
119            if conv_keys:
120                k = k.replace('_', '-')
121            od[k] = ordered_qmp(v, conv_keys=False)
122        return od
123    return qmsg
124
125def qemu_img_create(*args):
126    args = list(args)
127
128    # default luks support
129    if '-f' in args and args[args.index('-f') + 1] == 'luks':
130        if '-o' in args:
131            i = args.index('-o')
132            if 'key-secret' not in args[i + 1]:
133                args[i + 1].append(luks_default_key_secret_opt)
134                args.insert(i + 2, '--object')
135                args.insert(i + 3, luks_default_secret_object)
136        else:
137            args = ['-o', luks_default_key_secret_opt,
138                    '--object', luks_default_secret_object] + args
139
140    args.insert(0, 'create')
141
142    return qemu_img(*args)
143
144def qemu_img_measure(*args):
145    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
146
147def qemu_img_check(*args):
148    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
149
150def qemu_img_verbose(*args):
151    '''Run qemu-img without suppressing its output and return the exit code'''
152    exitcode = subprocess.call(qemu_img_args + list(args))
153    if exitcode < 0:
154        sys.stderr.write('qemu-img received signal %i: %s\n'
155                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
156    return exitcode
157
158def qemu_img_pipe(*args: str) -> str:
159    '''Run qemu-img and return its output'''
160    return qemu_img_pipe_and_status(*args)[0]
161
162def qemu_img_log(*args):
163    result = qemu_img_pipe(*args)
164    log(result, filters=[filter_testfiles])
165    return result
166
167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168    args = ['info']
169    if imgopts:
170        args.append('--image-opts')
171    else:
172        args += ['-f', imgfmt]
173    args += extra_args
174    args.append(filename)
175
176    output = qemu_img_pipe(*args)
177    if not filter_path:
178        filter_path = filename
179    log(filter_img_info(output, filter_path))
180
181def qemu_io(*args):
182    '''Run qemu-io and return the stdout data'''
183    args = qemu_io_args + list(args)
184    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185                            stderr=subprocess.STDOUT,
186                            universal_newlines=True)
187    output = subp.communicate()[0]
188    if subp.returncode < 0:
189        sys.stderr.write('qemu-io received signal %i: %s\n'
190                         % (-subp.returncode, ' '.join(args)))
191    return output
192
193def qemu_io_log(*args):
194    result = qemu_io(*args)
195    log(result, filters=[filter_testfiles, filter_qemu_io])
196    return result
197
198def qemu_io_silent(*args):
199    '''Run qemu-io and return the exit code, suppressing stdout'''
200    args = qemu_io_args + list(args)
201    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202    if exitcode < 0:
203        sys.stderr.write('qemu-io received signal %i: %s\n' %
204                         (-exitcode, ' '.join(args)))
205    return exitcode
206
207def qemu_io_silent_check(*args):
208    '''Run qemu-io and return the true if subprocess returned 0'''
209    args = qemu_io_args + list(args)
210    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211                               stderr=subprocess.STDOUT)
212    return exitcode == 0
213
214def get_virtio_scsi_device():
215    if qemu_default_machine == 's390-ccw-virtio':
216        return 'virtio-scsi-ccw'
217    return 'virtio-scsi-pci'
218
219class QemuIoInteractive:
220    def __init__(self, *args):
221        self.args = qemu_io_args_no_fmt + list(args)
222        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223                                   stdout=subprocess.PIPE,
224                                   stderr=subprocess.STDOUT,
225                                   universal_newlines=True)
226        out = self._p.stdout.read(9)
227        if out != 'qemu-io> ':
228            # Most probably qemu-io just failed to start.
229            # Let's collect the whole output and exit.
230            out += self._p.stdout.read()
231            self._p.wait(timeout=1)
232            raise ValueError(out)
233
234    def close(self):
235        self._p.communicate('q\n')
236
237    def _read_output(self):
238        pattern = 'qemu-io> '
239        n = len(pattern)
240        pos = 0
241        s = []
242        while pos != n:
243            c = self._p.stdout.read(1)
244            # check unexpected EOF
245            assert c != ''
246            s.append(c)
247            if c == pattern[pos]:
248                pos += 1
249            else:
250                pos = 0
251
252        return ''.join(s[:-n])
253
254    def cmd(self, cmd):
255        # quit command is in close(), '\n' is added automatically
256        assert '\n' not in cmd
257        cmd = cmd.strip()
258        assert cmd not in ('q', 'quit')
259        self._p.stdin.write(cmd + '\n')
260        self._p.stdin.flush()
261        return self._read_output()
262
263
264def qemu_nbd(*args):
265    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
267
268def qemu_nbd_early_pipe(*args):
269    '''Run qemu-nbd in daemon mode and return both the parent's exit code
270       and its output in case of an error'''
271    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272                            stdout=subprocess.PIPE,
273                            universal_newlines=True)
274    output = subp.communicate()[0]
275    if subp.returncode < 0:
276        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277                         (-subp.returncode,
278                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
279
280    return subp.returncode, output if subp.returncode else ''
281
282@contextmanager
283def qemu_nbd_popen(*args):
284    '''Context manager running qemu-nbd within the context'''
285    pid_file = file_path("pid")
286
287    cmd = list(qemu_nbd_args)
288    cmd.extend(('--persistent', '--pid-file', pid_file))
289    cmd.extend(args)
290
291    log('Start NBD server')
292    p = subprocess.Popen(cmd)
293    try:
294        while not os.path.exists(pid_file):
295            if p.poll() is not None:
296                raise RuntimeError(
297                    "qemu-nbd terminated with exit code {}: {}"
298                    .format(p.returncode, ' '.join(cmd)))
299
300            time.sleep(0.01)
301        yield
302    finally:
303        log('Kill NBD server')
304        p.kill()
305        p.wait()
306
307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308    '''Return True if two image files are identical'''
309    return qemu_img('compare', '-f', fmt1,
310                    '-F', fmt2, img1, img2) == 0
311
312def create_image(name, size):
313    '''Create a fully-allocated raw image with sector markers'''
314    file = open(name, 'wb')
315    i = 0
316    while i < size:
317        sector = struct.pack('>l504xl', i // 512, i // 512)
318        file.write(sector)
319        i = i + 512
320    file.close()
321
322def image_size(img):
323    '''Return image's virtual size'''
324    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325    return json.loads(r)['virtual-size']
326
327def is_str(val):
328    return isinstance(val, str)
329
330test_dir_re = re.compile(r"%s" % test_dir)
331def filter_test_dir(msg):
332    return test_dir_re.sub("TEST_DIR", msg)
333
334win32_re = re.compile(r"\r")
335def filter_win32(msg):
336    return win32_re.sub("", msg)
337
338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340                        r"and [0-9\/.inf]* ops\/sec\)")
341def filter_qemu_io(msg):
342    msg = filter_win32(msg)
343    return qemu_io_re.sub("X ops; XX:XX:XX.X "
344                          "(XXX YYY/sec and XXX ops/sec)", msg)
345
346chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347def filter_chown(msg):
348    return chown_re.sub("chown UID:GID", msg)
349
350def filter_qmp_event(event):
351    '''Filter a QMP event dict'''
352    event = dict(event)
353    if 'timestamp' in event:
354        event['timestamp']['seconds'] = 'SECS'
355        event['timestamp']['microseconds'] = 'USECS'
356    return event
357
358def filter_qmp(qmsg, filter_fn):
359    '''Given a string filter, filter a QMP object's values.
360    filter_fn takes a (key, value) pair.'''
361    # Iterate through either lists or dicts;
362    if isinstance(qmsg, list):
363        items = enumerate(qmsg)
364    else:
365        items = qmsg.items()
366
367    for k, v in items:
368        if isinstance(v, (dict, list)):
369            qmsg[k] = filter_qmp(v, filter_fn)
370        else:
371            qmsg[k] = filter_fn(k, v)
372    return qmsg
373
374def filter_testfiles(msg):
375    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
378
379def filter_qmp_testfiles(qmsg):
380    def _filter(_key, value):
381        if is_str(value):
382            return filter_testfiles(value)
383        return value
384    return filter_qmp(qmsg, _filter)
385
386def filter_generated_node_ids(msg):
387    return re.sub("#block[0-9]+", "NODE_NAME", msg)
388
389def filter_img_info(output, filename):
390    lines = []
391    for line in output.split('\n'):
392        if 'disk size' in line or 'actual-size' in line:
393            continue
394        line = line.replace(filename, 'TEST_IMG')
395        line = filter_testfiles(line)
396        line = line.replace(imgfmt, 'IMGFMT')
397        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398        line = re.sub('uuid: [-a-f0-9]+',
399                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400                      line)
401        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402        lines.append(line)
403    return '\n'.join(lines)
404
405def filter_imgfmt(msg):
406    return msg.replace(imgfmt, 'IMGFMT')
407
408def filter_qmp_imgfmt(qmsg):
409    def _filter(_key, value):
410        if is_str(value):
411            return filter_imgfmt(value)
412        return value
413    return filter_qmp(qmsg, _filter)
414
415
416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
417
418def log(msg: Msg,
419        filters: Iterable[Callable[[Msg], Msg]] = (),
420        indent: Optional[int] = None) -> None:
421    """
422    Logs either a string message or a JSON serializable message (like QMP).
423    If indent is provided, JSON serializable messages are pretty-printed.
424    """
425    for flt in filters:
426        msg = flt(msg)
427    if isinstance(msg, (dict, list)):
428        # Don't sort if it's already sorted
429        do_sort = not isinstance(msg, OrderedDict)
430        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431    else:
432        test_logger.info(msg)
433
434class Timeout:
435    def __init__(self, seconds, errmsg="Timeout"):
436        self.seconds = seconds
437        self.errmsg = errmsg
438    def __enter__(self):
439        signal.signal(signal.SIGALRM, self.timeout)
440        signal.setitimer(signal.ITIMER_REAL, self.seconds)
441        return self
442    def __exit__(self, exc_type, value, traceback):
443        signal.setitimer(signal.ITIMER_REAL, 0)
444        return False
445    def timeout(self, signum, frame):
446        raise Exception(self.errmsg)
447
448def file_pattern(name):
449    return "{0}-{1}".format(os.getpid(), name)
450
451class FilePath:
452    """
453    Context manager generating multiple file names. The generated files are
454    removed when exiting the context.
455
456    Example usage:
457
458        with FilePath('a.img', 'b.img') as (img_a, img_b):
459            # Use img_a and img_b here...
460
461        # a.img and b.img are automatically removed here.
462
463    By default images are created in iotests.test_dir. To create sockets use
464    iotests.sock_dir:
465
466       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
467
468    For convenience, calling with one argument yields a single file instead of
469    a tuple with one item.
470
471    """
472    def __init__(self, *names, base_dir=test_dir):
473        self.paths = [os.path.join(base_dir, file_pattern(name))
474                      for name in names]
475
476    def __enter__(self):
477        if len(self.paths) == 1:
478            return self.paths[0]
479        else:
480            return self.paths
481
482    def __exit__(self, exc_type, exc_val, exc_tb):
483        for path in self.paths:
484            try:
485                os.remove(path)
486            except OSError:
487                pass
488        return False
489
490
491def file_path_remover():
492    for path in reversed(file_path_remover.paths):
493        try:
494            os.remove(path)
495        except OSError:
496            pass
497
498
499def file_path(*names, base_dir=test_dir):
500    ''' Another way to get auto-generated filename that cleans itself up.
501
502    Use is as simple as:
503
504    img_a, img_b = file_path('a.img', 'b.img')
505    sock = file_path('socket')
506    '''
507
508    if not hasattr(file_path_remover, 'paths'):
509        file_path_remover.paths = []
510        atexit.register(file_path_remover)
511
512    paths = []
513    for name in names:
514        filename = file_pattern(name)
515        path = os.path.join(base_dir, filename)
516        file_path_remover.paths.append(path)
517        paths.append(path)
518
519    return paths[0] if len(paths) == 1 else paths
520
521def remote_filename(path):
522    if imgproto == 'file':
523        return path
524    elif imgproto == 'ssh':
525        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
526    else:
527        raise Exception("Protocol %s not supported" % (imgproto))
528
529class VM(qtest.QEMUQtestMachine):
530    '''A QEMU VM'''
531
532    def __init__(self, path_suffix=''):
533        name = "qemu%s-%d" % (path_suffix, os.getpid())
534        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
535                                 test_dir=test_dir,
536                                 socket_scm_helper=socket_scm_helper,
537                                 sock_dir=sock_dir)
538        self._num_drives = 0
539
540    def add_object(self, opts):
541        self._args.append('-object')
542        self._args.append(opts)
543        return self
544
545    def add_device(self, opts):
546        self._args.append('-device')
547        self._args.append(opts)
548        return self
549
550    def add_drive_raw(self, opts):
551        self._args.append('-drive')
552        self._args.append(opts)
553        return self
554
555    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
556        '''Add a virtio-blk drive to the VM'''
557        options = ['if=%s' % interface,
558                   'id=drive%d' % self._num_drives]
559
560        if path is not None:
561            options.append('file=%s' % path)
562            options.append('format=%s' % img_format)
563            options.append('cache=%s' % cachemode)
564            options.append('aio=%s' % aiomode)
565
566        if opts:
567            options.append(opts)
568
569        if img_format == 'luks' and 'key-secret' not in opts:
570            # default luks support
571            if luks_default_secret_object not in self._args:
572                self.add_object(luks_default_secret_object)
573
574            options.append(luks_default_key_secret_opt)
575
576        self._args.append('-drive')
577        self._args.append(','.join(options))
578        self._num_drives += 1
579        return self
580
581    def add_blockdev(self, opts):
582        self._args.append('-blockdev')
583        if isinstance(opts, str):
584            self._args.append(opts)
585        else:
586            self._args.append(','.join(opts))
587        return self
588
589    def add_incoming(self, addr):
590        self._args.append('-incoming')
591        self._args.append(addr)
592        return self
593
594    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
595        cmd = 'human-monitor-command'
596        kwargs = {'command-line': command_line}
597        if use_log:
598            return self.qmp_log(cmd, **kwargs)
599        else:
600            return self.qmp(cmd, **kwargs)
601
602    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
603        """Pause drive r/w operations"""
604        if not event:
605            self.pause_drive(drive, "read_aio")
606            self.pause_drive(drive, "write_aio")
607            return
608        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
609
610    def resume_drive(self, drive: str) -> None:
611        """Resume drive r/w operations"""
612        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
613
614    def hmp_qemu_io(self, drive: str, cmd: str,
615                    use_log: bool = False) -> QMPMessage:
616        """Write to a given drive using an HMP command"""
617        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
618
619    def flatten_qmp_object(self, obj, output=None, basestr=''):
620        if output is None:
621            output = dict()
622        if isinstance(obj, list):
623            for i, item in enumerate(obj):
624                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
625        elif isinstance(obj, dict):
626            for key in obj:
627                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
628        else:
629            output[basestr[:-1]] = obj # Strip trailing '.'
630        return output
631
632    def qmp_to_opts(self, obj):
633        obj = self.flatten_qmp_object(obj)
634        output_list = list()
635        for key in obj:
636            output_list += [key + '=' + obj[key]]
637        return ','.join(output_list)
638
639    def get_qmp_events_filtered(self, wait=60.0):
640        result = []
641        for ev in self.get_qmp_events(wait=wait):
642            result.append(filter_qmp_event(ev))
643        return result
644
645    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
646        full_cmd = OrderedDict((
647            ("execute", cmd),
648            ("arguments", ordered_qmp(kwargs))
649        ))
650        log(full_cmd, filters, indent=indent)
651        result = self.qmp(cmd, **kwargs)
652        log(result, filters, indent=indent)
653        return result
654
655    # Returns None on success, and an error string on failure
656    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
657                pre_finalize=None, cancel=False, wait=60.0):
658        """
659        run_job moves a job from creation through to dismissal.
660
661        :param job: String. ID of recently-launched job
662        :param auto_finalize: Bool. True if the job was launched with
663                              auto_finalize. Defaults to True.
664        :param auto_dismiss: Bool. True if the job was launched with
665                             auto_dismiss=True. Defaults to False.
666        :param pre_finalize: Callback. A callable that takes no arguments to be
667                             invoked prior to issuing job-finalize, if any.
668        :param cancel: Bool. When true, cancels the job after the pre_finalize
669                       callback.
670        :param wait: Float. Timeout value specifying how long to wait for any
671                     event, in seconds. Defaults to 60.0.
672        """
673        match_device = {'data': {'device': job}}
674        match_id = {'data': {'id': job}}
675        events = [
676            ('BLOCK_JOB_COMPLETED', match_device),
677            ('BLOCK_JOB_CANCELLED', match_device),
678            ('BLOCK_JOB_ERROR', match_device),
679            ('BLOCK_JOB_READY', match_device),
680            ('BLOCK_JOB_PENDING', match_id),
681            ('JOB_STATUS_CHANGE', match_id)
682        ]
683        error = None
684        while True:
685            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
686            if ev['event'] != 'JOB_STATUS_CHANGE':
687                log(ev)
688                continue
689            status = ev['data']['status']
690            if status == 'aborting':
691                result = self.qmp('query-jobs')
692                for j in result['return']:
693                    if j['id'] == job:
694                        error = j['error']
695                        log('Job failed: %s' % (j['error']))
696            elif status == 'ready':
697                self.qmp_log('job-complete', id=job)
698            elif status == 'pending' and not auto_finalize:
699                if pre_finalize:
700                    pre_finalize()
701                if cancel:
702                    self.qmp_log('job-cancel', id=job)
703                else:
704                    self.qmp_log('job-finalize', id=job)
705            elif status == 'concluded' and not auto_dismiss:
706                self.qmp_log('job-dismiss', id=job)
707            elif status == 'null':
708                return error
709
710    # Returns None on success, and an error string on failure
711    def blockdev_create(self, options, job_id='job0', filters=None):
712        if filters is None:
713            filters = [filter_qmp_testfiles]
714        result = self.qmp_log('blockdev-create', filters=filters,
715                              job_id=job_id, options=options)
716
717        if 'return' in result:
718            assert result['return'] == {}
719            job_result = self.run_job(job_id)
720        else:
721            job_result = result['error']
722
723        log("")
724        return job_result
725
726    def enable_migration_events(self, name):
727        log('Enabling migration QMP events on %s...' % name)
728        log(self.qmp('migrate-set-capabilities', capabilities=[
729            {
730                'capability': 'events',
731                'state': True
732            }
733        ]))
734
735    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
736        while True:
737            event = self.event_wait('MIGRATION')
738            log(event, filters=[filter_qmp_event])
739            if event['data']['status'] in ('completed', 'failed'):
740                break
741
742        if event['data']['status'] == 'completed':
743            # The event may occur in finish-migrate, so wait for the expected
744            # post-migration runstate
745            runstate = None
746            while runstate != expect_runstate:
747                runstate = self.qmp('query-status')['return']['status']
748            return True
749        else:
750            return False
751
752    def node_info(self, node_name):
753        nodes = self.qmp('query-named-block-nodes')
754        for x in nodes['return']:
755            if x['node-name'] == node_name:
756                return x
757        return None
758
759    def query_bitmaps(self):
760        res = self.qmp("query-named-block-nodes")
761        return {device['node-name']: device['dirty-bitmaps']
762                for device in res['return'] if 'dirty-bitmaps' in device}
763
764    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
765        """
766        get a specific bitmap from the object returned by query_bitmaps.
767        :param recording: If specified, filter results by the specified value.
768        :param bitmaps: If specified, use it instead of call query_bitmaps()
769        """
770        if bitmaps is None:
771            bitmaps = self.query_bitmaps()
772
773        for bitmap in bitmaps[node_name]:
774            if bitmap.get('name', '') == bitmap_name:
775                if recording is None or bitmap.get('recording') == recording:
776                    return bitmap
777        return None
778
779    def check_bitmap_status(self, node_name, bitmap_name, fields):
780        ret = self.get_bitmap(node_name, bitmap_name)
781
782        return fields.items() <= ret.items()
783
784    def assert_block_path(self, root, path, expected_node, graph=None):
785        """
786        Check whether the node under the given path in the block graph
787        is @expected_node.
788
789        @root is the node name of the node where the @path is rooted.
790
791        @path is a string that consists of child names separated by
792        slashes.  It must begin with a slash.
793
794        Examples for @root + @path:
795          - root="qcow2-node", path="/backing/file"
796          - root="quorum-node", path="/children.2/file"
797
798        Hypothetically, @path could be empty, in which case it would
799        point to @root.  However, in practice this case is not useful
800        and hence not allowed.
801
802        @expected_node may be None.  (All elements of the path but the
803        leaf must still exist.)
804
805        @graph may be None or the result of an x-debug-query-block-graph
806        call that has already been performed.
807        """
808        if graph is None:
809            graph = self.qmp('x-debug-query-block-graph')['return']
810
811        iter_path = iter(path.split('/'))
812
813        # Must start with a /
814        assert next(iter_path) == ''
815
816        node = next((node for node in graph['nodes'] if node['name'] == root),
817                    None)
818
819        # An empty @path is not allowed, so the root node must be present
820        assert node is not None, 'Root node %s not found' % root
821
822        for child_name in iter_path:
823            assert node is not None, 'Cannot follow path %s%s' % (root, path)
824
825            try:
826                node_id = next(edge['child'] for edge in graph['edges']
827                               if (edge['parent'] == node['id'] and
828                                   edge['name'] == child_name))
829
830                node = next(node for node in graph['nodes']
831                            if node['id'] == node_id)
832
833            except StopIteration:
834                node = None
835
836        if node is None:
837            assert expected_node is None, \
838                   'No node found under %s (but expected %s)' % \
839                   (path, expected_node)
840        else:
841            assert node['name'] == expected_node, \
842                   'Found node %s under %s (but expected %s)' % \
843                   (node['name'], path, expected_node)
844
845index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
846
847class QMPTestCase(unittest.TestCase):
848    '''Abstract base class for QMP test cases'''
849
850    def __init__(self, *args, **kwargs):
851        super().__init__(*args, **kwargs)
852        # Many users of this class set a VM property we rely on heavily
853        # in the methods below.
854        self.vm = None
855
856    def dictpath(self, d, path):
857        '''Traverse a path in a nested dict'''
858        for component in path.split('/'):
859            m = index_re.match(component)
860            if m:
861                component, idx = m.groups()
862                idx = int(idx)
863
864            if not isinstance(d, dict) or component not in d:
865                self.fail(f'failed path traversal for "{path}" in "{d}"')
866            d = d[component]
867
868            if m:
869                if not isinstance(d, list):
870                    self.fail(f'path component "{component}" in "{path}" '
871                              f'is not a list in "{d}"')
872                try:
873                    d = d[idx]
874                except IndexError:
875                    self.fail(f'invalid index "{idx}" in path "{path}" '
876                              f'in "{d}"')
877        return d
878
879    def assert_qmp_absent(self, d, path):
880        try:
881            result = self.dictpath(d, path)
882        except AssertionError:
883            return
884        self.fail('path "%s" has value "%s"' % (path, str(result)))
885
886    def assert_qmp(self, d, path, value):
887        '''Assert that the value for a specific path in a QMP dict
888           matches.  When given a list of values, assert that any of
889           them matches.'''
890
891        result = self.dictpath(d, path)
892
893        # [] makes no sense as a list of valid values, so treat it as
894        # an actual single value.
895        if isinstance(value, list) and value != []:
896            for v in value:
897                if result == v:
898                    return
899            self.fail('no match for "%s" in %s' % (str(result), str(value)))
900        else:
901            self.assertEqual(result, value,
902                             '"%s" is "%s", expected "%s"'
903                             % (path, str(result), str(value)))
904
905    def assert_no_active_block_jobs(self):
906        result = self.vm.qmp('query-block-jobs')
907        self.assert_qmp(result, 'return', [])
908
909    def assert_has_block_node(self, node_name=None, file_name=None):
910        """Issue a query-named-block-nodes and assert node_name and/or
911        file_name is present in the result"""
912        def check_equal_or_none(a, b):
913            return a is None or b is None or a == b
914        assert node_name or file_name
915        result = self.vm.qmp('query-named-block-nodes')
916        for x in result["return"]:
917            if check_equal_or_none(x.get("node-name"), node_name) and \
918                    check_equal_or_none(x.get("file"), file_name):
919                return
920        self.fail("Cannot find %s %s in result:\n%s" %
921                  (node_name, file_name, result))
922
923    def assert_json_filename_equal(self, json_filename, reference):
924        '''Asserts that the given filename is a json: filename and that its
925           content is equal to the given reference object'''
926        self.assertEqual(json_filename[:5], 'json:')
927        self.assertEqual(
928            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
929            self.vm.flatten_qmp_object(reference)
930        )
931
932    def cancel_and_wait(self, drive='drive0', force=False,
933                        resume=False, wait=60.0):
934        '''Cancel a block job and wait for it to finish, returning the event'''
935        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
936        self.assert_qmp(result, 'return', {})
937
938        if resume:
939            self.vm.resume_drive(drive)
940
941        cancelled = False
942        result = None
943        while not cancelled:
944            for event in self.vm.get_qmp_events(wait=wait):
945                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
946                   event['event'] == 'BLOCK_JOB_CANCELLED':
947                    self.assert_qmp(event, 'data/device', drive)
948                    result = event
949                    cancelled = True
950                elif event['event'] == 'JOB_STATUS_CHANGE':
951                    self.assert_qmp(event, 'data/id', drive)
952
953
954        self.assert_no_active_block_jobs()
955        return result
956
957    def wait_until_completed(self, drive='drive0', check_offset=True,
958                             wait=60.0, error=None):
959        '''Wait for a block job to finish, returning the event'''
960        while True:
961            for event in self.vm.get_qmp_events(wait=wait):
962                if event['event'] == 'BLOCK_JOB_COMPLETED':
963                    self.assert_qmp(event, 'data/device', drive)
964                    if error is None:
965                        self.assert_qmp_absent(event, 'data/error')
966                        if check_offset:
967                            self.assert_qmp(event, 'data/offset',
968                                            event['data']['len'])
969                    else:
970                        self.assert_qmp(event, 'data/error', error)
971                    self.assert_no_active_block_jobs()
972                    return event
973                if event['event'] == 'JOB_STATUS_CHANGE':
974                    self.assert_qmp(event, 'data/id', drive)
975
976    def wait_ready(self, drive='drive0'):
977        """Wait until a BLOCK_JOB_READY event, and return the event."""
978        return self.vm.events_wait([
979            ('BLOCK_JOB_READY',
980             {'data': {'type': 'mirror', 'device': drive}}),
981            ('BLOCK_JOB_READY',
982             {'data': {'type': 'commit', 'device': drive}})
983        ])
984
985    def wait_ready_and_cancel(self, drive='drive0'):
986        self.wait_ready(drive=drive)
987        event = self.cancel_and_wait(drive=drive)
988        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
989        self.assert_qmp(event, 'data/type', 'mirror')
990        self.assert_qmp(event, 'data/offset', event['data']['len'])
991
992    def complete_and_wait(self, drive='drive0', wait_ready=True,
993                          completion_error=None):
994        '''Complete a block job and wait for it to finish'''
995        if wait_ready:
996            self.wait_ready(drive=drive)
997
998        result = self.vm.qmp('block-job-complete', device=drive)
999        self.assert_qmp(result, 'return', {})
1000
1001        event = self.wait_until_completed(drive=drive, error=completion_error)
1002        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1003
1004    def pause_wait(self, job_id='job0'):
1005        with Timeout(3, "Timeout waiting for job to pause"):
1006            while True:
1007                result = self.vm.qmp('query-block-jobs')
1008                found = False
1009                for job in result['return']:
1010                    if job['device'] == job_id:
1011                        found = True
1012                        if job['paused'] and not job['busy']:
1013                            return job
1014                        break
1015                assert found
1016
1017    def pause_job(self, job_id='job0', wait=True):
1018        result = self.vm.qmp('block-job-pause', device=job_id)
1019        self.assert_qmp(result, 'return', {})
1020        if wait:
1021            return self.pause_wait(job_id)
1022        return result
1023
1024    def case_skip(self, reason):
1025        '''Skip this test case'''
1026        case_notrun(reason)
1027        self.skipTest(reason)
1028
1029
1030def notrun(reason):
1031    '''Skip this test suite'''
1032    # Each test in qemu-iotests has a number ("seq")
1033    seq = os.path.basename(sys.argv[0])
1034
1035    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1036    logger.warning("%s not run: %s", seq, reason)
1037    sys.exit(0)
1038
1039def case_notrun(reason):
1040    '''Mark this test case as not having been run (without actually
1041    skipping it, that is left to the caller).  See
1042    QMPTestCase.case_skip() for a variant that actually skips the
1043    current test case.'''
1044
1045    # Each test in qemu-iotests has a number ("seq")
1046    seq = os.path.basename(sys.argv[0])
1047
1048    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1049        '    [case not run] ' + reason + '\n')
1050
1051def _verify_image_format(supported_fmts: Sequence[str] = (),
1052                         unsupported_fmts: Sequence[str] = ()) -> None:
1053    assert not (supported_fmts and unsupported_fmts)
1054
1055    if 'generic' in supported_fmts and \
1056            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1057        # similar to
1058        #   _supported_fmt generic
1059        # for bash tests
1060        if imgfmt == 'luks':
1061            verify_working_luks()
1062        return
1063
1064    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1065    if not_sup or (imgfmt in unsupported_fmts):
1066        notrun('not suitable for this image format: %s' % imgfmt)
1067
1068    if imgfmt == 'luks':
1069        verify_working_luks()
1070
1071def _verify_protocol(supported: Sequence[str] = (),
1072                     unsupported: Sequence[str] = ()) -> None:
1073    assert not (supported and unsupported)
1074
1075    if 'generic' in supported:
1076        return
1077
1078    not_sup = supported and (imgproto not in supported)
1079    if not_sup or (imgproto in unsupported):
1080        notrun('not suitable for this protocol: %s' % imgproto)
1081
1082def _verify_platform(supported: Sequence[str] = (),
1083                     unsupported: Sequence[str] = ()) -> None:
1084    if any((sys.platform.startswith(x) for x in unsupported)):
1085        notrun('not suitable for this OS: %s' % sys.platform)
1086
1087    if supported:
1088        if not any((sys.platform.startswith(x) for x in supported)):
1089            notrun('not suitable for this OS: %s' % sys.platform)
1090
1091def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1092    if supported_cache_modes and (cachemode not in supported_cache_modes):
1093        notrun('not suitable for this cache mode: %s' % cachemode)
1094
1095def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1096    if supported_aio_modes and (aiomode not in supported_aio_modes):
1097        notrun('not suitable for this aio mode: %s' % aiomode)
1098
1099def supports_quorum():
1100    return 'quorum' in qemu_img_pipe('--help')
1101
1102def verify_quorum():
1103    '''Skip test suite if quorum support is not available'''
1104    if not supports_quorum():
1105        notrun('quorum support missing')
1106
1107def has_working_luks() -> Tuple[bool, str]:
1108    """
1109    Check whether our LUKS driver can actually create images
1110    (this extends to LUKS encryption for qcow2).
1111
1112    If not, return the reason why.
1113    """
1114
1115    img_file = f'{test_dir}/luks-test.luks'
1116    (output, status) = \
1117        qemu_img_pipe_and_status('create', '-f', 'luks',
1118                                 '--object', luks_default_secret_object,
1119                                 '-o', luks_default_key_secret_opt,
1120                                 '-o', 'iter-time=10',
1121                                 img_file, '1G')
1122    try:
1123        os.remove(img_file)
1124    except OSError:
1125        pass
1126
1127    if status != 0:
1128        reason = output
1129        for line in output.splitlines():
1130            if img_file + ':' in line:
1131                reason = line.split(img_file + ':', 1)[1].strip()
1132                break
1133
1134        return (False, reason)
1135    else:
1136        return (True, '')
1137
1138def verify_working_luks():
1139    """
1140    Skip test suite if LUKS does not work
1141    """
1142    (working, reason) = has_working_luks()
1143    if not working:
1144        notrun(reason)
1145
1146def qemu_pipe(*args):
1147    """
1148    Run qemu with an option to print something and exit (e.g. a help option).
1149
1150    :return: QEMU's stdout output.
1151    """
1152    args = [qemu_prog] + qemu_opts + list(args)
1153    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1154                            stderr=subprocess.STDOUT,
1155                            universal_newlines=True)
1156    output = subp.communicate()[0]
1157    if subp.returncode < 0:
1158        sys.stderr.write('qemu received signal %i: %s\n' %
1159                         (-subp.returncode, ' '.join(args)))
1160    return output
1161
1162def supported_formats(read_only=False):
1163    '''Set 'read_only' to True to check ro-whitelist
1164       Otherwise, rw-whitelist is checked'''
1165
1166    if not hasattr(supported_formats, "formats"):
1167        supported_formats.formats = {}
1168
1169    if read_only not in supported_formats.formats:
1170        format_message = qemu_pipe("-drive", "format=help")
1171        line = 1 if read_only else 0
1172        supported_formats.formats[read_only] = \
1173            format_message.splitlines()[line].split(":")[1].split()
1174
1175    return supported_formats.formats[read_only]
1176
1177def skip_if_unsupported(required_formats=(), read_only=False):
1178    '''Skip Test Decorator
1179       Runs the test if all the required formats are whitelisted'''
1180    def skip_test_decorator(func):
1181        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1182                         **kwargs: Dict[str, Any]) -> None:
1183            if callable(required_formats):
1184                fmts = required_formats(test_case)
1185            else:
1186                fmts = required_formats
1187
1188            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1189            if usf_list:
1190                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1191                test_case.case_skip(msg)
1192            else:
1193                func(test_case, *args, **kwargs)
1194        return func_wrapper
1195    return skip_test_decorator
1196
1197def skip_for_formats(formats: Sequence[str] = ()) \
1198    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1199                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1200    '''Skip Test Decorator
1201       Skips the test for the given formats'''
1202    def skip_test_decorator(func):
1203        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1204                         **kwargs: Dict[str, Any]) -> None:
1205            if imgfmt in formats:
1206                msg = f'{test_case}: Skipped for format {imgfmt}'
1207                test_case.case_skip(msg)
1208            else:
1209                func(test_case, *args, **kwargs)
1210        return func_wrapper
1211    return skip_test_decorator
1212
1213def skip_if_user_is_root(func):
1214    '''Skip Test Decorator
1215       Runs the test only without root permissions'''
1216    def func_wrapper(*args, **kwargs):
1217        if os.getuid() == 0:
1218            case_notrun('{}: cannot be run as root'.format(args[0]))
1219            return None
1220        else:
1221            return func(*args, **kwargs)
1222    return func_wrapper
1223
1224def execute_unittest(debug=False):
1225    """Executes unittests within the calling module."""
1226
1227    verbosity = 2 if debug else 1
1228
1229    if debug:
1230        output = sys.stdout
1231    else:
1232        # We need to filter out the time taken from the output so that
1233        # qemu-iotest can reliably diff the results against master output.
1234        output = io.StringIO()
1235
1236    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1237                                     verbosity=verbosity)
1238    try:
1239        # unittest.main() will use sys.exit(); so expect a SystemExit
1240        # exception
1241        unittest.main(testRunner=runner)
1242    finally:
1243        # We need to filter out the time taken from the output so that
1244        # qemu-iotest can reliably diff the results against master output.
1245        if not debug:
1246            out = output.getvalue()
1247            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1248
1249            # Hide skipped tests from the reference output
1250            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1251            out_first_line, out_rest = out.split('\n', 1)
1252            out = out_first_line.replace('s', '.') + '\n' + out_rest
1253
1254            sys.stderr.write(out)
1255
1256def execute_setup_common(supported_fmts: Sequence[str] = (),
1257                         supported_platforms: Sequence[str] = (),
1258                         supported_cache_modes: Sequence[str] = (),
1259                         supported_aio_modes: Sequence[str] = (),
1260                         unsupported_fmts: Sequence[str] = (),
1261                         supported_protocols: Sequence[str] = (),
1262                         unsupported_protocols: Sequence[str] = ()) -> bool:
1263    """
1264    Perform necessary setup for either script-style or unittest-style tests.
1265
1266    :return: Bool; Whether or not debug mode has been requested via the CLI.
1267    """
1268    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1269
1270    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1271    # indicate that we're not being run via "check". There may be
1272    # other things set up by "check" that individual test cases rely
1273    # on.
1274    if test_dir is None or qemu_default_machine is None:
1275        sys.stderr.write('Please run this test via the "check" script\n')
1276        sys.exit(os.EX_USAGE)
1277
1278    debug = '-d' in sys.argv
1279    if debug:
1280        sys.argv.remove('-d')
1281    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1282
1283    _verify_image_format(supported_fmts, unsupported_fmts)
1284    _verify_protocol(supported_protocols, unsupported_protocols)
1285    _verify_platform(supported=supported_platforms)
1286    _verify_cache_mode(supported_cache_modes)
1287    _verify_aio_mode(supported_aio_modes)
1288
1289    return debug
1290
1291def execute_test(*args, test_function=None, **kwargs):
1292    """Run either unittest or script-style tests."""
1293
1294    debug = execute_setup_common(*args, **kwargs)
1295    if not test_function:
1296        execute_unittest(debug)
1297    else:
1298        test_function()
1299
1300def activate_logging():
1301    """Activate iotests.log() output to stdout for script-style tests."""
1302    handler = logging.StreamHandler(stream=sys.stdout)
1303    formatter = logging.Formatter('%(message)s')
1304    handler.setFormatter(formatter)
1305    test_logger.addHandler(handler)
1306    test_logger.setLevel(logging.INFO)
1307    test_logger.propagate = False
1308
1309# This is called from script-style iotests without a single point of entry
1310def script_initialize(*args, **kwargs):
1311    """Initialize script-style tests without running any tests."""
1312    activate_logging()
1313    execute_setup_common(*args, **kwargs)
1314
1315# This is called from script-style iotests with a single point of entry
1316def script_main(test_function, *args, **kwargs):
1317    """Run script-style tests outside of the unittest framework"""
1318    activate_logging()
1319    execute_test(*args, test_function=test_function, **kwargs)
1320
1321# This is called from unittest style iotests
1322def main(*args, **kwargs):
1323    """Run tests using the unittest framework"""
1324    execute_test(*args, **kwargs)
1325