xref: /qemu/tests/qemu-iotests/155 (revision 0153d2f5)
1#!/usr/bin/env python
2#
3# Test whether the backing BDSs are correct after completion of a
4# mirror block job; in "existing" modes (drive-mirror with
5# mode=existing and blockdev-mirror) the backing chain should not be
6# overridden.
7#
8# Copyright (C) 2016 Red Hat, Inc.
9#
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22#
23
24import os
25import iotests
26from iotests import qemu_img
27
# Paths of all images used by this test.  Each one is placed in the iotests
# test directory and uses the image format under test as its extension.
back0_img = os.path.join(iotests.test_dir, 'back0.%s' % iotests.imgfmt)
back1_img = os.path.join(iotests.test_dir, 'back1.%s' % iotests.imgfmt)
back2_img = os.path.join(iotests.test_dir, 'back2.%s' % iotests.imgfmt)
source_img = os.path.join(iotests.test_dir, 'source.%s' % iotests.imgfmt)
target_img = os.path.join(iotests.test_dir, 'target.%s' % iotests.imgfmt)
33
34
# Class variables for controlling the behavior of BaseClass:
36#
# existing: If True, explicitly create the target image; if the command is
#           blockdev-mirror, also blockdev-add it as node "target"
38# target_backing: If existing is True: Use this filename as the backing file
39#                 of the target image
40#                 (None: no backing file)
41# target_blockdev_backing: If existing is True: Pass this dict as "backing"
42#                          for the blockdev-add command
43#                          (None: do not pass "backing")
44# target_real_backing: If existing is True: The real filename of the backing
45#                      image during runtime, only makes sense if
46#                      target_blockdev_backing is not None
47#                      (None: same as target_backing)
48
class BaseClass(iotests.QMPTestCase):
    """Common fixture and helpers for the backing-chain tests.

    setUp() creates the image chain back0 <- back1 <- back2 <- source,
    launches a VM, adds the source image as node "source" and inserts it
    into drive0.  If ``existing`` is true, the target image is created up
    front as well and, for ``cmd == 'blockdev-mirror'``, added as node
    "target".

    Subclasses have to define ``existing``.  ``target_backing`` and ``cmd``
    are only read when ``existing`` is true.
    """

    # Defaults for the optional class variables described above the class
    target_blockdev_backing = None
    target_real_backing = None

    def setUp(self):
        """Create the images, launch the VM and attach the source node."""
        # Backing chain: back0 <- back1 <- back2 <- source.  Only the base
        # image is given an explicit size.
        qemu_img('create', '-f', iotests.imgfmt, back0_img, '1M')
        qemu_img('create', '-f', iotests.imgfmt, '-b', back0_img, back1_img)
        qemu_img('create', '-f', iotests.imgfmt, '-b', back1_img, back2_img)
        qemu_img('create', '-f', iotests.imgfmt, '-b', back2_img, source_img)

        self.vm = iotests.VM()
        # No image path is given here; the source node is inserted into the
        # drive further down
        self.vm.add_drive(None, '', 'none')
        self.vm.launch()

        # Add the BDS via blockdev-add so it stays around after the mirror block
        # job has been completed
        result = self.vm.qmp('blockdev-add',
                             node_name='source',
                             driver=iotests.imgfmt,
                             file={'driver': 'file',
                                   'filename': source_img})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('x-blockdev-insert-medium',
                             device='drive0', node_name='source')
        self.assert_qmp(result, 'return', {})

        # Sanity check before any block job has touched the chain
        self.assertIntactSourceBackingChain()

        if self.existing:
            if self.target_backing:
                qemu_img('create', '-f', iotests.imgfmt,
                         '-b', self.target_backing, target_img, '1M')
            else:
                qemu_img('create', '-f', iotests.imgfmt, target_img, '1M')

            # blockdev-mirror refers to the target by node name, so the node
            # has to exist before the job is started
            if self.cmd == 'blockdev-mirror':
                options = { 'node-name': 'target',
                            'driver': iotests.imgfmt,
                            'file': { 'driver': 'file',
                                      'filename': target_img } }
                if self.target_blockdev_backing:
                    options['backing'] = self.target_blockdev_backing

                result = self.vm.qmp('blockdev-add', **options)
                self.assert_qmp(result, 'return', {})

    def tearDown(self):
        """Shut the VM down and delete the images created for the test.

        The images are removed even if shutting down the VM raises, so a
        failing shutdown does not leave stale files in the test directory.
        """
        try:
            self.vm.shutdown()
        finally:
            os.remove(source_img)
            os.remove(back2_img)
            os.remove(back1_img)
            os.remove(back0_img)
            try:
                os.remove(target_img)
            except OSError:
                # Best effort: the target image is not created by every test
                pass

    def findBlockNode(self, node_name, id=None):
        """Return the QMP description of a block node.

        node_name: Node name to look for; may be None if id is given
        id:        If set, look the node up through the device of this name
                   in 'query-block' and, if node_name is set as well, assert
                   that the inserted node carries that name.  Otherwise the
                   node is searched by name in 'query-named-block-nodes'.

        Fails the test if no matching device/node is found.
        """
        if id:
            result = self.vm.qmp('query-block')
            for device in result['return']:
                if device['device'] == id:
                    if node_name:
                        self.assert_qmp(device, 'inserted/node-name', node_name)
                    return device['inserted']
        else:
            result = self.vm.qmp('query-named-block-nodes')
            for node in result['return']:
                if node['node-name'] == node_name:
                    return node

        self.fail('Cannot find node %s/%s' % (id, node_name))

    def assertIntactSourceBackingChain(self):
        """Assert that node "source" still is source -> back2 -> back1 -> back0."""
        node = self.findBlockNode('source')

        # Each repetition of '/backing-image' descends one level in the chain
        self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename',
                        source_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename',
                        back2_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 2 + '/filename',
                        back1_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 3 + '/filename',
                        back0_img)
        # back0 is the base image, so nothing may follow it
        self.assert_qmp_absent(node, 'image' + '/backing-image' * 4)

    def assertCorrectBackingImage(self, node, default_image):
        """Assert that node has the expected backing image.

        node:          Node description as returned by findBlockNode()
        default_image: Backing filename expected when the target was not
                       created in advance (existing is False)

        For existing targets, the expected backing file is
        target_real_backing if set, target_backing otherwise.  If the
        expected image is None (or empty), the node must not have a backing
        image at all.
        """
        if self.existing:
            if self.target_real_backing:
                image = self.target_real_backing
            else:
                image = self.target_backing
        else:
            image = default_image

        if image:
            self.assert_qmp(node, 'image/backing-image/filename', image)
        else:
            self.assert_qmp_absent(node, 'image/backing-image')
149
150
# Class variables for controlling the behavior of MirrorBaseClass:
152#
153# cmd: Mirroring command to execute, either drive-mirror or blockdev-mirror
154
class MirrorBaseClass(BaseClass):
    """Runs the mirror command given by ``cmd`` once per sync mode and checks
    the backing image of the target as well as the source's backing chain.
    """

    def runMirror(self, sync):
        """Start a mirror job on drive0 with the given sync mode, wait until
        it is ready, complete it and wait for the completion event.
        """
        job_args = {'device': 'drive0', 'sync': sync}
        if self.cmd == 'blockdev-mirror':
            # The target is referenced by its node name
            job_args['target'] = 'target'
        else:
            # The target is referenced by its filename
            job_args['target'] = target_img
            job_args['format'] = iotests.imgfmt
            job_args['mode'] = 'existing' if self.existing else 'absolute-paths'
            job_args['node_name'] = 'target'

        reply = self.vm.qmp(self.cmd, **job_args)
        self.assert_qmp(reply, 'return', {})
        self.vm.event_wait('BLOCK_JOB_READY')

        reply = self.vm.qmp('block-job-complete', device='drive0')
        self.assert_qmp(reply, 'return', {})
        self.vm.event_wait('BLOCK_JOB_COMPLETED')

    def _mirrorAndVerify(self, sync, default_image):
        """Mirror with the given sync mode, then check that drive0 now holds
        node "target" with the expected backing image and that the source's
        backing chain is unchanged.
        """
        self.runMirror(sync)

        target_node = self.findBlockNode('target', 'drive0')
        self.assertCorrectBackingImage(target_node, default_image)
        self.assertIntactSourceBackingChain()

    def testFull(self):
        # sync=full: a target created by the job is expected to have no
        # backing image
        self._mirrorAndVerify('full', None)

    def testTop(self):
        # sync=top: a target created by the job is expected to be backed by
        # back2
        self._mirrorAndVerify('top', back2_img)

    def testNone(self):
        # sync=none: a target created by the job is expected to be backed by
        # the source image
        self._mirrorAndVerify('none', source_img)
198
199
class TestDriveMirrorAbsolutePaths(MirrorBaseClass):
    # drive-mirror with mode=absolute-paths; the target image is not created
    # in advance
    existing = False
    cmd = 'drive-mirror'
203
class TestDriveMirrorExistingNoBacking(MirrorBaseClass):
    # drive-mirror with mode=existing onto a pre-created target image that
    # has no backing file
    existing = True
    target_backing = None
    cmd = 'drive-mirror'
208
class TestDriveMirrorExistingBacking(MirrorBaseClass):
    # drive-mirror with mode=existing onto a pre-created target image whose
    # backing file is null-co://
    existing = True
    target_backing = 'null-co://'
    cmd = 'drive-mirror'
213
class TestBlockdevMirrorNoBacking(MirrorBaseClass):
    # blockdev-mirror onto a pre-created, blockdev-added target image that
    # has no backing file
    existing = True
    target_backing = None
    cmd = 'blockdev-mirror'
218
class TestBlockdevMirrorBacking(MirrorBaseClass):
    # blockdev-mirror onto a pre-created, blockdev-added target image whose
    # backing file is null-co://
    existing = True
    target_backing = 'null-co://'
    cmd = 'blockdev-mirror'
223
class TestBlockdevMirrorForcedBacking(MirrorBaseClass):
    # blockdev-mirror onto a target image that has no backing file itself;
    # the backing node is supplied through the blockdev-add options instead,
    # and null-co:// is the backing filename expected at runtime
    existing = True
    cmd = 'blockdev-mirror'
    target_backing = None
    target_real_backing = 'null-co://'
    target_blockdev_backing = {'driver': 'null-co'}
230
231
class TestCommit(BaseClass):
    # No target image is involved, so setUp() must not create one
    existing = False

    def testCommit(self):
        # Run a block-commit job on drive0 with back1 as the base
        reply = self.vm.qmp('block-commit', device='drive0', base=back1_img)
        self.assert_qmp(reply, 'return', {})
        self.vm.event_wait('BLOCK_JOB_READY')

        reply = self.vm.qmp('block-job-complete', device='drive0')
        self.assert_qmp(reply, 'return', {})
        self.vm.event_wait('BLOCK_JOB_COMPLETED')

        # drive0 must now show back1 on top of back0 and nothing below that
        node = self.findBlockNode(None, 'drive0')
        expected_chain = (back1_img, back0_img)
        for depth, filename in enumerate(expected_chain):
            self.assert_qmp(node,
                            'image' + '/backing-image' * depth + '/filename',
                            filename)
        self.assert_qmp_absent(node,
                               'image' +
                               '/backing-image' * len(expected_chain) +
                               '/filename')

        # The "source" node itself must still have its complete chain
        self.assertIntactSourceBackingChain()
255
256
# Unbind the module-level names of the two base classes (presumably so that
# the test loader does not treat them as test cases of their own -- they lack
# the class variables their methods rely on)
BaseClass = MirrorBaseClass = None

if __name__ == '__main__':
    iotests.main(supported_fmts=['qcow2'])
261    iotests.main(supported_fmts=['qcow2'])
262