xref: /qemu/migration/multifd-zlib.c (revision c3bef3b4)
1 /*
2  * Multifd zlib compression implementation
3  *
4  * Copyright (c) 2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include <zlib.h>
15 #include "qemu/rcu.h"
16 #include "exec/ramblock.h"
17 #include "exec/target_page.h"
18 #include "qapi/error.h"
19 #include "migration.h"
20 #include "trace.h"
21 #include "multifd.h"
22 
23 struct zlib_data {
24     /* stream for compression */
25     z_stream zs;
26     /* compressed buffer */
27     uint8_t *zbuff;
28     /* size of compressed buffer */
29     uint32_t zbuff_len;
30     /* uncompressed buffer of size qemu_target_page_size() */
31     uint8_t *buf;
32 };
33 
34 /* Multifd zlib compression */
35 
36 /**
37  * zlib_send_setup: setup send side
38  *
39  * Setup each channel with zlib compression.
40  *
41  * Returns 0 for success or -1 for error
42  *
43  * @p: Params for the channel that we are using
44  * @errp: pointer to an error
45  */
46 static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
47 {
48     struct zlib_data *z = g_new0(struct zlib_data, 1);
49     z_stream *zs = &z->zs;
50     const char *err_msg;
51 
52     zs->zalloc = Z_NULL;
53     zs->zfree = Z_NULL;
54     zs->opaque = Z_NULL;
55     if (deflateInit(zs, migrate_multifd_zlib_level()) != Z_OK) {
56         err_msg = "deflate init failed";
57         goto err_free_z;
58     }
59     /* This is the maxium size of the compressed buffer */
60     z->zbuff_len = compressBound(MULTIFD_PACKET_SIZE);
61     z->zbuff = g_try_malloc(z->zbuff_len);
62     if (!z->zbuff) {
63         err_msg = "out of memory for zbuff";
64         goto err_deflate_end;
65     }
66     z->buf = g_try_malloc(qemu_target_page_size());
67     if (!z->buf) {
68         err_msg = "out of memory for buf";
69         goto err_free_zbuff;
70     }
71     p->data = z;
72     return 0;
73 
74 err_free_zbuff:
75     g_free(z->zbuff);
76 err_deflate_end:
77     deflateEnd(&z->zs);
78 err_free_z:
79     g_free(z);
80     error_setg(errp, "multifd %u: %s", p->id, err_msg);
81     return -1;
82 }
83 
84 /**
85  * zlib_send_cleanup: cleanup send side
86  *
87  * Close the channel and return memory.
88  *
89  * @p: Params for the channel that we are using
90  * @errp: pointer to an error
91  */
92 static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
93 {
94     struct zlib_data *z = p->data;
95 
96     deflateEnd(&z->zs);
97     g_free(z->zbuff);
98     z->zbuff = NULL;
99     g_free(z->buf);
100     z->buf = NULL;
101     g_free(p->data);
102     p->data = NULL;
103 }
104 
105 /**
106  * zlib_send_prepare: prepare date to be able to send
107  *
108  * Create a compressed buffer with all the pages that we are going to
109  * send.
110  *
111  * Returns 0 for success or -1 for error
112  *
113  * @p: Params for the channel that we are using
114  * @errp: pointer to an error
115  */
116 static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
117 {
118     struct zlib_data *z = p->data;
119     z_stream *zs = &z->zs;
120     uint32_t out_size = 0;
121     int ret;
122     uint32_t i;
123 
124     for (i = 0; i < p->normal_num; i++) {
125         uint32_t available = z->zbuff_len - out_size;
126         int flush = Z_NO_FLUSH;
127 
128         if (i == p->normal_num - 1) {
129             flush = Z_SYNC_FLUSH;
130         }
131 
132         /*
133          * Since the VM might be running, the page may be changing concurrently
134          * with compression. zlib does not guarantee that this is safe,
135          * therefore copy the page before calling deflate().
136          */
137         memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
138         zs->avail_in = p->page_size;
139         zs->next_in = z->buf;
140 
141         zs->avail_out = available;
142         zs->next_out = z->zbuff + out_size;
143 
144         /*
145          * Welcome to deflate semantics
146          *
147          * We need to loop while:
148          * - return is Z_OK
149          * - there are stuff to be compressed
150          * - there are output space free
151          */
152         do {
153             ret = deflate(zs, flush);
154         } while (ret == Z_OK && zs->avail_in && zs->avail_out);
155         if (ret == Z_OK && zs->avail_in) {
156             error_setg(errp, "multifd %u: deflate failed to compress all input",
157                        p->id);
158             return -1;
159         }
160         if (ret != Z_OK) {
161             error_setg(errp, "multifd %u: deflate returned %d instead of Z_OK",
162                        p->id, ret);
163             return -1;
164         }
165         out_size += available - zs->avail_out;
166     }
167     p->iov[p->iovs_num].iov_base = z->zbuff;
168     p->iov[p->iovs_num].iov_len = out_size;
169     p->iovs_num++;
170     p->next_packet_size = out_size;
171     p->flags |= MULTIFD_FLAG_ZLIB;
172 
173     return 0;
174 }
175 
176 /**
177  * zlib_recv_setup: setup receive side
178  *
179  * Create the compressed channel and buffer.
180  *
181  * Returns 0 for success or -1 for error
182  *
183  * @p: Params for the channel that we are using
184  * @errp: pointer to an error
185  */
186 static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
187 {
188     struct zlib_data *z = g_new0(struct zlib_data, 1);
189     z_stream *zs = &z->zs;
190 
191     p->data = z;
192     zs->zalloc = Z_NULL;
193     zs->zfree = Z_NULL;
194     zs->opaque = Z_NULL;
195     zs->avail_in = 0;
196     zs->next_in = Z_NULL;
197     if (inflateInit(zs) != Z_OK) {
198         error_setg(errp, "multifd %u: inflate init failed", p->id);
199         return -1;
200     }
201     /* To be safe, we reserve twice the size of the packet */
202     z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
203     z->zbuff = g_try_malloc(z->zbuff_len);
204     if (!z->zbuff) {
205         inflateEnd(zs);
206         error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
207         return -1;
208     }
209     return 0;
210 }
211 
212 /**
213  * zlib_recv_cleanup: setup receive side
214  *
215  * For no compression this function does nothing.
216  *
217  * @p: Params for the channel that we are using
218  */
219 static void zlib_recv_cleanup(MultiFDRecvParams *p)
220 {
221     struct zlib_data *z = p->data;
222 
223     inflateEnd(&z->zs);
224     g_free(z->zbuff);
225     z->zbuff = NULL;
226     g_free(p->data);
227     p->data = NULL;
228 }
229 
230 /**
231  * zlib_recv_pages: read the data from the channel into actual pages
232  *
233  * Read the compressed buffer, and uncompress it into the actual
234  * pages.
235  *
236  * Returns 0 for success or -1 for error
237  *
238  * @p: Params for the channel that we are using
239  * @errp: pointer to an error
240  */
241 static int zlib_recv_pages(MultiFDRecvParams *p, Error **errp)
242 {
243     struct zlib_data *z = p->data;
244     z_stream *zs = &z->zs;
245     uint32_t in_size = p->next_packet_size;
246     /* we measure the change of total_out */
247     uint32_t out_size = zs->total_out;
248     uint32_t expected_size = p->normal_num * p->page_size;
249     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
250     int ret;
251     int i;
252 
253     if (flags != MULTIFD_FLAG_ZLIB) {
254         error_setg(errp, "multifd %u: flags received %x flags expected %x",
255                    p->id, flags, MULTIFD_FLAG_ZLIB);
256         return -1;
257     }
258     ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
259 
260     if (ret != 0) {
261         return ret;
262     }
263 
264     zs->avail_in = in_size;
265     zs->next_in = z->zbuff;
266 
267     for (i = 0; i < p->normal_num; i++) {
268         int flush = Z_NO_FLUSH;
269         unsigned long start = zs->total_out;
270 
271         if (i == p->normal_num - 1) {
272             flush = Z_SYNC_FLUSH;
273         }
274 
275         zs->avail_out = p->page_size;
276         zs->next_out = p->host + p->normal[i];
277 
278         /*
279          * Welcome to inflate semantics
280          *
281          * We need to loop while:
282          * - return is Z_OK
283          * - there are input available
284          * - we haven't completed a full page
285          */
286         do {
287             ret = inflate(zs, flush);
288         } while (ret == Z_OK && zs->avail_in
289                              && (zs->total_out - start) < p->page_size);
290         if (ret == Z_OK && (zs->total_out - start) < p->page_size) {
291             error_setg(errp, "multifd %u: inflate generated too few output",
292                        p->id);
293             return -1;
294         }
295         if (ret != Z_OK) {
296             error_setg(errp, "multifd %u: inflate returned %d instead of Z_OK",
297                        p->id, ret);
298             return -1;
299         }
300     }
301     out_size = zs->total_out - out_size;
302     if (out_size != expected_size) {
303         error_setg(errp, "multifd %u: packet size received %u size expected %u",
304                    p->id, out_size, expected_size);
305         return -1;
306     }
307     return 0;
308 }
309 
310 static MultiFDMethods multifd_zlib_ops = {
311     .send_setup = zlib_send_setup,
312     .send_cleanup = zlib_send_cleanup,
313     .send_prepare = zlib_send_prepare,
314     .recv_setup = zlib_recv_setup,
315     .recv_cleanup = zlib_recv_cleanup,
316     .recv_pages = zlib_recv_pages
317 };
318 
319 static void multifd_zlib_register(void)
320 {
321     multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
322 }
323 
324 migration_init(multifd_zlib_register);
325