xref: /qemu/tests/qemu-iotests/295 (revision 940bb5fa)
1#!/usr/bin/env python3
2# group: rw
3#
# Test case for QMP's encryption key management
5#
6# Copyright (C) 2019 Red Hat, Inc.
7#
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20#
21
22import iotests
23import os
24import time
25import json
26
27test_img = os.path.join(iotests.test_dir, 'test.img')
28
class Secret:
    """Id/passphrase pair describing one QEMU 'secret' object.

    Both values are derived from *index*: the object id is
    "keysec<index>" and the secret data is "hunter<index>".
    """

    def __init__(self, index):
        """Derive the object id and the secret data from *index*."""
        self._id = f"keysec{index!s}"
        # you are not supposed to see the password...
        self._secret = f"hunter{index!s}"

    def id(self):
        """Return the id under which the secret object is registered."""
        return self._id

    def secret(self):
        """Return the secret data (the passphrase itself)."""
        return self._secret

    def to_cmdline_object(self):
        """Return the object definition as a one-element argument list.

        The single string is the value that follows an '--object'
        command line option.
        """
        definition = f"secret,id={self._id},data={self._secret}"
        return [definition]

    def to_qmp_object(self):
        """Return the object definition as an 'object-add' argument dict."""
        return {
            "qom_type": "secret",
            "id": self.id(),
            "data": self.secret(),
        }
47
48################################################################################
class EncryptionSetupTestCase(iotests.QMPTestCase):
    """Exercise encryption key management through QMP.

    Every test creates an encrypted image with qemu-img, opens it in the
    VM with 'blockdev-add' and then adds or erases keys by running
    'x-blockdev-amend' jobs.
    """

    # test case startup
    def setUp(self):
        """Launch the VM and register the secrets used by the tests."""
        # start the VM
        self.vm = iotests.VM()
        self.vm.launch()

        # create the secrets and load 'em into the VM
        self.secrets = [Secret(i) for i in range(0, 6)]
        for secret in self.secrets:
            self.vm.cmd("object-add", **secret.to_qmp_object())

        # For qcow2 the creation options carry an 'encrypt.' prefix and the
        # encryption format is requested explicitly; otherwise the options
        # are passed unprefixed.
        if iotests.imgfmt == "qcow2":
            self.pfx = "encrypt."
            self.img_opts = ['-o', "encrypt.format=luks"]
        else:
            self.pfx = ""
            self.img_opts = []

    # test case shutdown
    def tearDown(self):
        """Shut the VM down."""
        # stop the VM
        self.vm.shutdown()

    ###########################################################################
    # create the encrypted block device
    def createImg(self, file, secret):
        """Create a 1M encrypted image at *file* keyed with *secret*.

        file:   path of the image to create
        secret: Secret whose id is passed as the 'key-secret' option
        """
        iotests.qemu_img(
            'create',
            '--object', *secret.to_cmdline_object(),
            '-f', iotests.imgfmt,
            '-o', self.pfx + 'key-secret=' + secret.id(),
            '-o', self.pfx + 'iter-time=10',
            *self.img_opts,
            file,
            '1M')

    ###########################################################################
    # open an encrypted block device
    def openImageQmp(self, id, file, secret, read_only = False):
        """Attach the image *file* to the VM as block node *id*.

        id:        node name given to the new block node
        file:      path of the image to open
        secret:    Secret whose id is passed as the 'key-secret' option
        read_only: value of the 'read-only' option
        """
        encrypt_options = {
            'key-secret': secret.id()
        }

        # qcow2 takes the key options nested under 'encrypt'
        if iotests.imgfmt == "qcow2":
            encrypt_options = {
                'encrypt': {
                    'format': 'luks',
                    **encrypt_options
                }
            }

        self.vm.cmd('blockdev-add', {
                'driver': iotests.imgfmt,
                'node-name': id,
                'read-only': read_only,

                **encrypt_options,

                'file': {
                    'driver': 'file',
                    # honour the caller's path rather than the module-level
                    # test image
                    'filename': file,
                }
            }
        )

    # close the encrypted block device
    def closeImageQmp(self, id):
        """Remove the block node *id* from the VM."""
        self.vm.cmd('blockdev-del', {'node-name': id})

    ###########################################################################
    # submit an amend job with the given key options and run it
    def _runAmendJob(self, node_name, job_id, crypt_options, force):
        """Start 'x-blockdev-amend' on *node_name* and run the job.

        node_name:     block node to amend
        job_id:        id given to the job and handed to run_job()
        crypt_options: key options for the amend request
        force:         when true, 'force': True is added to the request
        """
        # qcow2 takes the key options nested under 'encrypt'; 'format' is
        # appended last so the key order matches the unnested options
        if iotests.imgfmt == "qcow2":
            crypt_options = {
                'encrypt': {**crypt_options, 'format': 'luks'}
            }

        args = {
            'node-name': node_name,
            'job-id': job_id,
            'options': {
                'driver': iotests.imgfmt,
                **crypt_options
            },
        }

        if force:
            args['force'] = True

        #TODO: check what jobs return
        self.vm.cmd('x-blockdev-amend', **args)
        self.vm.run_job(job_id)

    # add a key to an encrypted block device
    def addKeyQmp(self, id, new_secret, secret = None,
                  slot = None, force = False):
        """Run an amend job that activates a key on block node *id*.

        id:         block node to amend
        new_secret: Secret whose id is passed as 'new-secret'
        secret:     optional Secret whose id is passed as 'secret'
        slot:       optional value for 'keyslot'; omitted when None
        force:      when true, the request carries 'force': True
        """
        crypt_options = {
            'state': 'active',
            'new-secret': new_secret.id(),
            'iter-time': 10
        }

        if slot is not None:
            crypt_options['keyslot'] = slot

        if secret is not None:
            crypt_options['secret'] = secret.id()

        self._runAmendJob(id, 'job_add_key', crypt_options, force)

    # erase a key from an encrypted block device
    def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
        """Run an amend job that deactivates a key on block node *id*.

        id:         block node to amend
        old_secret: optional Secret whose id is passed as 'old-secret'
        slot:       optional value for 'keyslot'; omitted when None
        force:      when true, the request carries 'force': True
        """
        crypt_options = {
            'state': 'inactive',
        }

        if slot is not None:
            crypt_options['keyslot'] = slot
        if old_secret is not None:
            crypt_options['old-secret'] = old_secret.id()

        self._runAmendJob(id, 'job_erase_key', crypt_options, force)

    ###########################################################################
    # create image, and change its key
    def testChangeKey(self):
        """Add two keys, erase the original one, reopen with a new key."""

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # add key to slot 5
        self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # reopen the image with secret1
        self.closeImageQmp("testdev")
        self.openImageQmp("testdev", test_img, self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    # test that if we erase the old password,
    # we can still change the encryption keys by naming a valid 'secret'
    def testOldPassword(self):
        """Add a key after the password the image was opened with is gone."""

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # this will fail as the old password is no longer valid
        self.addKeyQmp("testdev", new_secret = self.secrets[2])

        # this will work
        self.addKeyQmp("testdev", new_secret = self.secrets[2],
                       secret = self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    def testUseForceLuke(self):
        """Repeat two key operations, first without and then with force."""

        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # Add bunch of secrets
        self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
        self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)

        # overwrite an active secret: once without force, then with it
        # (presumably the unforced job is refused - check the reference
        # output)
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2,
                       force=True)

        self.addKeyQmp("testdev", new_secret = self.secrets[0])

        # Now erase all the secrets
        self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
        self.eraseKeyQmp("testdev", slot=4)

        # erase last keyslot: once without force, then with it
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)

        self.closeImageQmp("testdev")
        os.remove(test_img)
269
270
if __name__ == '__main__':
    # Check LUKS support before anything else runs
    iotests.verify_working_luks()
    iotests.activate_logging()
    # Encrypted formats support
    iotests.main(supported_fmts=['qcow2', 'luks'])
276