xref: /qemu/tests/qemu-iotests/163 (revision 09a274d8)
1#!/usr/bin/env python
2#
3# Tests for shrinking images
4#
5# Copyright (c) 2016-2017 Parallels International GmbH
6#
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20
21import os, random, iotests, struct, qcow2, sys
22from iotests import qemu_img, qemu_io, image_size
23
# Python 2 only: rebind range to the lazy xrange so the loops below do not
# build full lists.
if sys.version_info[0] == 2:
    range = xrange

# Scratch images, both placed in the iotests test directory: test_img is
# the image that gets shrunk, check_img is the reference it is compared to.
test_img, check_img = (os.path.join(iotests.test_dir, img_name)
                       for img_name in ('test.img', 'check.img'))
29
def size_to_int(str):
    """Convert a size string such as '128M' or '256K' to a byte count.

    The last character selects a power-of-1024 multiplier: 'B' (1), 'K',
    'M', 'G' or 'T'.  A string that ends in a digit (e.g. '512', the form
    used for cluster_size in this file) is taken as a plain byte count.

    Raises ValueError for an empty string, an unknown suffix or a
    non-integer numeric part.

    The parameter keeps its historical name and therefore shadows the
    builtin 'str' inside this function.
    """
    suff = ['B', 'K', 'M', 'G', 'T']
    unit = str[-1:]
    if unit.isdigit():
        # No unit suffix at all: the whole string is the byte count.
        return int(str)
    # list.index() raises ValueError on an unknown (or missing) suffix.
    return int(str[:-1]) * 1024**suff.index(unit)
33
class ShrinkBaseClass(iotests.QMPTestCase):
    """Shared 'qemu-img resize --shrink' test scenarios.

    Each test shrinks test_img to shrink_size and checks it against
    check_img, which setUp creates at shrink_size.  Subclasses must
    define cluster_size (setUp reads it for non-raw formats) and may
    override the size strings below.
    """

    image_len = '128M'      # initial virtual size of test_img
    shrink_size = '10M'     # size test_img is shrunk to; size of check_img
    chunk_size = '16M'      # length of each write issued by the write tests
    refcount_bits = '16'    # refcount_bits creation option for test_img

    def __qcow2_check(self, filename):
        """Inspect the on-disk qcow2 metadata of *filename*.

        Asserts that the L1 table has at least one entry in use, that all
        L1 entries beyond those needed for the header's virtual size are
        zero, and that every cluster referenced by a non-zero refcount
        table entry passes any() (with Python 3 bytes: holds at least
        one non-zero byte).
        """
        entry_bits = 3
        entry_size = 1 << entry_bits    # table entries are 8 bytes each
        # Keeps bits 9..55 of an entry and drops everything else.
        l1_mask = 0x00fffffffffffe00

        def div_roundup(n, d):
            return (n + d - 1) // d

        def split_by_n(data, n):
            # Yield each n-byte big-endian entry of data, masked with
            # l1_mask.
            for x in range(0, len(data), n):
                yield struct.unpack('>Q', data[x:x + n])[0] & l1_mask

        def check_l1_table(h, l1_data):
            l1_list = list(split_by_n(l1_data, entry_size))
            # One L1 entry maps one L2 table, which is a single cluster of
            # entry_size-byte entries that each map one cluster, so it
            # covers 2^(2 * cluster_bits - entry_bits) bytes.  The shift
            # must use entry_bits (3), not entry_size (8): with entry_size
            # the coverage is 32 times too small, real_l1_size is
            # overestimated and 'unused' ends up empty, which makes the
            # second assertion below vacuous.
            l1_entry_coverage = 1 << (h.cluster_bits * 2 - entry_bits)
            real_l1_size = div_roundup(h.size, l1_entry_coverage)
            used, unused = l1_list[:real_l1_size], l1_list[real_l1_size:]

            self.assertTrue(len(used) != 0, "Verifying l1 table content")
            self.assertFalse(any(unused), "Verifying l1 table content")

        def check_reftable(fd, h, reftable):
            for offset in split_by_n(reftable, entry_size):
                if offset != 0:
                    # Read the cluster this refcount table entry points to.
                    fd.seek(offset)
                    cluster = fd.read(1 << h.cluster_bits)
                    self.assertTrue(any(cluster), "Verifying reftable content")

        with open(filename, "rb") as fd:
            h = qcow2.QcowHeader(fd)

            fd.seek(h.l1_table_offset)
            l1_table = fd.read(h.l1_size << entry_bits)

            fd.seek(h.refcount_table_offset)
            reftable = fd.read(h.refcount_table_clusters << h.cluster_bits)

            check_l1_table(h, l1_table)
            check_reftable(fd, h, reftable)

    def __raw_check(self, filename):
        """Raw images get no format-specific metadata check."""
        pass

    # Format name -> format-specific checker.  The values are plain
    # functions looked up on the class, so image_verify passes self
    # explicitly when calling them.
    image_check = {
        'qcow2' : __qcow2_check,
        'raw' : __raw_check
    }

    def setUp(self):
        """Create test_img (image_len) and check_img (shrink_size).

        Non-raw formats are created with the subclass's cluster_size;
        refcount_bits is applied to test_img only.  check_img is then
        filled with 0xff over its whole length.
        """
        if iotests.imgfmt == 'raw':
            qemu_img('create', '-f', iotests.imgfmt, test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt, check_img,
                     self.shrink_size)
        else:
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=' + self.cluster_size +
                     ',refcount_bits=' + self.refcount_bits,
                     test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=%s'% self.cluster_size,
                     check_img, self.shrink_size)
        qemu_io('-c', 'write -P 0xff 0 ' + self.shrink_size, check_img)

    def tearDown(self):
        """Delete both scratch images."""
        os.remove(test_img)
        os.remove(check_img)

    def image_verify(self):
        """Check the shrunk test_img: size, format metadata, 'qemu-img check'.

        The size must equal check_img's size; the format-specific checker
        from image_check is run; for non-raw formats 'qemu-img check' must
        return 0.
        """
        self.assertEqual(image_size(test_img), image_size(check_img),
                         "Verifying image size")
        self.image_check[iotests.imgfmt](self, test_img)

        if iotests.imgfmt == 'raw':
            return
        self.assertEqual(qemu_img('check', test_img), 0,
                         "Verifying image corruption")

    # The test methods below are documented with comments rather than
    # docstrings so that unittest's reported test descriptions stay the
    # plain method names.

    # Shrink an image that was never written to.
    def test_empty_image(self):
        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        # NOTE(review): qemu-io's 'read' is normally given an offset and a
        # length, but only one positional argument is passed here, so both
        # invocations may simply return the same error text.  Confirm the
        # intent before changing it: setUp fills check_img with 0xff, so a
        # real 0x00 pattern read would not produce equal output.
        self.assertEqual(
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, test_img),
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, check_img),
            "Verifying image content")

        self.image_verify()

    # Fill the image with 0xff chunk by chunk in ascending offset order,
    # shrink it, then compare it against check_img.
    def test_sequential_write(self):
        for offs in range(0, size_to_int(self.image_len),
                          size_to_int(self.chunk_size)):
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()

    # Same as test_sequential_write, but the chunks are written in a
    # shuffled order.
    def test_random_write(self):
        offs_list = list(range(0, size_to_int(self.image_len),
                               size_to_int(self.chunk_size)))
        random.shuffle(offs_list)
        for offs in offs_list:
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()
155
class TestShrink512(ShrinkBaseClass):
    # Smallest configuration: 512-byte clusters with 64-bit refcounts, on
    # a 3M image that is written in 256K chunks and shrunk to 1M.
    cluster_size = '512'
    refcount_bits = '64'
    image_len = '3M'
    chunk_size = '256K'
    shrink_size = '1M'
162
class TestShrink64K(ShrinkBaseClass):
    # 64K clusters; image length, shrink size, chunk size and refcount
    # width are the values inherited from ShrinkBaseClass.
    cluster_size = '64K'
165
class TestShrink1M(ShrinkBaseClass):
    # 1M clusters combined with 1-bit refcounts; the image, shrink and
    # chunk sizes are the values inherited from ShrinkBaseClass.
    refcount_bits = '1'
    cluster_size = '1M'
169
# Rebind the base-class name to None.  Presumably this keeps the unittest
# loader from collecting the base class (which defines no cluster_size) as
# a test case of its own -- confirm before removing.
ShrinkBaseClass = None

if __name__ == '__main__':
    # Run the tests, declaring raw and qcow2 as the supported formats.
    iotests.main(supported_fmts=['raw', 'qcow2'])
174