xref: /qemu/migration/multifd-zlib.c (revision ec87b5da)
1 /*
2  * Multifd zlib compression implementation
3  *
4  * Copyright (c) 2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include <zlib.h>
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "qapi/error.h"
18 #include "migration.h"
19 #include "trace.h"
20 #include "multifd.h"
21 
22 struct zlib_data {
23     /* stream for compression */
24     z_stream zs;
25     /* compressed buffer */
26     uint8_t *zbuff;
27     /* size of compressed buffer */
28     uint32_t zbuff_len;
29 };
30 
31 /* Multifd zlib compression */
32 
33 /**
34  * zlib_send_setup: setup send side
35  *
36  * Setup each channel with zlib compression.
37  *
38  * Returns 0 for success or -1 for error
39  *
40  * @p: Params for the channel that we are using
41  * @errp: pointer to an error
42  */
43 static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
44 {
45     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
46     struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
47     z_stream *zs = &z->zs;
48 
49     zs->zalloc = Z_NULL;
50     zs->zfree = Z_NULL;
51     zs->opaque = Z_NULL;
52     if (deflateInit(zs, migrate_multifd_zlib_level()) != Z_OK) {
53         g_free(z);
54         error_setg(errp, "multifd %d: deflate init failed", p->id);
55         return -1;
56     }
57     /* We will never have more than page_count pages */
58     z->zbuff_len = page_count * qemu_target_page_size();
59     z->zbuff_len *= 2;
60     z->zbuff = g_try_malloc(z->zbuff_len);
61     if (!z->zbuff) {
62         deflateEnd(&z->zs);
63         g_free(z);
64         error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
65         return -1;
66     }
67     p->data = z;
68     return 0;
69 }
70 
71 /**
72  * zlib_send_cleanup: cleanup send side
73  *
74  * Close the channel and return memory.
75  *
76  * @p: Params for the channel that we are using
77  */
78 static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
79 {
80     struct zlib_data *z = p->data;
81 
82     deflateEnd(&z->zs);
83     g_free(z->zbuff);
84     z->zbuff = NULL;
85     g_free(p->data);
86     p->data = NULL;
87 }
88 
89 /**
90  * zlib_send_prepare: prepare date to be able to send
91  *
92  * Create a compressed buffer with all the pages that we are going to
93  * send.
94  *
95  * Returns 0 for success or -1 for error
96  *
97  * @p: Params for the channel that we are using
98  * @used: number of pages used
99  */
100 static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
101 {
102     struct iovec *iov = p->pages->iov;
103     struct zlib_data *z = p->data;
104     z_stream *zs = &z->zs;
105     uint32_t out_size = 0;
106     int ret;
107     uint32_t i;
108 
109     for (i = 0; i < used; i++) {
110         uint32_t available = z->zbuff_len - out_size;
111         int flush = Z_NO_FLUSH;
112 
113         if (i == used - 1) {
114             flush = Z_SYNC_FLUSH;
115         }
116 
117         zs->avail_in = iov[i].iov_len;
118         zs->next_in = iov[i].iov_base;
119 
120         zs->avail_out = available;
121         zs->next_out = z->zbuff + out_size;
122 
123         /*
124          * Welcome to deflate semantics
125          *
126          * We need to loop while:
127          * - return is Z_OK
128          * - there are stuff to be compressed
129          * - there are output space free
130          */
131         do {
132             ret = deflate(zs, flush);
133         } while (ret == Z_OK && zs->avail_in && zs->avail_out);
134         if (ret == Z_OK && zs->avail_in) {
135             error_setg(errp, "multifd %d: deflate failed to compress all input",
136                        p->id);
137             return -1;
138         }
139         if (ret != Z_OK) {
140             error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
141                        p->id, ret);
142             return -1;
143         }
144         out_size += available - zs->avail_out;
145     }
146     p->next_packet_size = out_size;
147     p->flags |= MULTIFD_FLAG_ZLIB;
148 
149     return 0;
150 }
151 
152 /**
153  * zlib_send_write: do the actual write of the data
154  *
155  * Do the actual write of the comprresed buffer.
156  *
157  * Returns 0 for success or -1 for error
158  *
159  * @p: Params for the channel that we are using
160  * @used: number of pages used
161  * @errp: pointer to an error
162  */
163 static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
164 {
165     struct zlib_data *z = p->data;
166 
167     return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
168                                  errp);
169 }
170 
171 /**
172  * zlib_recv_setup: setup receive side
173  *
174  * Create the compressed channel and buffer.
175  *
176  * Returns 0 for success or -1 for error
177  *
178  * @p: Params for the channel that we are using
179  * @errp: pointer to an error
180  */
181 static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
182 {
183     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
184     struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
185     z_stream *zs = &z->zs;
186 
187     p->data = z;
188     zs->zalloc = Z_NULL;
189     zs->zfree = Z_NULL;
190     zs->opaque = Z_NULL;
191     zs->avail_in = 0;
192     zs->next_in = Z_NULL;
193     if (inflateInit(zs) != Z_OK) {
194         error_setg(errp, "multifd %d: inflate init failed", p->id);
195         return -1;
196     }
197     /* We will never have more than page_count pages */
198     z->zbuff_len = page_count * qemu_target_page_size();
199     /* We know compression "could" use more space */
200     z->zbuff_len *= 2;
201     z->zbuff = g_try_malloc(z->zbuff_len);
202     if (!z->zbuff) {
203         inflateEnd(zs);
204         error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
205         return -1;
206     }
207     return 0;
208 }
209 
210 /**
211  * zlib_recv_cleanup: setup receive side
212  *
213  * For no compression this function does nothing.
214  *
215  * @p: Params for the channel that we are using
216  */
217 static void zlib_recv_cleanup(MultiFDRecvParams *p)
218 {
219     struct zlib_data *z = p->data;
220 
221     inflateEnd(&z->zs);
222     g_free(z->zbuff);
223     z->zbuff = NULL;
224     g_free(p->data);
225     p->data = NULL;
226 }
227 
228 /**
229  * zlib_recv_pages: read the data from the channel into actual pages
230  *
231  * Read the compressed buffer, and uncompress it into the actual
232  * pages.
233  *
234  * Returns 0 for success or -1 for error
235  *
236  * @p: Params for the channel that we are using
237  * @used: number of pages used
238  * @errp: pointer to an error
239  */
240 static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
241 {
242     struct zlib_data *z = p->data;
243     z_stream *zs = &z->zs;
244     uint32_t in_size = p->next_packet_size;
245     /* we measure the change of total_out */
246     uint32_t out_size = zs->total_out;
247     uint32_t expected_size = used * qemu_target_page_size();
248     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
249     int ret;
250     int i;
251 
252     if (flags != MULTIFD_FLAG_ZLIB) {
253         error_setg(errp, "multifd %d: flags received %x flags expected %x",
254                    p->id, flags, MULTIFD_FLAG_ZLIB);
255         return -1;
256     }
257     ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
258 
259     if (ret != 0) {
260         return ret;
261     }
262 
263     zs->avail_in = in_size;
264     zs->next_in = z->zbuff;
265 
266     for (i = 0; i < used; i++) {
267         struct iovec *iov = &p->pages->iov[i];
268         int flush = Z_NO_FLUSH;
269         unsigned long start = zs->total_out;
270 
271         if (i == used - 1) {
272             flush = Z_SYNC_FLUSH;
273         }
274 
275         zs->avail_out = iov->iov_len;
276         zs->next_out = iov->iov_base;
277 
278         /*
279          * Welcome to inflate semantics
280          *
281          * We need to loop while:
282          * - return is Z_OK
283          * - there are input available
284          * - we haven't completed a full page
285          */
286         do {
287             ret = inflate(zs, flush);
288         } while (ret == Z_OK && zs->avail_in
289                              && (zs->total_out - start) < iov->iov_len);
290         if (ret == Z_OK && (zs->total_out - start) < iov->iov_len) {
291             error_setg(errp, "multifd %d: inflate generated too few output",
292                        p->id);
293             return -1;
294         }
295         if (ret != Z_OK) {
296             error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
297                        p->id, ret);
298             return -1;
299         }
300     }
301     out_size = zs->total_out - out_size;
302     if (out_size != expected_size) {
303         error_setg(errp, "multifd %d: packet size received %d size expected %d",
304                    p->id, out_size, expected_size);
305         return -1;
306     }
307     return 0;
308 }
309 
310 static MultiFDMethods multifd_zlib_ops = {
311     .send_setup = zlib_send_setup,
312     .send_cleanup = zlib_send_cleanup,
313     .send_prepare = zlib_send_prepare,
314     .send_write = zlib_send_write,
315     .recv_setup = zlib_recv_setup,
316     .recv_cleanup = zlib_recv_cleanup,
317     .recv_pages = zlib_recv_pages
318 };
319 
320 static void multifd_zlib_register(void)
321 {
322     multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
323 }
324 
325 migration_init(multifd_zlib_register);
326