1#!/usr/bin/env python3
2# group: rw
3#
4# Tests for internal snapshot.
5#
6# Copyright (C) 2013 IBM, Inc.
7#
8# Based on 055.
9#
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22#
23
24import time
25import os
26import iotests
27from iotests import qemu_img, qemu_io
28
# Prefix of the device names used in the QMP commands below; _setUp() appends
# the drive index to it ('drive0', 'drive1', ...).
# NOTE(review): presumably this has to match the IDs that
# iotests.VM.add_drive() assigns to the drives -- confirm against iotests.py.
test_drv_base_name = 'drive'
30
class ImageSnapshotTestCase(iotests.QMPTestCase):
    """Shared fixture and helpers for the internal snapshot QMP tests.

    ``self.expect`` holds one dict per drive with these keys:

    * ``image``: path of the image file backing the drive
    * ``device``: device name used when addressing the drive over QMP
    * ``snapshots``: list of dicts describing the snapshots expected to
      exist (initially only ``name``; verifySnapshotInfo() merges in the
      remaining fields reported by the VM)
    * ``snapshots_name_counter``: numeric suffix for the next generated
      snapshot name
    """

    image_len = 120 * 1024 * 1024  # image size in bytes (120 MiB)

    def __init__(self, *args):
        self.expect = []
        super().__init__(*args)

    def _setUp(self, test_img_base_name, image_num):
        """Create ``image_num`` images, attach them as drives, launch the VM.

        Image files are named ``<test_img_base_name><index>`` inside
        ``iotests.test_dir``; one entry per drive is appended to
        ``self.expect``.
        """
        self.vm = iotests.VM()
        for i in range(image_num):
            img = os.path.join(iotests.test_dir,
                               '%s%d' % (test_img_base_name, i))
            device = '%s%d' % (test_drv_base_name, i)
            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
            self.vm.add_drive(img)
            self.expect.append({'image': img, 'device': device,
                                'snapshots': [],
                                'snapshots_name_counter': 0})
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and delete the images created by _setUp()."""
        try:
            self.vm.shutdown()
        finally:
            # Remove the images even when the shutdown raised, so that a
            # failing test does not leave files behind in the test directory.
            for dev_expect in self.expect:
                os.remove(dev_expect['image'])

    def createSnapshotInTransaction(self, snapshot_num, abort=False):
        """Create ``snapshot_num`` snapshots per drive in one transaction.

        Snapshot names are generated as ``<device>_sn<counter>``.  With
        ``abort`` set, an 'abort' action is appended to the transaction, a
        GenericError reply is required and the expectations in
        ``self.expect`` are left untouched.  Otherwise the new snapshots are
        recorded in ``self.expect`` and an empty 'return' is required.
        """
        actions = []
        for dev_expect in self.expect:
            num = dev_expect['snapshots_name_counter']
            for _ in range(snapshot_num):
                name = '%s_sn%d' % (dev_expect['device'], num)
                num += 1
                if not abort:
                    # Only a transaction that is meant to succeed changes
                    # what we expect to find afterwards.
                    dev_expect['snapshots'].append({'name': name})
                    dev_expect['snapshots_name_counter'] = num
                actions.append({
                    'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': dev_expect['device'],
                             'name': name},
                })

        if abort:
            actions.append({
                'type': 'abort',
                'data': {},
            })

        result = self.vm.qmp('transaction', actions=actions)

        if abort:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})

    def verifySnapshotInfo(self):
        """Compare the 'query-block' snapshot lists with ``self.expect``.

        For every expected drive the reported snapshot list must have the
        expected length and contain every expected name.  Each expected
        snapshot dict is then updated with the fields the VM reported for it
        (deleteSnapshot() relies on this to learn the snapshot IDs).
        """
        result = self.vm.qmp('query-block')

        for dev_expect in self.expect:
            # 1. Find the image info reported for this device
            image_result = next(
                (dev['inserted']['image'] for dev in result['return']
                 if dev['device'] == dev_expect['device']),
                None)
            self.assertIsNotNone(image_result)

            # 'snapshots' is an optional member of the image info, so a
            # missing key is treated as "no snapshots" instead of raising
            # KeyError.
            sn_list_result = image_result.get('snapshots', [])
            sn_list_expect = dev_expect['snapshots']

            # 2. Verify it against the expectation
            self.assertEqual(len(sn_list_result), len(sn_list_expect))

            for sn_expect in sn_list_expect:
                sn_result = next(
                    (sn for sn in sn_list_result
                     if sn['name'] == sn_expect['name']),
                    None)
                self.assertIsNotNone(sn_result)
                # Fill in the detail info
                sn_expect.update(sn_result)

    def deleteSnapshot(self, device, id=None, name=None):
        """Delete one expected snapshot of ``device`` and check the reply.

        The snapshot is selected by ``id``, by ``name`` or by both; at least
        one of them must be given.  Only the selectors that were given are
        sent with 'blockdev-snapshot-delete-internal-sync'.  The 'return'
        value of the reply is checked against the expected snapshot dict,
        which is then removed from the expectations.
        """
        self.assertTrue(id is not None or name is not None)

        # Fill in the detail info, including the IDs
        self.verifySnapshotInfo()

        # Find the expected snapshot list of the device
        sn_list_expect = next(
            (dev_expect['snapshots'] for dev_expect in self.expect
             if dev_expect['device'] == device),
            None)
        self.assertIsNotNone(sn_list_expect)

        # Selectors actually supplied by the caller; they double as the
        # optional arguments of the QMP command.
        selector = {}
        if id is not None:
            selector['id'] = id
        if name is not None:
            selector['name'] = name

        sn_expect = next(
            (sn for sn in sn_list_expect
             if all(sn[key] == value for key, value in selector.items())),
            None)
        self.assertIsNotNone(sn_expect)

        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device=device, **selector)

        self.assert_qmp(result, 'return', sn_expect)
        sn_list_expect.remove(sn_expect)
163
class TestSingleTransaction(ImageSnapshotTestCase):
    """Snapshot creation through 'transaction' with a single drive."""

    def setUp(self):
        self._setUp('test_a.img', 1)

    def _assertCreateFails(self, device, name):
        """Require a GenericError when snapshotting ``device`` as ``name``."""
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': device,
                             'name': name},
                    }]
        result = self.vm.qmp('transaction', actions=actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_create(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()

    def test_error_name_empty(self):
        # An empty snapshot name must be rejected.
        self._assertCreateFails(self.expect[0]['device'], '')

    def test_error_device(self):
        # 'drive_error' is not one of the devices created by _setUp().
        self._assertCreateFails('drive_error', 'a')

    def test_error_exist(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()
        # Reuse the *name* of the snapshot that was just created.  Passing
        # the whole expectation dict here would make the command fail on the
        # parameter type and never exercise the "already exists" path.
        self._assertCreateFails(self.expect[0]['device'],
                                self.expect[0]['snapshots'][0]['name'])
197
class TestMultipleTransaction(ImageSnapshotTestCase):
    """Snapshot creation through 'transaction' spanning two drives."""

    def setUp(self):
        self._setUp(test_img_base_name='test_b.img', image_num=2)

    def test_create(self):
        # Three snapshots on each drive, all in one transaction.
        self.createSnapshotInTransaction(snapshot_num=3)
        self.verifySnapshotInfo()

    def test_abort(self):
        # Start from two snapshots per drive ...
        self.createSnapshotInTransaction(snapshot_num=2)
        self.verifySnapshotInfo()
        # ... then run an aborted transaction and check that the snapshot
        # lists still match the expectations recorded before it.
        self.createSnapshotInTransaction(snapshot_num=3, abort=True)
        self.verifySnapshotInfo()
211
class TestSnapshotDelete(ImageSnapshotTestCase):
    """Tests for 'blockdev-snapshot-delete-internal-sync' on one drive."""

    # QMP command exercised by every test in this class
    delete_cmd = 'blockdev-snapshot-delete-internal-sync'

    def setUp(self):
        self._setUp('test_c.img', 1)

    def test_delete_with_id(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the first snapshot by ID only.
        self.deleteSnapshot(drive['device'],
                            id=drive['snapshots'][0]['id'])
        self.verifySnapshotInfo()

    def test_delete_with_name(self):
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the second snapshot by name only.
        self.deleteSnapshot(drive['device'],
                            name=drive['snapshots'][1]['name'])
        self.verifySnapshotInfo()

    def test_delete_with_id_and_name(self):
        self.createSnapshotInTransaction(4)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the third snapshot by both of its identifiers.
        target = drive['snapshots'][2]
        self.deleteSnapshot(drive['device'],
                            id=target['id'],
                            name=target['name'])
        self.verifySnapshotInfo()

    def test_error_device(self):
        # 'drive_error' is not one of the devices created by _setUp().
        reply = self.vm.qmp(self.delete_cmd, device='drive_error', id='0')
        self.assert_qmp(reply, 'error/class', 'GenericError')

    def test_error_no_id_and_name(self):
        # Neither 'id' nor 'name' is supplied.
        reply = self.vm.qmp(self.delete_cmd,
                            device=self.expect[0]['device'])
        self.assert_qmp(reply, 'error/class', 'GenericError')

    def test_error_snapshot_not_exist(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # The ID of the first snapshot combined with the name of the second
        # one: the two selectors come from different snapshots.
        reply = self.vm.qmp(self.delete_cmd,
                            device=drive['device'],
                            id=drive['snapshots'][0]['id'],
                            name=drive['snapshots'][1]['name'])
        self.assert_qmp(reply, 'error/class', 'GenericError')
258
if __name__ == '__main__':
    # Run the test cases above, limited to the qcow2 format and the file
    # protocol.
    # NOTE(review): presumably restricted to qcow2 because the tests depend
    # on internal snapshot support -- confirm.
    iotests.main(
        supported_fmts=['qcow2'],
        supported_protocols=['file'],
    )
262