xref: /qemu/tests/qemu-iotests/296 (revision 5db05230)
1#!/usr/bin/env python3
2# group: rw
3#
4# Test case for encryption key management versus image sharing
5#
6# Copyright (C) 2019 Red Hat, Inc.
7#
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20#
21
22import iotests
23import os
24import time
25import json
26
27test_img = os.path.join(iotests.test_dir, 'test.img')
28
29class Secret:
30    def __init__(self, index):
31        self._id = "keysec" + str(index)
32        # you are not supposed to see the password...
33        self._secret = "hunter" + str(index)
34
35    def id(self):
36        return self._id
37
38    def secret(self):
39        return self._secret
40
41    def to_cmdline_object(self):
42        return  [ "secret,id=" + self._id + ",data=" + self._secret]
43
44    def to_qmp_object(self):
45        return { "qom-type" : "secret", "id": self.id(),
46                 "data": self.secret() }
47
48################################################################################
49
50class EncryptionSetupTestCase(iotests.QMPTestCase):
51
52    # test case startup
53    def setUp(self):
54
55        # start the VMs
56        self.vm1 = iotests.VM(path_suffix = 'VM1')
57        self.vm2 = iotests.VM(path_suffix = 'VM2')
58        self.vm1.launch()
59        self.vm2.launch()
60
61        # create the secrets and load 'em into the VMs
62        self.secrets = [ Secret(i) for i in range(0, 4) ]
63        for secret in self.secrets:
64            self.vm1.cmd("object-add", secret.to_qmp_object())
65            self.vm2.cmd("object-add", secret.to_qmp_object())
66
67    # test case shutdown
68    def tearDown(self):
69        # stop the VM
70        self.vm1.shutdown()
71        self.vm2.shutdown()
72
73    ###########################################################################
74    # create the encrypted block device using qemu-img
75    def createImg(self, file, secret):
76
77        iotests.qemu_img(
78            'create',
79            '--object', *secret.to_cmdline_object(),
80            '-f', iotests.imgfmt,
81            '-o', 'key-secret=' + secret.id(),
82            '-o', 'iter-time=10',
83            file,
84            '1M')
85        iotests.log('')
86
87    # attempts to add a key using qemu-img
88    def addKey(self, file, secret, new_secret):
89
90        image_options = {
91            'key-secret' : secret.id(),
92            'driver' : iotests.imgfmt,
93            'file' : {
94                'driver':'file',
95                'filename': file,
96                }
97            }
98
99        output = iotests.qemu_img(
100            'amend',
101            '--object', *secret.to_cmdline_object(),
102            '--object', *new_secret.to_cmdline_object(),
103
104            '-o', 'state=active',
105            '-o', 'new-secret=' + new_secret.id(),
106            '-o', 'iter-time=10',
107
108            "json:" + json.dumps(image_options),
109            check=False  # Expected to fail. Log output.
110        ).stdout
111
112        iotests.log(output, filters=[iotests.filter_test_dir])
113
114    ###########################################################################
115    # open an encrypted block device
116    def openImageQmp(self, vm, id, file, secret,
117                     readOnly = False, reOpen = False):
118
119        command = 'blockdev-reopen' if reOpen else 'blockdev-add'
120
121        opts = {
122                'driver': iotests.imgfmt,
123                'node-name': id,
124                'read-only': readOnly,
125                'key-secret' : secret.id(),
126                'file': {
127                    'driver': 'file',
128                    'filename': test_img,
129                }
130            }
131
132        if reOpen:
133            vm.cmd(command, options=[opts])
134        else:
135            vm.cmd(command, opts)
136
137
138    ###########################################################################
139    # add virtio-blk consumer for a block device
140    def addImageUser(self, vm, id, disk_id, share_rw=False):
141        result = vm.qmp('device_add', {
142                'driver': 'virtio-blk',
143                'id': id,
144                'drive': disk_id,
145                'share-rw' : share_rw
146            }
147        )
148
149        iotests.log(result)
150
151    # close the encrypted block device
152    def closeImageQmp(self, vm, id):
153        vm.cmd('blockdev-del', {'node-name': id})
154
155    ###########################################################################
156
157    # add a key to an encrypted block device
158    def addKeyQmp(self, vm, id, new_secret):
159
160        args = {
161            'node-name': id,
162            'job-id' : 'job0',
163            'options' : {
164                'state'     : 'active',
165                'driver'    : iotests.imgfmt,
166                'new-secret': new_secret.id(),
167                'iter-time' : 10
168            },
169        }
170
171        result = vm.qmp('x-blockdev-amend', args)
172        iotests.log(result)
173        # Run the job only if it was created
174        event = ('JOB_STATUS_CHANGE',
175                 {'data': {'id': 'job0', 'status': 'created'}})
176        if vm.events_wait([event], timeout=0.0) is not None:
177            vm.run_job('job0')
178
179    # test that when the image opened by two qemu processes,
180    # neither of them can update the encryption keys
181    def test1(self):
182        self.createImg(test_img, self.secrets[0]);
183
184        # VM1 opens the image and adds a key
185        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
186        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[1])
187
188
189        # VM2 opens the image
190        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
191
192
193        # neither VMs now should be able to add a key
194        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
195        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
196
197
198        # VM 1 closes the image
199        self.closeImageQmp(self.vm1, "testdev")
200
201
202        # now VM2 can add the key
203        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
204
205
206        # qemu-img should also not be able to add a key
207        self.addKey(test_img, self.secrets[0], self.secrets[2])
208
209        # cleanup
210        self.closeImageQmp(self.vm2, "testdev")
211        os.remove(test_img)
212
213
214    # test that when the image opened by two qemu processes,
215    # even if first VM opens it read-only, the second can't update encryption
216    # keys
217    def test2(self):
218        self.createImg(test_img, self.secrets[0]);
219
220        # VM1 opens the image readonly
221        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
222                          readOnly = True)
223
224        # VM2 opens the image
225        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
226
227        # VM1 can't add a key since image is readonly
228        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
229
230        # VM2 can't add a key since VM is has the image opened
231        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
232
233
234        #VM1 reopens the image read-write
235        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
236                          reOpen = True, readOnly = False)
237
238        # VM1 still can't add the key
239        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
240
241        # VM2 gets away
242        self.closeImageQmp(self.vm2, "testdev")
243
244        # VM1 now can add the key
245        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
246
247        self.closeImageQmp(self.vm1, "testdev")
248        os.remove(test_img)
249
250    # test that two VMs can't open the same luks image by default
251    # and attach it to a guest device
252    def test3(self):
253        self.createImg(test_img, self.secrets[0]);
254
255        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
256        self.addImageUser(self.vm1, "testctrl", "testdev")
257
258        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
259        self.addImageUser(self.vm2, "testctrl", "testdev")
260
261
262    # test that two VMs can attach the same luks image to a guest device,
263    # if both use share-rw=on
264    def test4(self):
265        self.createImg(test_img, self.secrets[0]);
266
267        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
268        self.addImageUser(self.vm1, "testctrl", "testdev", share_rw=True)
269
270        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
271        self.addImageUser(self.vm2, "testctrl", "testdev", share_rw=True)
272
273
274
275if __name__ == '__main__':
276    # support only raw luks since luks encrypted qcow2 is a proper
277    # format driver which doesn't allow any sharing
278    iotests.activate_logging()
279    iotests.main(supported_fmts = ['luks'])
280