1#!/usr/bin/env python3
2# group: rw quick
3#
4# Test cases for the QMP 'blockdev-del' command
5#
6# Copyright (C) 2015 Igalia, S.L.
7# Author: Alberto Garcia <berto@igalia.com>
8#
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program.  If not, see <http://www.gnu.org/licenses/>.
21#
22
23import os
24import iotests
25import time
26
# Paths of the two scratch images used by the tests, both located in the
# iotests test directory: 'base.img' is (re)created by setUp() for every
# test, 'new.img' is only created by the helpers that need a second image.
base_img, new_img = (os.path.join(iotests.test_dir, name)
                     for name in ('base.img', 'new.img'))
29
class TestBlockdevDel(iotests.QMPTestCase):
    # Test cases for the QMP 'blockdev-del' command.
    #
    # Every test runs against a freshly launched VM (see setUp).  The
    # helper methods below wrap individual QMP commands and, before and
    # after issuing them, verify through 'query-named-block-nodes' that
    # the affected nodes exist (or not) as expected.

    # Create the base image and launch a VM that has a virtio-scsi
    # device with id 'virtio-scsi'
    def setUp(self):
        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
        self.vm = iotests.VM()
        self.vm.add_device('virtio-scsi,id=virtio-scsi')
        self.vm.launch()

    # Shut the VM down and remove the scratch images
    def tearDown(self):
        try:
            self.vm.shutdown()
        finally:
            # Clean up the images even if shutting down the VM raised,
            # so that a failing test does not leave files behind
            os.remove(base_img)
            # new_img is only created by some of the tests
            if os.path.isfile(new_img):
                os.remove(new_img)

    # Build the 'blockdev-add' options for a node named 'node' that
    # uses the tested image format on top of a 'file' node (without an
    # explicit node-name) opening 'filename'
    def _imageOpts(self, node, filename):
        return {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'filename': filename}}

    # Run 'blockdev-add' with the given options and check that it
    # succeeds.  Keys are passed unmodified (conv_keys=False) because
    # they already contain the dashes that QMP expects.
    def _blockdevAdd(self, opts):
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})

    # Check whether a BlockDriverState exists
    # The check fails if more than one node is named 'node', or if the
    # presence of the node does not match 'must_exist'.
    def checkBlockDriverState(self, node, must_exist=True):
        result = self.vm.qmp('query-named-block-nodes')
        nodes = [x for x in result['return'] if x['node-name'] == node]
        self.assertLessEqual(len(nodes), 1)
        self.assertEqual(must_exist, len(nodes) == 1)

    # Add a BlockDriverState without a BlockBackend
    # The format node is named 'node' and its file child '<node>_file';
    # neither may exist beforehand and both must exist afterwards.
    def addBlockDriverState(self, node):
        file_node = '%s_file' % node
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'node-name': file_node,
                         'filename': base_img}}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    # Add a BlockDriverState that will be used as overlay for the base_img BDS
    # new_img is created with base_img as its backing file, but the
    # node is added with 'backing': None (presumably sent as JSON null
    # so that QEMU does not open the backing file itself - confirm).
    def addBlockDriverStateOverlay(self, node):
        self.checkBlockDriverState(node, False)
        iotests.qemu_img('create', '-u', '-f', iotests.imgfmt,
                         '-b', base_img, '-F', iotests.imgfmt, new_img, '1M')
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'backing': None,
                'file': {'driver': 'file',
                         'filename': new_img}}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)

    # Delete a BlockDriverState
    # With expect_error the command must fail with a GenericError and
    # the node must still exist afterwards; otherwise the command must
    # succeed and the node must be gone.
    def delBlockDriverState(self, node, expect_error=False):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('blockdev-del', node_name=node)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node, expect_error)

    # Add a device model
    # 'device' is the id of the new device, 'backend' the value of its
    # 'drive' property and 'driver' the device type.
    def addDeviceModel(self, device, backend, driver='virtio-blk'):
        result = self.vm.qmp('device_add', id=device,
                             driver=driver, drive=backend)
        self.assert_qmp(result, 'return', {})

    # Delete a device model
    # Requests the removal, resets the system and then waits for the
    # DEVICE_DELETED events.
    def delDeviceModel(self, device, is_virtio_blk=True):
        result = self.vm.qmp('device_del', id=device)
        self.assert_qmp(result, 'return', {})

        # NOTE(review): presumably the reset is what lets the unplug
        # complete without a cooperating guest - confirm
        result = self.vm.qmp('system_reset')
        self.assert_qmp(result, 'return', {})

        if is_virtio_blk:
            # For virtio-blk, first wait for the event that reports
            # the deletion of the device's 'virtio-backend' child
            device_path = '/machine/peripheral/%s/virtio-backend' % device
            event = self.vm.event_wait(name="DEVICE_DELETED",
                                       match={'data': {'path': device_path}})
            self.assertIsNotNone(event)

        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'device': device}})
        self.assertIsNotNone(event)

    # Eject the medium from a device
    # 'node' must exist before the command.  If the eject is expected
    # to fail (GenericError) the node must still exist afterwards; if
    # it succeeds, the node must be gone exactly when 'destroys_media'
    # is true.
    def ejectDrive(self, device, node, expect_error=False,
                   destroys_media=True):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('eject', id=device)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            self.checkBlockDriverState(node)
        else:
            self.assert_qmp(result, 'return', {})
            self.checkBlockDriverState(node, not destroys_media)

    # Insert a BlockDriverState
    # Uses 'blockdev-insert-medium' to insert the existing node 'node'
    # into 'device'; the node must still exist afterwards.
    def insertDrive(self, device, node):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('blockdev-insert-medium',
                             id=device, node_name=node)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    # Create a snapshot using 'blockdev-snapshot-sync'
    # The snapshot file is new_img and its node is named 'overlay',
    # which must not exist beforehand.
    def createSnapshotSync(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay, False)
        opts = {'node-name': node,
                'snapshot-file': new_img,
                'snapshot-node-name': overlay,
                'format': iotests.imgfmt}
        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a snapshot using 'blockdev-snapshot'
    # Unlike createSnapshotSync(), the overlay node must already exist.
    def createSnapshot(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)
        result = self.vm.qmp('blockdev-snapshot',
                             node=node, overlay=overlay)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a mirror
    # Starts a 'drive-mirror' job whose job-id is 'node' and whose
    # target is new_img, opened as the new node 'new_node'.
    def createMirror(self, node, new_node):
        self.checkBlockDriverState(new_node, False)
        opts = {'device': node,
                'job-id': node,
                'target': new_img,
                'node-name': new_node,
                'sync': 'top',
                'format': iotests.imgfmt}
        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(new_node)

    # Complete an existing block job
    # 'id' is the job to complete.  'node_before' and 'node_after' are
    # not used; they are kept so that existing callers keep working.
    def completeBlockJob(self, id, node_before, node_after):
        result = self.vm.qmp('block-job-complete', device=id)
        self.assert_qmp(result, 'return', {})
        self.wait_until_completed(id)

    # Add a BlkDebug node
    # Note that the purpose of this is to test the blockdev-del
    # sanity checks, not to create a usable blkdebug drive
    def addBlkDebug(self, debug, node):
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(debug, False)
        opts = {'driver': 'blkdebug',
                'node-name': debug,
                'image': self._imageOpts(node, base_img)}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(debug)

    # Add a BlkVerify node
    # Note that the purpose of this is to test the blockdev-del
    # sanity checks, not to create a usable blkverify drive
    def addBlkVerify(self, blkverify, test, raw):
        self.checkBlockDriverState(test, False)
        self.checkBlockDriverState(raw, False)
        self.checkBlockDriverState(blkverify, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        opts = {'driver': 'blkverify',
                'node-name': blkverify,
                'test': self._imageOpts(test, base_img),
                'raw': self._imageOpts(raw, new_img)}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(test)
        self.checkBlockDriverState(raw)
        self.checkBlockDriverState(blkverify)

    # Add a Quorum node
    # Its two children are 'child0' (on base_img) and 'child1' (on a
    # newly created new_img).
    def addQuorum(self, quorum, child0, child1):
        self.checkBlockDriverState(child0, False)
        self.checkBlockDriverState(child1, False)
        self.checkBlockDriverState(quorum, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        opts = {'driver': 'quorum',
                'node-name': quorum,
                'vote-threshold': 1,
                'children': [self._imageOpts(child0, base_img),
                             self._imageOpts(child1, new_img)]}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(child0)
        self.checkBlockDriverState(child1)
        self.checkBlockDriverState(quorum)

    ########################
    # The tests start here #
    ########################

    def testBlockDriverState(self):
        self.addBlockDriverState('node0')
        # You cannot delete a file BDS directly
        self.delBlockDriverState('node0_file', expect_error=True)
        self.delBlockDriverState('node0')

    def testDeviceModel(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        # While the device is attached, the node can neither be ejected
        # nor deleted
        self.ejectDrive('device0', 'node0', expect_error=True)
        self.delBlockDriverState('node0', expect_error=True)
        self.delDeviceModel('device0')
        self.delBlockDriverState('node0')

    def testAttachMedia(self):
        # This creates a BlockBackend and removes its media
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.ejectDrive('device0', 'node0', destroys_media=False)
        self.delBlockDriverState('node0')

        # This creates a new BlockDriverState and inserts it into the device
        self.addBlockDriverState('node1')
        self.insertDrive('device0', 'node1')
        # The node can't be removed: the new device has an extra reference
        self.delBlockDriverState('node1', expect_error=True)
        # The BDS still exists after being ejected, but now it can be removed
        self.ejectDrive('device0', 'node1', destroys_media=False)
        self.delBlockDriverState('node1')
        self.delDeviceModel('device0', False)

    def testSnapshotSync(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.createSnapshotSync('node0', 'overlay0')
        # This fails because node0 is now being used as a backing image
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        # This succeeds because device0 only has the backend reference
        self.delDeviceModel('device0')
        # FIXME Would still be there if blockdev-snapshot-sync took a ref
        self.checkBlockDriverState('overlay0', False)
        self.delBlockDriverState('node0')

    def testSnapshot(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.addBlockDriverStateOverlay('overlay0')
        self.createSnapshot('node0', 'overlay0')
        # Neither node can be deleted while the overlay is in the device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        self.ejectDrive('device0', 'overlay0', destroys_media=False)
        # After the eject node0 still cannot be deleted before overlay0
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0')
        self.delBlockDriverState('node0')

    def testMirror(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.createMirror('node0', 'mirror0')
        # The block job prevents removing the device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('mirror0', expect_error=True)
        self.wait_ready('node0')
        self.completeBlockJob('node0', 'node0', 'mirror0')
        self.assert_no_active_block_jobs()
        # This succeeds because the device now points to mirror0
        self.delBlockDriverState('node0')
        self.delBlockDriverState('mirror0', expect_error=True)
        self.delDeviceModel('device0', False)
        # FIXME mirror0 disappears, drive-mirror doesn't take a reference
        #self.delBlockDriverState('mirror0')

    @iotests.skip_if_unsupported(['blkdebug'])
    def testBlkDebug(self):
        self.addBlkDebug('debug0', 'node0')
        # 'node0' is used by the blkdebug node
        self.delBlockDriverState('node0', expect_error=True)
        # But we can remove the blkdebug node directly
        self.delBlockDriverState('debug0')
        self.checkBlockDriverState('node0', False)

    @iotests.skip_if_unsupported(['blkverify'])
    def testBlkVerify(self):
        self.addBlkVerify('verify0', 'node0', 'node1')
        # We cannot remove the children of a blkverify device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the blkverify node directly
        self.delBlockDriverState('verify0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)

    @iotests.skip_if_unsupported(['quorum'])
    def testQuorum(self):
        self.addQuorum('quorum0', 'node0', 'node1')
        # We cannot remove the children of a Quorum device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the Quorum node directly
        self.delBlockDriverState('quorum0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)
352
if __name__ == '__main__':
    # Restrict the test to qcow2 images accessed through the 'file' protocol
    iotests.main(supported_protocols=["file"],
                 supported_fmts=["qcow2"])
356