xref: /qemu/tests/qemu-iotests/iotests.py (revision 12b35405)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31from typing import (Any, Callable, Dict, Iterable,
32                    List, Optional, Sequence, Tuple, TypeVar)
33import unittest
34
35# pylint: disable=import-error, wrong-import-position
36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37from qemu import qtest
38
# Refuse to run on interpreters older than Python 3.6.
assert sys.version_info >= (3, 6)

# Type Aliases
# A QMP response, represented as a dict with string keys.
QMPResponse = Dict[str, Any]


# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
# Attach a no-op handler so this logger always has at least one handler.
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
# (log() in this module writes to it.)
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have the interpreter dump Python tracebacks on fatal errors.
faulthandler.enable()
54
# Command lines and settings below are read from the environment once,
# at import time.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the QEMU_IO_OPTIONS_NO_FMT
# option set instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: unlike the lists above, this is [''] (one empty string) rather
# than an empty list when QEMU_OPTIONS is unset or empty.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# The following are None when the corresponding variable is unset,
# except where a default is given.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Default secret object / option added for luks images by
# qemu_img_create() and VM.add_drive() when the caller gives no key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the given arguments.

    stderr is merged into stdout.  Returns a (output, exit code) tuple.
    If the process was terminated by a signal, a diagnostic naming the
    signal and the full command line is written to stderr.
    """
    full_cmd = qemu_img_args + list(args)
    proc = subprocess.Popen(full_cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    stdout_text = proc.communicate()[0]
    status = proc.returncode
    # A negative return code means the child was killed by that signal
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(full_cmd)))
    return (stdout_text, status)
107
def qemu_img(*args: str) -> int:
    '''Run qemu-img with the given arguments and return only its exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
111
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return a copy of a QMP message with every dict turned into an
    OrderedDict sorted by key.

    With conv_keys set, '_' is replaced by '-' in the keys of the dict
    passed in; dicts nested directly inside a dict keep their keys,
    while dicts that are list elements are converted again.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key in sorted(qmsg):
            out_key = key.replace('_', '-') if conv_keys else key
            ordered[out_key] = ordered_qmp(qmsg[key], conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        # List elements are processed with the default conv_keys
        return [ordered_qmp(element) for element in qmsg]
    return qmsg
124
def qemu_img_create(*args):
    '''
    Run "qemu-img create" with the given arguments and return its exit code.

    When the arguments select the luks format ('-f', 'luks') and no
    key-secret is given, the default key-secret option and the matching
    default secret object are added to the command line.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a single comma-separated string, so the
                # default option is joined onto it (a str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
143
def qemu_img_verbose(*args):
    '''Run qemu-img with its output passed straight through; return the
    exit code.  A diagnostic is written to stderr if it died on a signal.'''
    full_cmd = qemu_img_args + list(args)
    status = subprocess.call(full_cmd)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(full_cmd)))
    return status
151
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img with the given arguments and return only its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
155
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
160
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for filename.

    With imgopts set, filename is passed with --image-opts; otherwise
    "-f <imgfmt>" is used.  extra_args are inserted before filename.
    filter_path is the path handed to filter_img_info(); it defaults
    to filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info', *format_args, *extra_args, filename]

    info_text = qemu_img_pipe(*cmd)
    log(filter_img_info(info_text, filter_path or filename))
174
def qemu_io(*args):
    '''Run qemu-io and return its output (stderr is merged into stdout).
    A diagnostic is written to stderr if it died on a signal.'''
    full_cmd = qemu_io_args + list(args)
    proc = subprocess.Popen(full_cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    stdout_text = proc.communicate()[0]
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-status, ' '.join(full_cmd)))
    return stdout_text
186
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
191
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    stderr is not redirected.  A diagnostic is written to stderr if
    qemu-io was terminated by a signal.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards output without leaving an unclosed
    # file object behind, unlike open('/dev/null', 'w').
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
200
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0

    Both stdout and stderr of the subprocess are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards output without leaving an unclosed
    # file object behind, unlike open('/dev/null', 'w').
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
207
def get_virtio_scsi_device():
    '''Return 'virtio-scsi-ccw' when the default machine is
    's390-ccw-virtio', and 'virtio-scsi-pci' otherwise'''
    on_s390_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_s390_ccw else 'virtio-scsi-pci'
212
class QemuIoInteractive:
    '''
    Drive an interactive qemu-io child process through pipes.

    The child is started with the module's qemu-io command line plus the
    given arguments; stderr is merged into stdout.  Use cmd() to run a
    single command and get its output, and close() to quit.
    '''

    # Prompt that qemu-io prints when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # Consume the initial prompt so the first cmd() starts clean
        assert self._p.stdout.read(len(self._PROMPT)) == self._PROMPT

    def close(self):
        '''Send the quit command and wait for the process to exit'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''
        Read the child's output up to and including the next prompt and
        return it without the prompt.

        Raises AssertionError on EOF before a prompt is seen.
        '''
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        # Compare the tail of everything read so far against the prompt.
        # (A position counter that resets to 0 on mismatch would miss a
        # prompt directly preceded by its own first character, e.g.
        # output ending in "q" right before "qemu-io> ".)
        while ''.join(chars[-n:]) != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''
        Run one qemu-io command and return its output.

        cmd must be a single line and must not be the quit command.
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
250
251
def qemu_nbd(*args):
    '''Run qemu-nbd with --fork (daemon mode) and return the exit code
    of the process that was started'''
    full_cmd = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(full_cmd)
255
def qemu_nbd_early_pipe(*args):
    '''Run qemu-nbd with --fork (daemon mode) and return a tuple of the
       parent's exit code and its stdout output; the output is replaced
       by an empty string when the exit code is 0'''
    full_cmd = qemu_nbd_args + ['--fork'] + list(args)
    proc = subprocess.Popen(full_cmd,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    stdout_text = proc.communicate()[0]
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                         (-status, ' '.join(full_cmd)))

    if status:
        return status, stdout_text
    return status, ''
269
def qemu_nbd_popen(*args):
    '''Start qemu-nbd with --persistent and return the subprocess.Popen
    object without waiting for the process to exit'''
    full_cmd = qemu_nbd_args + ['--persistent'] + list(args)
    return subprocess.Popen(full_cmd)
273
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are passed as the -f and -F format options.
    '''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
278
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors; each sector starts and ends
    with its own sector number as a big-endian 32-bit integer, with
    zero padding in between.  A size that is not a multiple of 512 is
    rounded up to whole sectors.
    '''
    # The context manager closes the file even if a write fails
    with open(name, 'wb') as image:
        for offset in range(0, size, 512):
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
288
def image_size(img):
    '''Return the 'virtual-size' field of the image's JSON
    "qemu-img info" output'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
293
def is_str(val):
    '''Return True if val is a str instance, False otherwise'''
    result = isinstance(val, str)
    return result
296
# Match the test directory path literally: the path is escaped so that
# regex metacharacters in it (such as '.' or '+') are not interpreted.
test_dir_re = re.compile(re.escape(str(test_dir)))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in msg with
    "TEST_DIR"; msg is returned unchanged when no test directory is set'''
    if not test_dir:
        # Nothing meaningful to replace without a test directory
        return msg
    return test_dir_re.sub("TEST_DIR", msg)
300
# Matches a single carriage return
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage-return characters removed'''
    return re.sub(win32_re, "", msg)
304
# Matches the "N ops; <time> (<rate>/sec and <rate> ops/sec)" statistics
# text that filter_qemu_io() replaces with fixed placeholders
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns from msg and replace the operation
    statistics text with fixed placeholders'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    cleaned = filter_win32(msg)
    return qemu_io_re.sub(placeholder, cleaned)
312
# Matches "chown <digits>:<digits>"
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences in msg with
    "chown UID:GID"'''
    return re.sub(chown_re, "chown UID:GID", msg)
316
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of 'timestamp' (if present) are replaced by the placeholders
    'SECS' and 'USECS'.  The event passed in is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested timestamp
        # too so that the caller's event is not modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
324
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a list or a dict; nested lists and dicts are processed
    recursively.  filter_fn takes a (key, value) pair, where key is the
    dict key or list index, and returns the replacement value.
    Returns qmsg itself.'''
    if isinstance(qmsg, list):
        pairs = enumerate(qmsg)
    else:
        pairs = qmsg.items()

    for key, value in pairs:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
340
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" prefix of this process's test
    files in msg with "TEST_DIR/PID-"'''
    pid_prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
    filtered = msg.replace(pid_prefix, 'TEST_DIR/PID-')
    return filtered
344
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value in a QMP object'''
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
351
def filter_generated_node_ids(msg):
    '''Replace every "#block<digits>" in msg with "NODE_NAME"'''
    generated_id = re.compile("#block[0-9]+")
    return generated_id.sub("NODE_NAME", msg)
354
def filter_img_info(output, filename):
    '''
    Filter "qemu-img info" output for logging.

    Lines containing 'disk size' or 'actual-size' are dropped.  In the
    remaining lines, filename becomes TEST_IMG, test file prefixes are
    filtered, the image format becomes IMGFMT, and the iters, uuid and
    cid values are replaced by placeholders.
    '''
    value_placeholders = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in value_placeholders:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
370
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in msg with
    "IMGFMT"'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
373
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value in a QMP object'''
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
380
381
# A loggable message: a dict or list (JSON serializable) or a string
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the test (diff output) logger.

    Each filter is applied to the message in order first.  A dict or
    list is then serialized as JSON, pretty-printed if indent is given;
    anything else is logged as-is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # Don't sort if it's already sorted
    needs_sort = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=needs_sort, indent=indent))
399
class Timeout:
    '''
    Context manager that arms a SIGALRM interval timer on entry and
    disarms it on exit.  If the timer fires, the signal handler raises
    Exception(errmsg).
    '''
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds, self.errmsg = seconds, errmsg

    def __enter__(self):
        # Install the handler before arming the timer
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # A zero interval disarms the timer; exceptions are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise an Exception carrying errmsg'''
        raise Exception(self.errmsg)
413
def file_pattern(name):
    '''Return name prefixed with the current process ID and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
416
class FilePaths:
    """
    FilePaths generates per-process file paths and removes the files
    again when the context is left.

    Each name is passed through file_pattern() and joined onto base_dir.
    Entering the context yields the list of generated paths::

        with FilePaths(['a.img', 'b.img']) as (img_a, img_b):
            qemu_img('create', img_a, '1G')
        # img_a and img_b are removed here (missing files are ignored)
    """
    def __init__(self, names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Handle errors per path: a file that cannot be removed (e.g.
        # because it was never created) must not prevent the removal of
        # the remaining files.
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
443
class FilePath(FilePaths):
    """
    Single-file variant of FilePaths: takes one name, and entering the
    context yields that one path instead of a list.
    """
    def __init__(self, name, base_dir=test_dir):
        super().__init__([name], base_dir)

    def __enter__(self):
        only_path = self.paths[0]
        return only_path
453
def file_path_remover():
    '''Remove every path in file_path_remover.paths, last entry first,
    ignoring paths that cannot be removed'''
    pending = file_path_remover.paths
    for path in pending[::-1]:
        try:
            os.remove(path)
        except OSError:
            # Best effort only
            pass
460
461
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file paths that are removed at exit.

    Each name is passed through file_pattern() and joined onto base_dir.
    The paths are registered with file_path_remover(), which is
    installed as an atexit handler on first use.

    Returns a single path for one name, and a list of paths otherwise:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # First call: create the registry and arrange for cleanup at exit
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
483
def remote_filename(path):
    '''Return the filename to use for path with the current image
    protocol: path itself for 'file', an ssh:// URL on 127.0.0.1 port 22
    for 'ssh'.  Any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
491
492class VM(qtest.QEMUQtestMachine):
493    '''A QEMU VM'''
494
495    def __init__(self, path_suffix=''):
496        name = "qemu%s-%d" % (path_suffix, os.getpid())
497        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
498                                 test_dir=test_dir,
499                                 socket_scm_helper=socket_scm_helper,
500                                 sock_dir=sock_dir)
501        self._num_drives = 0
502
503    def add_object(self, opts):
504        self._args.append('-object')
505        self._args.append(opts)
506        return self
507
508    def add_device(self, opts):
509        self._args.append('-device')
510        self._args.append(opts)
511        return self
512
513    def add_drive_raw(self, opts):
514        self._args.append('-drive')
515        self._args.append(opts)
516        return self
517
518    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
519        '''Add a virtio-blk drive to the VM'''
520        options = ['if=%s' % interface,
521                   'id=drive%d' % self._num_drives]
522
523        if path is not None:
524            options.append('file=%s' % path)
525            options.append('format=%s' % img_format)
526            options.append('cache=%s' % cachemode)
527            options.append('aio=%s' % aiomode)
528
529        if opts:
530            options.append(opts)
531
532        if img_format == 'luks' and 'key-secret' not in opts:
533            # default luks support
534            if luks_default_secret_object not in self._args:
535                self.add_object(luks_default_secret_object)
536
537            options.append(luks_default_key_secret_opt)
538
539        self._args.append('-drive')
540        self._args.append(','.join(options))
541        self._num_drives += 1
542        return self
543
544    def add_blockdev(self, opts):
545        self._args.append('-blockdev')
546        if isinstance(opts, str):
547            self._args.append(opts)
548        else:
549            self._args.append(','.join(opts))
550        return self
551
552    def add_incoming(self, addr):
553        self._args.append('-incoming')
554        self._args.append(addr)
555        return self
556
557    def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
558        cmd = 'human-monitor-command'
559        kwargs = {'command-line': command_line}
560        if use_log:
561            return self.qmp_log(cmd, **kwargs)
562        else:
563            return self.qmp(cmd, **kwargs)
564
565    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
566        """Pause drive r/w operations"""
567        if not event:
568            self.pause_drive(drive, "read_aio")
569            self.pause_drive(drive, "write_aio")
570            return
571        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
572
573    def resume_drive(self, drive: str) -> None:
574        """Resume drive r/w operations"""
575        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
576
577    def hmp_qemu_io(self, drive: str, cmd: str,
578                    use_log: bool = False) -> QMPResponse:
579        """Write to a given drive using an HMP command"""
580        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
581
582    def flatten_qmp_object(self, obj, output=None, basestr=''):
583        if output is None:
584            output = dict()
585        if isinstance(obj, list):
586            for i, item in enumerate(obj):
587                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
588        elif isinstance(obj, dict):
589            for key in obj:
590                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
591        else:
592            output[basestr[:-1]] = obj # Strip trailing '.'
593        return output
594
595    def qmp_to_opts(self, obj):
596        obj = self.flatten_qmp_object(obj)
597        output_list = list()
598        for key in obj:
599            output_list += [key + '=' + obj[key]]
600        return ','.join(output_list)
601
602    def get_qmp_events_filtered(self, wait=60.0):
603        result = []
604        for ev in self.get_qmp_events(wait=wait):
605            result.append(filter_qmp_event(ev))
606        return result
607
608    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
609        full_cmd = OrderedDict((
610            ("execute", cmd),
611            ("arguments", ordered_qmp(kwargs))
612        ))
613        log(full_cmd, filters, indent=indent)
614        result = self.qmp(cmd, **kwargs)
615        log(result, filters, indent=indent)
616        return result
617
    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        It waits for job events in a loop, logs every event other than
        JOB_STATUS_CHANGE, and reacts to each new job status until the
        job reaches the 'null' status.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None, unless the job entered the 'aborting' status; in
                 that case, the 'error' value that query-jobs reported
                 for the job at that point.
        """
        # The BLOCK_JOB_* events below are matched on the 'device' field
        # (except BLOCK_JOB_PENDING); the others on 'id'.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine; every
            # other event is just logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Look up and remember the job's error; it is returned
                # once the job reaches the 'null' status.
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback first, then
                # either cancel or finalize the job.
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error
672
673    # Returns None on success, and an error string on failure
674    def blockdev_create(self, options, job_id='job0', filters=None):
675        if filters is None:
676            filters = [filter_qmp_testfiles]
677        result = self.qmp_log('blockdev-create', filters=filters,
678                              job_id=job_id, options=options)
679
680        if 'return' in result:
681            assert result['return'] == {}
682            job_result = self.run_job(job_id)
683        else:
684            job_result = result['error']
685
686        log("")
687        return job_result
688
689    def enable_migration_events(self, name):
690        log('Enabling migration QMP events on %s...' % name)
691        log(self.qmp('migrate-set-capabilities', capabilities=[
692            {
693                'capability': 'events',
694                'state': True
695            }
696        ]))
697
698    def wait_migration(self, expect_runstate):
699        while True:
700            event = self.event_wait('MIGRATION')
701            log(event, filters=[filter_qmp_event])
702            if event['data']['status'] == 'completed':
703                break
704        # The event may occur in finish-migrate, so wait for the expected
705        # post-migration runstate
706        while self.qmp('query-status')['return']['status'] != expect_runstate:
707            pass
708
709    def node_info(self, node_name):
710        nodes = self.qmp('query-named-block-nodes')
711        for x in nodes['return']:
712            if x['node-name'] == node_name:
713                return x
714        return None
715
716    def query_bitmaps(self):
717        res = self.qmp("query-named-block-nodes")
718        return {device['node-name']: device['dirty-bitmaps']
719                for device in res['return'] if 'dirty-bitmaps' in device}
720
721    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
722        """
723        get a specific bitmap from the object returned by query_bitmaps.
724        :param recording: If specified, filter results by the specified value.
725        :param bitmaps: If specified, use it instead of call query_bitmaps()
726        """
727        if bitmaps is None:
728            bitmaps = self.query_bitmaps()
729
730        for bitmap in bitmaps[node_name]:
731            if bitmap.get('name', '') == bitmap_name:
732                if recording is None or bitmap.get('recording') == recording:
733                    return bitmap
734        return None
735
736    def check_bitmap_status(self, node_name, bitmap_name, fields):
737        ret = self.get_bitmap(node_name, bitmap_name)
738
739        return fields.items() <= ret.items()
740
741    def assert_block_path(self, root, path, expected_node, graph=None):
742        """
743        Check whether the node under the given path in the block graph
744        is @expected_node.
745
746        @root is the node name of the node where the @path is rooted.
747
748        @path is a string that consists of child names separated by
749        slashes.  It must begin with a slash.
750
751        Examples for @root + @path:
752          - root="qcow2-node", path="/backing/file"
753          - root="quorum-node", path="/children.2/file"
754
755        Hypothetically, @path could be empty, in which case it would
756        point to @root.  However, in practice this case is not useful
757        and hence not allowed.
758
759        @expected_node may be None.  (All elements of the path but the
760        leaf must still exist.)
761
762        @graph may be None or the result of an x-debug-query-block-graph
763        call that has already been performed.
764        """
765        if graph is None:
766            graph = self.qmp('x-debug-query-block-graph')['return']
767
768        iter_path = iter(path.split('/'))
769
770        # Must start with a /
771        assert next(iter_path) == ''
772
773        node = next((node for node in graph['nodes'] if node['name'] == root),
774                    None)
775
776        # An empty @path is not allowed, so the root node must be present
777        assert node is not None, 'Root node %s not found' % root
778
779        for child_name in iter_path:
780            assert node is not None, 'Cannot follow path %s%s' % (root, path)
781
782            try:
783                node_id = next(edge['child'] for edge in graph['edges']
784                               if (edge['parent'] == node['id'] and
785                                   edge['name'] == child_name))
786
787                node = next(node for node in graph['nodes']
788                            if node['id'] == node_id)
789
790            except StopIteration:
791                node = None
792
793        if node is None:
794            assert expected_node is None, \
795                   'No node found under %s (but expected %s)' % \
796                   (path, expected_node)
797        else:
798            assert node['name'] == expected_node, \
799                   'Found node %s under %s (but expected %s)' % \
800                   (node['name'], path, expected_node)
801
# Matches a path component of the form "name[index]"; group 1 is the
# name and group 2 the text between the brackets.  Used by
# QMPTestCase.dictpath().
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
803
804class QMPTestCase(unittest.TestCase):
805    '''Abstract base class for QMP test cases'''
806
807    def __init__(self, *args, **kwargs):
808        super().__init__(*args, **kwargs)
809        # Many users of this class set a VM property we rely on heavily
810        # in the methods below.
811        self.vm = None
812
813    def dictpath(self, d, path):
814        '''Traverse a path in a nested dict'''
815        for component in path.split('/'):
816            m = index_re.match(component)
817            if m:
818                component, idx = m.groups()
819                idx = int(idx)
820
821            if not isinstance(d, dict) or component not in d:
822                self.fail(f'failed path traversal for "{path}" in "{d}"')
823            d = d[component]
824
825            if m:
826                if not isinstance(d, list):
827                    self.fail(f'path component "{component}" in "{path}" '
828                              f'is not a list in "{d}"')
829                try:
830                    d = d[idx]
831                except IndexError:
832                    self.fail(f'invalid index "{idx}" in path "{path}" '
833                              f'in "{d}"')
834        return d
835
836    def assert_qmp_absent(self, d, path):
837        try:
838            result = self.dictpath(d, path)
839        except AssertionError:
840            return
841        self.fail('path "%s" has value "%s"' % (path, str(result)))
842
843    def assert_qmp(self, d, path, value):
844        '''Assert that the value for a specific path in a QMP dict
845           matches.  When given a list of values, assert that any of
846           them matches.'''
847
848        result = self.dictpath(d, path)
849
850        # [] makes no sense as a list of valid values, so treat it as
851        # an actual single value.
852        if isinstance(value, list) and value != []:
853            for v in value:
854                if result == v:
855                    return
856            self.fail('no match for "%s" in %s' % (str(result), str(value)))
857        else:
858            self.assertEqual(result, value,
859                             '"%s" is "%s", expected "%s"'
860                             % (path, str(result), str(value)))
861
862    def assert_no_active_block_jobs(self):
863        result = self.vm.qmp('query-block-jobs')
864        self.assert_qmp(result, 'return', [])
865
866    def assert_has_block_node(self, node_name=None, file_name=None):
867        """Issue a query-named-block-nodes and assert node_name and/or
868        file_name is present in the result"""
869        def check_equal_or_none(a, b):
870            return a is None or b is None or a == b
871        assert node_name or file_name
872        result = self.vm.qmp('query-named-block-nodes')
873        for x in result["return"]:
874            if check_equal_or_none(x.get("node-name"), node_name) and \
875                    check_equal_or_none(x.get("file"), file_name):
876                return
877        self.fail("Cannot find %s %s in result:\n%s" %
878                  (node_name, file_name, result))
879
880    def assert_json_filename_equal(self, json_filename, reference):
881        '''Asserts that the given filename is a json: filename and that its
882           content is equal to the given reference object'''
883        self.assertEqual(json_filename[:5], 'json:')
884        self.assertEqual(
885            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
886            self.vm.flatten_qmp_object(reference)
887        )
888
889    def cancel_and_wait(self, drive='drive0', force=False,
890                        resume=False, wait=60.0):
891        '''Cancel a block job and wait for it to finish, returning the event'''
892        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
893        self.assert_qmp(result, 'return', {})
894
895        if resume:
896            self.vm.resume_drive(drive)
897
898        cancelled = False
899        result = None
900        while not cancelled:
901            for event in self.vm.get_qmp_events(wait=wait):
902                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
903                   event['event'] == 'BLOCK_JOB_CANCELLED':
904                    self.assert_qmp(event, 'data/device', drive)
905                    result = event
906                    cancelled = True
907                elif event['event'] == 'JOB_STATUS_CHANGE':
908                    self.assert_qmp(event, 'data/id', drive)
909
910
911        self.assert_no_active_block_jobs()
912        return result
913
914    def wait_until_completed(self, drive='drive0', check_offset=True,
915                             wait=60.0, error=None):
916        '''Wait for a block job to finish, returning the event'''
917        while True:
918            for event in self.vm.get_qmp_events(wait=wait):
919                if event['event'] == 'BLOCK_JOB_COMPLETED':
920                    self.assert_qmp(event, 'data/device', drive)
921                    if error is None:
922                        self.assert_qmp_absent(event, 'data/error')
923                        if check_offset:
924                            self.assert_qmp(event, 'data/offset',
925                                            event['data']['len'])
926                    else:
927                        self.assert_qmp(event, 'data/error', error)
928                    self.assert_no_active_block_jobs()
929                    return event
930                if event['event'] == 'JOB_STATUS_CHANGE':
931                    self.assert_qmp(event, 'data/id', drive)
932
933    def wait_ready(self, drive='drive0'):
934        """Wait until a BLOCK_JOB_READY event, and return the event."""
935        f = {'data': {'type': 'mirror', 'device': drive}}
936        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
937
938    def wait_ready_and_cancel(self, drive='drive0'):
939        self.wait_ready(drive=drive)
940        event = self.cancel_and_wait(drive=drive)
941        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
942        self.assert_qmp(event, 'data/type', 'mirror')
943        self.assert_qmp(event, 'data/offset', event['data']['len'])
944
945    def complete_and_wait(self, drive='drive0', wait_ready=True,
946                          completion_error=None):
947        '''Complete a block job and wait for it to finish'''
948        if wait_ready:
949            self.wait_ready(drive=drive)
950
951        result = self.vm.qmp('block-job-complete', device=drive)
952        self.assert_qmp(result, 'return', {})
953
954        event = self.wait_until_completed(drive=drive, error=completion_error)
955        self.assert_qmp(event, 'data/type', 'mirror')
956
957    def pause_wait(self, job_id='job0'):
958        with Timeout(3, "Timeout waiting for job to pause"):
959            while True:
960                result = self.vm.qmp('query-block-jobs')
961                found = False
962                for job in result['return']:
963                    if job['device'] == job_id:
964                        found = True
965                        if job['paused'] and not job['busy']:
966                            return job
967                        break
968                assert found
969
970    def pause_job(self, job_id='job0', wait=True):
971        result = self.vm.qmp('block-job-pause', device=job_id)
972        self.assert_qmp(result, 'return', {})
973        if wait:
974            return self.pause_wait(job_id)
975        return result
976
977    def case_skip(self, reason):
978        '''Skip this test case'''
979        case_notrun(reason)
980        self.skipTest(reason)
981
982
def notrun(reason):
    '''Skip this test suite

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning and
    terminates the process with exit status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on garbage collection,
    # so the reason is on disk before the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
991
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended as one line to "<output_dir>/<seq>.casenotrun".
    '''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the handle is closed (and the line flushed)
    # right away; this function may be called many times per process.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as casenotrun_file:
        casenotrun_file.write('    [case not run] ' + reason + '\n')
1003
1004def _verify_image_format(supported_fmts: Sequence[str] = (),
1005                         unsupported_fmts: Sequence[str] = ()) -> None:
1006    assert not (supported_fmts and unsupported_fmts)
1007
1008    if 'generic' in supported_fmts and \
1009            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1010        # similar to
1011        #   _supported_fmt generic
1012        # for bash tests
1013        if imgfmt == 'luks':
1014            verify_working_luks()
1015        return
1016
1017    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1018    if not_sup or (imgfmt in unsupported_fmts):
1019        notrun('not suitable for this image format: %s' % imgfmt)
1020
1021    if imgfmt == 'luks':
1022        verify_working_luks()
1023
1024def _verify_protocol(supported: Sequence[str] = (),
1025                     unsupported: Sequence[str] = ()) -> None:
1026    assert not (supported and unsupported)
1027
1028    if 'generic' in supported:
1029        return
1030
1031    not_sup = supported and (imgproto not in supported)
1032    if not_sup or (imgproto in unsupported):
1033        notrun('not suitable for this protocol: %s' % imgproto)
1034
1035def _verify_platform(supported: Sequence[str] = (),
1036                     unsupported: Sequence[str] = ()) -> None:
1037    if any((sys.platform.startswith(x) for x in unsupported)):
1038        notrun('not suitable for this OS: %s' % sys.platform)
1039
1040    if supported:
1041        if not any((sys.platform.startswith(x) for x in supported)):
1042            notrun('not suitable for this OS: %s' % sys.platform)
1043
1044def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1045    if supported_cache_modes and (cachemode not in supported_cache_modes):
1046        notrun('not suitable for this cache mode: %s' % cachemode)
1047
1048def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1049    if supported_aio_modes and (aiomode not in supported_aio_modes):
1050        notrun('not suitable for this aio mode: %s' % aiomode)
1051
def supports_quorum():
    '''Return whether 'quorum' occurs in the output of
       qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1054
def verify_quorum():
    '''Skip the test suite (via notrun) unless supports_quorum() is true'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1059
def has_working_luks() -> Tuple[bool, str]:
    """
    Probe LUKS support by creating a throw-away 1G LUKS image in
    test_dir with qemu-img (this extends to LUKS encryption for qcow2).

    :return: (True, '') if creation succeeded; otherwise (False, reason),
             where reason is the text after "<image file>:" on the first
             output line containing it, or the whole output if no line
             does.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image may not exist if creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines()
                   if marker in line),
                  output)
    return (False, reason)
1090
def verify_working_luks():
    """
    Skip the test suite (via notrun) with the reason reported by
    has_working_luks() if LUKS does not work
    """
    usable, why_not = has_working_luks()
    if usable:
        return
    notrun(why_not)
1098
def qemu_pipe(*args):
    """
    Run qemu (qemu_prog plus qemu_opts) with extra arguments that make it
    print something and exit (e.g. a help option).

    A note is written to our stderr if the process was killed by a signal.

    :return: QEMU's stdout output, with its stderr merged in.
    """
    cmdline = [qemu_prog, *qemu_opts, *args]
    proc = subprocess.Popen(cmdline,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    captured, _ = proc.communicate()

    # A negative return code -N means the child was terminated by signal N.
    exit_code = proc.returncode
    if exit_code < 0:
        sys.stderr.write('qemu received signal %i: %s\n' %
                         (-exit_code, ' '.join(cmdline)))
    return captured
1114
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats parsed from the output of
       "qemu -drive format=help".

       Set 'read_only' to True to check ro-whitelist (second output
       line); otherwise, rw-whitelist (first output line) is checked.
       Results are cached in supported_formats.formats, keyed by
       read_only.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only in cache:
        return cache[read_only]

    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    whitelist_line = help_lines[1 if read_only else 0]
    # The format names follow the first ':' and are whitespace-separated.
    cache[read_only] = whitelist_line.split(":")[1].split()
    return cache[read_only]
1129
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either a collection of format names or a
       callable that receives the test case and returns one.  read_only
       is passed on to supported_formats().  If any required format is
       missing, the test case is skipped through case_skip().'''
    # Imported here so this decorator does not depend on the module's
    # import list.
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the test's __name__ and __doc__ on the wrapper, so
        # unittest still shows the test's description.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            # Sorted so the skip message does not depend on set
            # iteration order.
            usf_list = sorted(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1149
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       If the current image format (imgfmt) is in formats, the test case
       is skipped through case_skip(); otherwise the test runs.'''
    # Imported here so this decorator does not depend on the module's
    # import list.
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the test's __name__ and __doc__ on the wrapper, so
        # unittest still shows the test's description.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1165
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When os.getuid() is 0 the wrapped test is not called; the case is
       recorded through case_notrun() and None is returned.'''
    # Imported here so this decorator does not depend on the module's
    # import list.
    import functools  # pylint: disable=import-outside-toplevel

    # wraps() keeps the test's __name__ and __doc__ on the wrapper, so
    # unittest still shows the test's description.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1176
def execute_unittest(debug=False):
    """
    Executes unittests within the calling module.

    In debug mode the runner writes verbosely to stdout.  Otherwise its
    output is captured, stripped of run-dependent details (elapsed time,
    skipped-test markers) and written to stderr.

    :param debug: Run with verbosity 2 and unfiltered output on stdout.
    :raise SystemExit: Always propagated from unittest.main().
    """

    verbosity = 2 if debug else 1

    # Outside debug mode the output is buffered so it can be filtered
    # before qemu-iotests diffs it against the reference output.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # partition() also copes with output that has no newline
            # (e.g. unittest.main() exited before running anything).
            # Unpacking split('\n', 1) would raise ValueError here and
            # mask the exception that is already propagating.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
1208
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Shared setup for script-style and unittest-style tests: check that we
    were started by "check", handle the '-d' flag, configure logging and
    run the format/protocol/platform/cache/aio verifications.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    started_by_check = (test_dir is not None
                        and qemu_default_machine is not None)
    if not started_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Consume the first '-d' so it is not seen by later argument parsing.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
1243
def execute_test(*args, test_function=None, **kwargs):
    """
    Run the common setup, then either call test_function (script-style)
    or, if none is given, run the unittest-style tests.

    Positional and remaining keyword arguments are forwarded to
    execute_setup_common().
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1252
def activate_logging():
    """
    Attach a stdout handler that prints bare messages to test_logger,
    set the logger to INFO and stop propagation to parent loggers.
    Used for script-style tests.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1261
1262# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Set up a script-style test: activate logging to stdout, then run
    execute_setup_common() with the given arguments.  No test is run.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1267
1268# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Activate logging to stdout and run test_function through
    execute_test(), i.e. outside of the unittest framework.  Remaining
    arguments are forwarded to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1273
1274# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Entry point for unittest-style tests: forward all arguments to
    execute_test().
    """
    execute_test(*args, **kwargs)
1278