xref: /qemu/tests/qemu-iotests/iotests.py (revision abff1abf)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43assert sys.version_info >= (3, 6)
44
45# Use this logger for logging messages directly from the iotests module
46logger = logging.getLogger('qemu.iotests')
47logger.addHandler(logging.NullHandler())
48
49# Use this logger for messages that ought to be used for diff output.
50test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53faulthandler.enable()
54
55# This will not work if arguments contain spaces but is necessary if we
56# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58if os.environ.get('QEMU_IMG_OPTIONS'):
59    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62if os.environ.get('QEMU_IO_OPTIONS'):
63    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67    qemu_io_args_no_fmt += \
68        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79test_dir = os.environ.get('TEST_DIR')
80sock_dir = os.environ.get('SOCK_DIR')
81output_dir = os.environ.get('OUTPUT_DIR', '.')
82cachemode = os.environ.get('CACHEMODE')
83aiomode = os.environ.get('AIOMODE')
84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88luks_default_secret_object = 'secret,id=keysec0,data=' + \
89                             os.environ.get('IMGKEYSECRET', '')
90luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94    """
95    Run qemu-img and return both its output and its exit code
96    """
97    subp = subprocess.Popen(qemu_img_args + list(args),
98                            stdout=subprocess.PIPE,
99                            stderr=subprocess.STDOUT,
100                            universal_newlines=True)
101    output = subp.communicate()[0]
102    if subp.returncode < 0:
103        sys.stderr.write('qemu-img received signal %i: %s\n'
104                         % (-subp.returncode,
105                            ' '.join(qemu_img_args + list(args))))
106    return (output, subp.returncode)
107
108def qemu_img(*args: str) -> int:
109    '''Run qemu-img and return the exit code'''
110    return qemu_img_pipe_and_status(*args)[1]
111
112def ordered_qmp(qmsg, conv_keys=True):
113    # Dictionaries are not ordered prior to 3.6, therefore:
114    if isinstance(qmsg, list):
115        return [ordered_qmp(atom) for atom in qmsg]
116    if isinstance(qmsg, dict):
117        od = OrderedDict()
118        for k, v in sorted(qmsg.items()):
119            if conv_keys:
120                k = k.replace('_', '-')
121            od[k] = ordered_qmp(v, conv_keys=False)
122        return od
123    return qmsg
124
125def qemu_img_create(*args):
126    args = list(args)
127
128    # default luks support
129    if '-f' in args and args[args.index('-f') + 1] == 'luks':
130        if '-o' in args:
131            i = args.index('-o')
132            if 'key-secret' not in args[i + 1]:
133                args[i + 1].append(luks_default_key_secret_opt)
134                args.insert(i + 2, '--object')
135                args.insert(i + 3, luks_default_secret_object)
136        else:
137            args = ['-o', luks_default_key_secret_opt,
138                    '--object', luks_default_secret_object] + args
139
140    args.insert(0, 'create')
141
142    return qemu_img(*args)
143
144def qemu_img_measure(*args):
145    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
146
147def qemu_img_check(*args):
148    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
149
150def qemu_img_verbose(*args):
151    '''Run qemu-img without suppressing its output and return the exit code'''
152    exitcode = subprocess.call(qemu_img_args + list(args))
153    if exitcode < 0:
154        sys.stderr.write('qemu-img received signal %i: %s\n'
155                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
156    return exitcode
157
158def qemu_img_pipe(*args: str) -> str:
159    '''Run qemu-img and return its output'''
160    return qemu_img_pipe_and_status(*args)[0]
161
162def qemu_img_log(*args):
163    result = qemu_img_pipe(*args)
164    log(result, filters=[filter_testfiles])
165    return result
166
167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168    args = ['info']
169    if imgopts:
170        args.append('--image-opts')
171    else:
172        args += ['-f', imgfmt]
173    args += extra_args
174    args.append(filename)
175
176    output = qemu_img_pipe(*args)
177    if not filter_path:
178        filter_path = filename
179    log(filter_img_info(output, filter_path))
180
181def qemu_io(*args):
182    '''Run qemu-io and return the stdout data'''
183    args = qemu_io_args + list(args)
184    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185                            stderr=subprocess.STDOUT,
186                            universal_newlines=True)
187    output = subp.communicate()[0]
188    if subp.returncode < 0:
189        sys.stderr.write('qemu-io received signal %i: %s\n'
190                         % (-subp.returncode, ' '.join(args)))
191    return output
192
193def qemu_io_log(*args):
194    result = qemu_io(*args)
195    log(result, filters=[filter_testfiles, filter_qemu_io])
196    return result
197
198def qemu_io_silent(*args):
199    '''Run qemu-io and return the exit code, suppressing stdout'''
200    args = qemu_io_args + list(args)
201    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202    if exitcode < 0:
203        sys.stderr.write('qemu-io received signal %i: %s\n' %
204                         (-exitcode, ' '.join(args)))
205    return exitcode
206
207def qemu_io_silent_check(*args):
208    '''Run qemu-io and return the true if subprocess returned 0'''
209    args = qemu_io_args + list(args)
210    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211                               stderr=subprocess.STDOUT)
212    return exitcode == 0
213
214def get_virtio_scsi_device():
215    if qemu_default_machine == 's390-ccw-virtio':
216        return 'virtio-scsi-ccw'
217    return 'virtio-scsi-pci'
218
219class QemuIoInteractive:
220    def __init__(self, *args):
221        self.args = qemu_io_args_no_fmt + list(args)
222        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223                                   stdout=subprocess.PIPE,
224                                   stderr=subprocess.STDOUT,
225                                   universal_newlines=True)
226        out = self._p.stdout.read(9)
227        if out != 'qemu-io> ':
228            # Most probably qemu-io just failed to start.
229            # Let's collect the whole output and exit.
230            out += self._p.stdout.read()
231            self._p.wait(timeout=1)
232            raise ValueError(out)
233
234    def close(self):
235        self._p.communicate('q\n')
236
237    def _read_output(self):
238        pattern = 'qemu-io> '
239        n = len(pattern)
240        pos = 0
241        s = []
242        while pos != n:
243            c = self._p.stdout.read(1)
244            # check unexpected EOF
245            assert c != ''
246            s.append(c)
247            if c == pattern[pos]:
248                pos += 1
249            else:
250                pos = 0
251
252        return ''.join(s[:-n])
253
254    def cmd(self, cmd):
255        # quit command is in close(), '\n' is added automatically
256        assert '\n' not in cmd
257        cmd = cmd.strip()
258        assert cmd not in ('q', 'quit')
259        self._p.stdin.write(cmd + '\n')
260        self._p.stdin.flush()
261        return self._read_output()
262
263
264def qemu_nbd(*args):
265    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
267
268def qemu_nbd_early_pipe(*args):
269    '''Run qemu-nbd in daemon mode and return both the parent's exit code
270       and its output in case of an error'''
271    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272                            stdout=subprocess.PIPE,
273                            universal_newlines=True)
274    output = subp.communicate()[0]
275    if subp.returncode < 0:
276        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277                         (-subp.returncode,
278                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
279
280    return subp.returncode, output if subp.returncode else ''
281
282@contextmanager
283def qemu_nbd_popen(*args):
284    '''Context manager running qemu-nbd within the context'''
285    pid_file = file_path("pid")
286
287    cmd = list(qemu_nbd_args)
288    cmd.extend(('--persistent', '--pid-file', pid_file))
289    cmd.extend(args)
290
291    log('Start NBD server')
292    p = subprocess.Popen(cmd)
293    try:
294        while not os.path.exists(pid_file):
295            if p.poll() is not None:
296                raise RuntimeError(
297                    "qemu-nbd terminated with exit code {}: {}"
298                    .format(p.returncode, ' '.join(cmd)))
299
300            time.sleep(0.01)
301        yield
302    finally:
303        log('Kill NBD server')
304        p.kill()
305        p.wait()
306
307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308    '''Return True if two image files are identical'''
309    return qemu_img('compare', '-f', fmt1,
310                    '-F', fmt2, img1, img2) == 0
311
312def create_image(name, size):
313    '''Create a fully-allocated raw image with sector markers'''
314    file = open(name, 'wb')
315    i = 0
316    while i < size:
317        sector = struct.pack('>l504xl', i // 512, i // 512)
318        file.write(sector)
319        i = i + 512
320    file.close()
321
322def image_size(img):
323    '''Return image's virtual size'''
324    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325    return json.loads(r)['virtual-size']
326
327def is_str(val):
328    return isinstance(val, str)
329
330test_dir_re = re.compile(r"%s" % test_dir)
331def filter_test_dir(msg):
332    return test_dir_re.sub("TEST_DIR", msg)
333
334win32_re = re.compile(r"\r")
335def filter_win32(msg):
336    return win32_re.sub("", msg)
337
338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340                        r"and [0-9\/.inf]* ops\/sec\)")
341def filter_qemu_io(msg):
342    msg = filter_win32(msg)
343    return qemu_io_re.sub("X ops; XX:XX:XX.X "
344                          "(XXX YYY/sec and XXX ops/sec)", msg)
345
346chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347def filter_chown(msg):
348    return chown_re.sub("chown UID:GID", msg)
349
350def filter_qmp_event(event):
351    '''Filter a QMP event dict'''
352    event = dict(event)
353    if 'timestamp' in event:
354        event['timestamp']['seconds'] = 'SECS'
355        event['timestamp']['microseconds'] = 'USECS'
356    return event
357
358def filter_qmp(qmsg, filter_fn):
359    '''Given a string filter, filter a QMP object's values.
360    filter_fn takes a (key, value) pair.'''
361    # Iterate through either lists or dicts;
362    if isinstance(qmsg, list):
363        items = enumerate(qmsg)
364    else:
365        items = qmsg.items()
366
367    for k, v in items:
368        if isinstance(v, (dict, list)):
369            qmsg[k] = filter_qmp(v, filter_fn)
370        else:
371            qmsg[k] = filter_fn(k, v)
372    return qmsg
373
374def filter_testfiles(msg):
375    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
378
379def filter_qmp_testfiles(qmsg):
380    def _filter(_key, value):
381        if is_str(value):
382            return filter_testfiles(value)
383        return value
384    return filter_qmp(qmsg, _filter)
385
386def filter_generated_node_ids(msg):
387    return re.sub("#block[0-9]+", "NODE_NAME", msg)
388
389def filter_img_info(output, filename):
390    lines = []
391    for line in output.split('\n'):
392        if 'disk size' in line or 'actual-size' in line:
393            continue
394        line = line.replace(filename, 'TEST_IMG')
395        line = filter_testfiles(line)
396        line = line.replace(imgfmt, 'IMGFMT')
397        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398        line = re.sub('uuid: [-a-f0-9]+',
399                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400                      line)
401        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402        lines.append(line)
403    return '\n'.join(lines)
404
405def filter_imgfmt(msg):
406    return msg.replace(imgfmt, 'IMGFMT')
407
408def filter_qmp_imgfmt(qmsg):
409    def _filter(_key, value):
410        if is_str(value):
411            return filter_imgfmt(value)
412        return value
413    return filter_qmp(qmsg, _filter)
414
415
416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
417
418def log(msg: Msg,
419        filters: Iterable[Callable[[Msg], Msg]] = (),
420        indent: Optional[int] = None) -> None:
421    """
422    Logs either a string message or a JSON serializable message (like QMP).
423    If indent is provided, JSON serializable messages are pretty-printed.
424    """
425    for flt in filters:
426        msg = flt(msg)
427    if isinstance(msg, (dict, list)):
428        # Don't sort if it's already sorted
429        do_sort = not isinstance(msg, OrderedDict)
430        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431    else:
432        test_logger.info(msg)
433
434class Timeout:
435    def __init__(self, seconds, errmsg="Timeout"):
436        self.seconds = seconds
437        self.errmsg = errmsg
438    def __enter__(self):
439        signal.signal(signal.SIGALRM, self.timeout)
440        signal.setitimer(signal.ITIMER_REAL, self.seconds)
441        return self
442    def __exit__(self, exc_type, value, traceback):
443        signal.setitimer(signal.ITIMER_REAL, 0)
444        return False
445    def timeout(self, signum, frame):
446        raise Exception(self.errmsg)
447
448def file_pattern(name):
449    return "{0}-{1}".format(os.getpid(), name)
450
451class FilePaths:
452    """
453    FilePaths is an auto-generated filename that cleans itself up.
454
455    Use this context manager to generate filenames and ensure that the file
456    gets deleted::
457
458        with FilePaths(['test.img']) as img_path:
459            qemu_img('create', img_path, '1G')
460        # migration_sock_path is automatically deleted
461    """
462    def __init__(self, names, base_dir=test_dir):
463        self.paths = []
464        for name in names:
465            self.paths.append(os.path.join(base_dir, file_pattern(name)))
466
467    def __enter__(self):
468        return self.paths
469
470    def __exit__(self, exc_type, exc_val, exc_tb):
471        try:
472            for path in self.paths:
473                os.remove(path)
474        except OSError:
475            pass
476        return False
477
478class FilePath(FilePaths):
479    """
480    FilePath is a specialization of FilePaths that takes a single filename.
481    """
482    def __init__(self, name, base_dir=test_dir):
483        super(FilePath, self).__init__([name], base_dir)
484
485    def __enter__(self):
486        return self.paths[0]
487
488def file_path_remover():
489    for path in reversed(file_path_remover.paths):
490        try:
491            os.remove(path)
492        except OSError:
493            pass
494
495
496def file_path(*names, base_dir=test_dir):
497    ''' Another way to get auto-generated filename that cleans itself up.
498
499    Use is as simple as:
500
501    img_a, img_b = file_path('a.img', 'b.img')
502    sock = file_path('socket')
503    '''
504
505    if not hasattr(file_path_remover, 'paths'):
506        file_path_remover.paths = []
507        atexit.register(file_path_remover)
508
509    paths = []
510    for name in names:
511        filename = file_pattern(name)
512        path = os.path.join(base_dir, filename)
513        file_path_remover.paths.append(path)
514        paths.append(path)
515
516    return paths[0] if len(paths) == 1 else paths
517
518def remote_filename(path):
519    if imgproto == 'file':
520        return path
521    elif imgproto == 'ssh':
522        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
523    else:
524        raise Exception("Protocol %s not supported" % (imgproto))
525
526class VM(qtest.QEMUQtestMachine):
527    '''A QEMU VM'''
528
529    def __init__(self, path_suffix=''):
530        name = "qemu%s-%d" % (path_suffix, os.getpid())
531        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
532                                 test_dir=test_dir,
533                                 socket_scm_helper=socket_scm_helper,
534                                 sock_dir=sock_dir)
535        self._num_drives = 0
536
537    def add_object(self, opts):
538        self._args.append('-object')
539        self._args.append(opts)
540        return self
541
542    def add_device(self, opts):
543        self._args.append('-device')
544        self._args.append(opts)
545        return self
546
547    def add_drive_raw(self, opts):
548        self._args.append('-drive')
549        self._args.append(opts)
550        return self
551
552    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
553        '''Add a virtio-blk drive to the VM'''
554        options = ['if=%s' % interface,
555                   'id=drive%d' % self._num_drives]
556
557        if path is not None:
558            options.append('file=%s' % path)
559            options.append('format=%s' % img_format)
560            options.append('cache=%s' % cachemode)
561            options.append('aio=%s' % aiomode)
562
563        if opts:
564            options.append(opts)
565
566        if img_format == 'luks' and 'key-secret' not in opts:
567            # default luks support
568            if luks_default_secret_object not in self._args:
569                self.add_object(luks_default_secret_object)
570
571            options.append(luks_default_key_secret_opt)
572
573        self._args.append('-drive')
574        self._args.append(','.join(options))
575        self._num_drives += 1
576        return self
577
578    def add_blockdev(self, opts):
579        self._args.append('-blockdev')
580        if isinstance(opts, str):
581            self._args.append(opts)
582        else:
583            self._args.append(','.join(opts))
584        return self
585
586    def add_incoming(self, addr):
587        self._args.append('-incoming')
588        self._args.append(addr)
589        return self
590
591    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
592        cmd = 'human-monitor-command'
593        kwargs = {'command-line': command_line}
594        if use_log:
595            return self.qmp_log(cmd, **kwargs)
596        else:
597            return self.qmp(cmd, **kwargs)
598
599    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
600        """Pause drive r/w operations"""
601        if not event:
602            self.pause_drive(drive, "read_aio")
603            self.pause_drive(drive, "write_aio")
604            return
605        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
606
607    def resume_drive(self, drive: str) -> None:
608        """Resume drive r/w operations"""
609        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
610
611    def hmp_qemu_io(self, drive: str, cmd: str,
612                    use_log: bool = False) -> QMPMessage:
613        """Write to a given drive using an HMP command"""
614        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
615
616    def flatten_qmp_object(self, obj, output=None, basestr=''):
617        if output is None:
618            output = dict()
619        if isinstance(obj, list):
620            for i, item in enumerate(obj):
621                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
622        elif isinstance(obj, dict):
623            for key in obj:
624                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
625        else:
626            output[basestr[:-1]] = obj # Strip trailing '.'
627        return output
628
629    def qmp_to_opts(self, obj):
630        obj = self.flatten_qmp_object(obj)
631        output_list = list()
632        for key in obj:
633            output_list += [key + '=' + obj[key]]
634        return ','.join(output_list)
635
636    def get_qmp_events_filtered(self, wait=60.0):
637        result = []
638        for ev in self.get_qmp_events(wait=wait):
639            result.append(filter_qmp_event(ev))
640        return result
641
642    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
643        full_cmd = OrderedDict((
644            ("execute", cmd),
645            ("arguments", ordered_qmp(kwargs))
646        ))
647        log(full_cmd, filters, indent=indent)
648        result = self.qmp(cmd, **kwargs)
649        log(result, filters, indent=indent)
650        return result
651
652    # Returns None on success, and an error string on failure
653    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
654                pre_finalize=None, cancel=False, wait=60.0):
655        """
656        run_job moves a job from creation through to dismissal.
657
658        :param job: String. ID of recently-launched job
659        :param auto_finalize: Bool. True if the job was launched with
660                              auto_finalize. Defaults to True.
661        :param auto_dismiss: Bool. True if the job was launched with
662                             auto_dismiss=True. Defaults to False.
663        :param pre_finalize: Callback. A callable that takes no arguments to be
664                             invoked prior to issuing job-finalize, if any.
665        :param cancel: Bool. When true, cancels the job after the pre_finalize
666                       callback.
667        :param wait: Float. Timeout value specifying how long to wait for any
668                     event, in seconds. Defaults to 60.0.
669        """
670        match_device = {'data': {'device': job}}
671        match_id = {'data': {'id': job}}
672        events = [
673            ('BLOCK_JOB_COMPLETED', match_device),
674            ('BLOCK_JOB_CANCELLED', match_device),
675            ('BLOCK_JOB_ERROR', match_device),
676            ('BLOCK_JOB_READY', match_device),
677            ('BLOCK_JOB_PENDING', match_id),
678            ('JOB_STATUS_CHANGE', match_id)
679        ]
680        error = None
681        while True:
682            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
683            if ev['event'] != 'JOB_STATUS_CHANGE':
684                log(ev)
685                continue
686            status = ev['data']['status']
687            if status == 'aborting':
688                result = self.qmp('query-jobs')
689                for j in result['return']:
690                    if j['id'] == job:
691                        error = j['error']
692                        log('Job failed: %s' % (j['error']))
693            elif status == 'ready':
694                self.qmp_log('job-complete', id=job)
695            elif status == 'pending' and not auto_finalize:
696                if pre_finalize:
697                    pre_finalize()
698                if cancel:
699                    self.qmp_log('job-cancel', id=job)
700                else:
701                    self.qmp_log('job-finalize', id=job)
702            elif status == 'concluded' and not auto_dismiss:
703                self.qmp_log('job-dismiss', id=job)
704            elif status == 'null':
705                return error
706
707    # Returns None on success, and an error string on failure
708    def blockdev_create(self, options, job_id='job0', filters=None):
709        if filters is None:
710            filters = [filter_qmp_testfiles]
711        result = self.qmp_log('blockdev-create', filters=filters,
712                              job_id=job_id, options=options)
713
714        if 'return' in result:
715            assert result['return'] == {}
716            job_result = self.run_job(job_id)
717        else:
718            job_result = result['error']
719
720        log("")
721        return job_result
722
723    def enable_migration_events(self, name):
724        log('Enabling migration QMP events on %s...' % name)
725        log(self.qmp('migrate-set-capabilities', capabilities=[
726            {
727                'capability': 'events',
728                'state': True
729            }
730        ]))
731
732    def wait_migration(self, expect_runstate):
733        while True:
734            event = self.event_wait('MIGRATION')
735            log(event, filters=[filter_qmp_event])
736            if event['data']['status'] == 'completed':
737                break
738        # The event may occur in finish-migrate, so wait for the expected
739        # post-migration runstate
740        while self.qmp('query-status')['return']['status'] != expect_runstate:
741            pass
742
743    def node_info(self, node_name):
744        nodes = self.qmp('query-named-block-nodes')
745        for x in nodes['return']:
746            if x['node-name'] == node_name:
747                return x
748        return None
749
750    def query_bitmaps(self):
751        res = self.qmp("query-named-block-nodes")
752        return {device['node-name']: device['dirty-bitmaps']
753                for device in res['return'] if 'dirty-bitmaps' in device}
754
755    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
756        """
757        get a specific bitmap from the object returned by query_bitmaps.
758        :param recording: If specified, filter results by the specified value.
759        :param bitmaps: If specified, use it instead of call query_bitmaps()
760        """
761        if bitmaps is None:
762            bitmaps = self.query_bitmaps()
763
764        for bitmap in bitmaps[node_name]:
765            if bitmap.get('name', '') == bitmap_name:
766                if recording is None or bitmap.get('recording') == recording:
767                    return bitmap
768        return None
769
770    def check_bitmap_status(self, node_name, bitmap_name, fields):
771        ret = self.get_bitmap(node_name, bitmap_name)
772
773        return fields.items() <= ret.items()
774
775    def assert_block_path(self, root, path, expected_node, graph=None):
776        """
777        Check whether the node under the given path in the block graph
778        is @expected_node.
779
780        @root is the node name of the node where the @path is rooted.
781
782        @path is a string that consists of child names separated by
783        slashes.  It must begin with a slash.
784
785        Examples for @root + @path:
786          - root="qcow2-node", path="/backing/file"
787          - root="quorum-node", path="/children.2/file"
788
789        Hypothetically, @path could be empty, in which case it would
790        point to @root.  However, in practice this case is not useful
791        and hence not allowed.
792
793        @expected_node may be None.  (All elements of the path but the
794        leaf must still exist.)
795
796        @graph may be None or the result of an x-debug-query-block-graph
797        call that has already been performed.
798        """
799        if graph is None:
800            graph = self.qmp('x-debug-query-block-graph')['return']
801
802        iter_path = iter(path.split('/'))
803
804        # Must start with a /
805        assert next(iter_path) == ''
806
807        node = next((node for node in graph['nodes'] if node['name'] == root),
808                    None)
809
810        # An empty @path is not allowed, so the root node must be present
811        assert node is not None, 'Root node %s not found' % root
812
813        for child_name in iter_path:
814            assert node is not None, 'Cannot follow path %s%s' % (root, path)
815
816            try:
817                node_id = next(edge['child'] for edge in graph['edges']
818                               if (edge['parent'] == node['id'] and
819                                   edge['name'] == child_name))
820
821                node = next(node for node in graph['nodes']
822                            if node['id'] == node_id)
823
824            except StopIteration:
825                node = None
826
827        if node is None:
828            assert expected_node is None, \
829                   'No node found under %s (but expected %s)' % \
830                   (path, expected_node)
831        else:
832            assert node['name'] == expected_node, \
833                   'Found node %s under %s (but expected %s)' % \
834                   (node['name'], path, expected_node)
835
836index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
837
838class QMPTestCase(unittest.TestCase):
839    '''Abstract base class for QMP test cases'''
840
841    def __init__(self, *args, **kwargs):
842        super().__init__(*args, **kwargs)
843        # Many users of this class set a VM property we rely on heavily
844        # in the methods below.
845        self.vm = None
846
847    def dictpath(self, d, path):
848        '''Traverse a path in a nested dict'''
849        for component in path.split('/'):
850            m = index_re.match(component)
851            if m:
852                component, idx = m.groups()
853                idx = int(idx)
854
855            if not isinstance(d, dict) or component not in d:
856                self.fail(f'failed path traversal for "{path}" in "{d}"')
857            d = d[component]
858
859            if m:
860                if not isinstance(d, list):
861                    self.fail(f'path component "{component}" in "{path}" '
862                              f'is not a list in "{d}"')
863                try:
864                    d = d[idx]
865                except IndexError:
866                    self.fail(f'invalid index "{idx}" in path "{path}" '
867                              f'in "{d}"')
868        return d
869
870    def assert_qmp_absent(self, d, path):
871        try:
872            result = self.dictpath(d, path)
873        except AssertionError:
874            return
875        self.fail('path "%s" has value "%s"' % (path, str(result)))
876
877    def assert_qmp(self, d, path, value):
878        '''Assert that the value for a specific path in a QMP dict
879           matches.  When given a list of values, assert that any of
880           them matches.'''
881
882        result = self.dictpath(d, path)
883
884        # [] makes no sense as a list of valid values, so treat it as
885        # an actual single value.
886        if isinstance(value, list) and value != []:
887            for v in value:
888                if result == v:
889                    return
890            self.fail('no match for "%s" in %s' % (str(result), str(value)))
891        else:
892            self.assertEqual(result, value,
893                             '"%s" is "%s", expected "%s"'
894                             % (path, str(result), str(value)))
895
896    def assert_no_active_block_jobs(self):
897        result = self.vm.qmp('query-block-jobs')
898        self.assert_qmp(result, 'return', [])
899
900    def assert_has_block_node(self, node_name=None, file_name=None):
901        """Issue a query-named-block-nodes and assert node_name and/or
902        file_name is present in the result"""
903        def check_equal_or_none(a, b):
904            return a is None or b is None or a == b
905        assert node_name or file_name
906        result = self.vm.qmp('query-named-block-nodes')
907        for x in result["return"]:
908            if check_equal_or_none(x.get("node-name"), node_name) and \
909                    check_equal_or_none(x.get("file"), file_name):
910                return
911        self.fail("Cannot find %s %s in result:\n%s" %
912                  (node_name, file_name, result))
913
914    def assert_json_filename_equal(self, json_filename, reference):
915        '''Asserts that the given filename is a json: filename and that its
916           content is equal to the given reference object'''
917        self.assertEqual(json_filename[:5], 'json:')
918        self.assertEqual(
919            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
920            self.vm.flatten_qmp_object(reference)
921        )
922
923    def cancel_and_wait(self, drive='drive0', force=False,
924                        resume=False, wait=60.0):
925        '''Cancel a block job and wait for it to finish, returning the event'''
926        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
927        self.assert_qmp(result, 'return', {})
928
929        if resume:
930            self.vm.resume_drive(drive)
931
932        cancelled = False
933        result = None
934        while not cancelled:
935            for event in self.vm.get_qmp_events(wait=wait):
936                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
937                   event['event'] == 'BLOCK_JOB_CANCELLED':
938                    self.assert_qmp(event, 'data/device', drive)
939                    result = event
940                    cancelled = True
941                elif event['event'] == 'JOB_STATUS_CHANGE':
942                    self.assert_qmp(event, 'data/id', drive)
943
944
945        self.assert_no_active_block_jobs()
946        return result
947
948    def wait_until_completed(self, drive='drive0', check_offset=True,
949                             wait=60.0, error=None):
950        '''Wait for a block job to finish, returning the event'''
951        while True:
952            for event in self.vm.get_qmp_events(wait=wait):
953                if event['event'] == 'BLOCK_JOB_COMPLETED':
954                    self.assert_qmp(event, 'data/device', drive)
955                    if error is None:
956                        self.assert_qmp_absent(event, 'data/error')
957                        if check_offset:
958                            self.assert_qmp(event, 'data/offset',
959                                            event['data']['len'])
960                    else:
961                        self.assert_qmp(event, 'data/error', error)
962                    self.assert_no_active_block_jobs()
963                    return event
964                if event['event'] == 'JOB_STATUS_CHANGE':
965                    self.assert_qmp(event, 'data/id', drive)
966
967    def wait_ready(self, drive='drive0'):
968        """Wait until a BLOCK_JOB_READY event, and return the event."""
969        f = {'data': {'type': 'mirror', 'device': drive}}
970        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
971
972    def wait_ready_and_cancel(self, drive='drive0'):
973        self.wait_ready(drive=drive)
974        event = self.cancel_and_wait(drive=drive)
975        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
976        self.assert_qmp(event, 'data/type', 'mirror')
977        self.assert_qmp(event, 'data/offset', event['data']['len'])
978
979    def complete_and_wait(self, drive='drive0', wait_ready=True,
980                          completion_error=None):
981        '''Complete a block job and wait for it to finish'''
982        if wait_ready:
983            self.wait_ready(drive=drive)
984
985        result = self.vm.qmp('block-job-complete', device=drive)
986        self.assert_qmp(result, 'return', {})
987
988        event = self.wait_until_completed(drive=drive, error=completion_error)
989        self.assert_qmp(event, 'data/type', 'mirror')
990
991    def pause_wait(self, job_id='job0'):
992        with Timeout(3, "Timeout waiting for job to pause"):
993            while True:
994                result = self.vm.qmp('query-block-jobs')
995                found = False
996                for job in result['return']:
997                    if job['device'] == job_id:
998                        found = True
999                        if job['paused'] and not job['busy']:
1000                            return job
1001                        break
1002                assert found
1003
1004    def pause_job(self, job_id='job0', wait=True):
1005        result = self.vm.qmp('block-job-pause', device=job_id)
1006        self.assert_qmp(result, 'return', {})
1007        if wait:
1008            return self.pause_wait(job_id)
1009        return result
1010
1011    def case_skip(self, reason):
1012        '''Skip this test case'''
1013        case_notrun(reason)
1014        self.skipTest(reason)
1015
1016
1017def notrun(reason):
1018    '''Skip this test suite'''
1019    # Each test in qemu-iotests has a number ("seq")
1020    seq = os.path.basename(sys.argv[0])
1021
1022    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1023    logger.warning("%s not run: %s", seq, reason)
1024    sys.exit(0)
1025
1026def case_notrun(reason):
1027    '''Mark this test case as not having been run (without actually
1028    skipping it, that is left to the caller).  See
1029    QMPTestCase.case_skip() for a variant that actually skips the
1030    current test case.'''
1031
1032    # Each test in qemu-iotests has a number ("seq")
1033    seq = os.path.basename(sys.argv[0])
1034
1035    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1036        '    [case not run] ' + reason + '\n')
1037
1038def _verify_image_format(supported_fmts: Sequence[str] = (),
1039                         unsupported_fmts: Sequence[str] = ()) -> None:
1040    assert not (supported_fmts and unsupported_fmts)
1041
1042    if 'generic' in supported_fmts and \
1043            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1044        # similar to
1045        #   _supported_fmt generic
1046        # for bash tests
1047        if imgfmt == 'luks':
1048            verify_working_luks()
1049        return
1050
1051    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1052    if not_sup or (imgfmt in unsupported_fmts):
1053        notrun('not suitable for this image format: %s' % imgfmt)
1054
1055    if imgfmt == 'luks':
1056        verify_working_luks()
1057
1058def _verify_protocol(supported: Sequence[str] = (),
1059                     unsupported: Sequence[str] = ()) -> None:
1060    assert not (supported and unsupported)
1061
1062    if 'generic' in supported:
1063        return
1064
1065    not_sup = supported and (imgproto not in supported)
1066    if not_sup or (imgproto in unsupported):
1067        notrun('not suitable for this protocol: %s' % imgproto)
1068
1069def _verify_platform(supported: Sequence[str] = (),
1070                     unsupported: Sequence[str] = ()) -> None:
1071    if any((sys.platform.startswith(x) for x in unsupported)):
1072        notrun('not suitable for this OS: %s' % sys.platform)
1073
1074    if supported:
1075        if not any((sys.platform.startswith(x) for x in supported)):
1076            notrun('not suitable for this OS: %s' % sys.platform)
1077
1078def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1079    if supported_cache_modes and (cachemode not in supported_cache_modes):
1080        notrun('not suitable for this cache mode: %s' % cachemode)
1081
1082def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1083    if supported_aio_modes and (aiomode not in supported_aio_modes):
1084        notrun('not suitable for this aio mode: %s' % aiomode)
1085
1086def supports_quorum():
1087    return 'quorum' in qemu_img_pipe('--help')
1088
1089def verify_quorum():
1090    '''Skip test suite if quorum support is not available'''
1091    if not supports_quorum():
1092        notrun('quorum support missing')
1093
1094def has_working_luks() -> Tuple[bool, str]:
1095    """
1096    Check whether our LUKS driver can actually create images
1097    (this extends to LUKS encryption for qcow2).
1098
1099    If not, return the reason why.
1100    """
1101
1102    img_file = f'{test_dir}/luks-test.luks'
1103    (output, status) = \
1104        qemu_img_pipe_and_status('create', '-f', 'luks',
1105                                 '--object', luks_default_secret_object,
1106                                 '-o', luks_default_key_secret_opt,
1107                                 '-o', 'iter-time=10',
1108                                 img_file, '1G')
1109    try:
1110        os.remove(img_file)
1111    except OSError:
1112        pass
1113
1114    if status != 0:
1115        reason = output
1116        for line in output.splitlines():
1117            if img_file + ':' in line:
1118                reason = line.split(img_file + ':', 1)[1].strip()
1119                break
1120
1121        return (False, reason)
1122    else:
1123        return (True, '')
1124
1125def verify_working_luks():
1126    """
1127    Skip test suite if LUKS does not work
1128    """
1129    (working, reason) = has_working_luks()
1130    if not working:
1131        notrun(reason)
1132
1133def qemu_pipe(*args):
1134    """
1135    Run qemu with an option to print something and exit (e.g. a help option).
1136
1137    :return: QEMU's stdout output.
1138    """
1139    args = [qemu_prog] + qemu_opts + list(args)
1140    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1141                            stderr=subprocess.STDOUT,
1142                            universal_newlines=True)
1143    output = subp.communicate()[0]
1144    if subp.returncode < 0:
1145        sys.stderr.write('qemu received signal %i: %s\n' %
1146                         (-subp.returncode, ' '.join(args)))
1147    return output
1148
1149def supported_formats(read_only=False):
1150    '''Set 'read_only' to True to check ro-whitelist
1151       Otherwise, rw-whitelist is checked'''
1152
1153    if not hasattr(supported_formats, "formats"):
1154        supported_formats.formats = {}
1155
1156    if read_only not in supported_formats.formats:
1157        format_message = qemu_pipe("-drive", "format=help")
1158        line = 1 if read_only else 0
1159        supported_formats.formats[read_only] = \
1160            format_message.splitlines()[line].split(":")[1].split()
1161
1162    return supported_formats.formats[read_only]
1163
1164def skip_if_unsupported(required_formats=(), read_only=False):
1165    '''Skip Test Decorator
1166       Runs the test if all the required formats are whitelisted'''
1167    def skip_test_decorator(func):
1168        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1169                         **kwargs: Dict[str, Any]) -> None:
1170            if callable(required_formats):
1171                fmts = required_formats(test_case)
1172            else:
1173                fmts = required_formats
1174
1175            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1176            if usf_list:
1177                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1178                test_case.case_skip(msg)
1179            else:
1180                func(test_case, *args, **kwargs)
1181        return func_wrapper
1182    return skip_test_decorator
1183
1184def skip_for_formats(formats: Sequence[str] = ()) \
1185    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1186                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1187    '''Skip Test Decorator
1188       Skips the test for the given formats'''
1189    def skip_test_decorator(func):
1190        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1191                         **kwargs: Dict[str, Any]) -> None:
1192            if imgfmt in formats:
1193                msg = f'{test_case}: Skipped for format {imgfmt}'
1194                test_case.case_skip(msg)
1195            else:
1196                func(test_case, *args, **kwargs)
1197        return func_wrapper
1198    return skip_test_decorator
1199
1200def skip_if_user_is_root(func):
1201    '''Skip Test Decorator
1202       Runs the test only without root permissions'''
1203    def func_wrapper(*args, **kwargs):
1204        if os.getuid() == 0:
1205            case_notrun('{}: cannot be run as root'.format(args[0]))
1206            return None
1207        else:
1208            return func(*args, **kwargs)
1209    return func_wrapper
1210
1211def execute_unittest(debug=False):
1212    """Executes unittests within the calling module."""
1213
1214    verbosity = 2 if debug else 1
1215
1216    if debug:
1217        output = sys.stdout
1218    else:
1219        # We need to filter out the time taken from the output so that
1220        # qemu-iotest can reliably diff the results against master output.
1221        output = io.StringIO()
1222
1223    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1224                                     verbosity=verbosity)
1225    try:
1226        # unittest.main() will use sys.exit(); so expect a SystemExit
1227        # exception
1228        unittest.main(testRunner=runner)
1229    finally:
1230        # We need to filter out the time taken from the output so that
1231        # qemu-iotest can reliably diff the results against master output.
1232        if not debug:
1233            out = output.getvalue()
1234            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1235
1236            # Hide skipped tests from the reference output
1237            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1238            out_first_line, out_rest = out.split('\n', 1)
1239            out = out_first_line.replace('s', '.') + '\n' + out_rest
1240
1241            sys.stderr.write(out)
1242
1243def execute_setup_common(supported_fmts: Sequence[str] = (),
1244                         supported_platforms: Sequence[str] = (),
1245                         supported_cache_modes: Sequence[str] = (),
1246                         supported_aio_modes: Sequence[str] = (),
1247                         unsupported_fmts: Sequence[str] = (),
1248                         supported_protocols: Sequence[str] = (),
1249                         unsupported_protocols: Sequence[str] = ()) -> bool:
1250    """
1251    Perform necessary setup for either script-style or unittest-style tests.
1252
1253    :return: Bool; Whether or not debug mode has been requested via the CLI.
1254    """
1255    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1256
1257    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1258    # indicate that we're not being run via "check". There may be
1259    # other things set up by "check" that individual test cases rely
1260    # on.
1261    if test_dir is None or qemu_default_machine is None:
1262        sys.stderr.write('Please run this test via the "check" script\n')
1263        sys.exit(os.EX_USAGE)
1264
1265    debug = '-d' in sys.argv
1266    if debug:
1267        sys.argv.remove('-d')
1268    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1269
1270    _verify_image_format(supported_fmts, unsupported_fmts)
1271    _verify_protocol(supported_protocols, unsupported_protocols)
1272    _verify_platform(supported=supported_platforms)
1273    _verify_cache_mode(supported_cache_modes)
1274    _verify_aio_mode(supported_aio_modes)
1275
1276    return debug
1277
1278def execute_test(*args, test_function=None, **kwargs):
1279    """Run either unittest or script-style tests."""
1280
1281    debug = execute_setup_common(*args, **kwargs)
1282    if not test_function:
1283        execute_unittest(debug)
1284    else:
1285        test_function()
1286
1287def activate_logging():
1288    """Activate iotests.log() output to stdout for script-style tests."""
1289    handler = logging.StreamHandler(stream=sys.stdout)
1290    formatter = logging.Formatter('%(message)s')
1291    handler.setFormatter(formatter)
1292    test_logger.addHandler(handler)
1293    test_logger.setLevel(logging.INFO)
1294    test_logger.propagate = False
1295
1296# This is called from script-style iotests without a single point of entry
1297def script_initialize(*args, **kwargs):
1298    """Initialize script-style tests without running any tests."""
1299    activate_logging()
1300    execute_setup_common(*args, **kwargs)
1301
1302# This is called from script-style iotests with a single point of entry
1303def script_main(test_function, *args, **kwargs):
1304    """Run script-style tests outside of the unittest framework"""
1305    activate_logging()
1306    execute_test(*args, test_function=test_function, **kwargs)
1307
1308# This is called from unittest style iotests
1309def main(*args, **kwargs):
1310    """Run tests using the unittest framework"""
1311    execute_test(*args, **kwargs)
1312