xref: /qemu/migration/multifd-zstd.c (revision 27a4a30e)
1 /*
2  * Multifd zlib compression implementation
3  *
4  * Copyright (c) 2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include <zstd.h>
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "qapi/error.h"
18 #include "migration.h"
19 #include "trace.h"
20 #include "multifd.h"
21 
22 struct zstd_data {
23     /* stream for compression */
24     ZSTD_CStream *zcs;
25     /* stream for decompression */
26     ZSTD_DStream *zds;
27     /* buffers */
28     ZSTD_inBuffer in;
29     ZSTD_outBuffer out;
30     /* compressed buffer */
31     uint8_t *zbuff;
32     /* size of compressed buffer */
33     uint32_t zbuff_len;
34 };
35 
36 /* Multifd zstd compression */
37 
38 /**
39  * zstd_send_setup: setup send side
40  *
41  * Setup each channel with zstd compression.
42  *
43  * Returns 0 for success or -1 for error
44  *
45  * @p: Params for the channel that we are using
46  * @errp: pointer to an error
47  */
48 static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
49 {
50     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
51     struct zstd_data *z = g_new0(struct zstd_data, 1);
52     int res;
53 
54     p->data = z;
55     z->zcs = ZSTD_createCStream();
56     if (!z->zcs) {
57         g_free(z);
58         error_setg(errp, "multifd %d: zstd createCStream failed", p->id);
59         return -1;
60     }
61 
62     res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
63     if (ZSTD_isError(res)) {
64         ZSTD_freeCStream(z->zcs);
65         g_free(z);
66         error_setg(errp, "multifd %d: initCStream failed with error %s",
67                    p->id, ZSTD_getErrorName(res));
68         return -1;
69     }
70     /* We will never have more than page_count pages */
71     z->zbuff_len = page_count * qemu_target_page_size();
72     z->zbuff_len *= 2;
73     z->zbuff = g_try_malloc(z->zbuff_len);
74     if (!z->zbuff) {
75         ZSTD_freeCStream(z->zcs);
76         g_free(z);
77         error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
78         return -1;
79     }
80     return 0;
81 }
82 
83 /**
84  * zstd_send_cleanup: cleanup send side
85  *
86  * Close the channel and return memory.
87  *
88  * @p: Params for the channel that we are using
89  */
90 static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
91 {
92     struct zstd_data *z = p->data;
93 
94     ZSTD_freeCStream(z->zcs);
95     z->zcs = NULL;
96     g_free(z->zbuff);
97     z->zbuff = NULL;
98     g_free(p->data);
99     p->data = NULL;
100 }
101 
102 /**
103  * zstd_send_prepare: prepare date to be able to send
104  *
105  * Create a compressed buffer with all the pages that we are going to
106  * send.
107  *
108  * Returns 0 for success or -1 for error
109  *
110  * @p: Params for the channel that we are using
111  * @used: number of pages used
112  */
113 static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
114 {
115     struct iovec *iov = p->pages->iov;
116     struct zstd_data *z = p->data;
117     int ret;
118     uint32_t i;
119 
120     z->out.dst = z->zbuff;
121     z->out.size = z->zbuff_len;
122     z->out.pos = 0;
123 
124     for (i = 0; i < used; i++) {
125         ZSTD_EndDirective flush = ZSTD_e_continue;
126 
127         if (i == used - 1) {
128             flush = ZSTD_e_flush;
129         }
130         z->in.src = iov[i].iov_base;
131         z->in.size = iov[i].iov_len;
132         z->in.pos = 0;
133 
134         /*
135          * Welcome to compressStream2 semantics
136          *
137          * We need to loop while:
138          * - return is > 0
139          * - there is input available
140          * - there is output space free
141          */
142         do {
143             ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
144         } while (ret > 0 && (z->in.size - z->in.pos > 0)
145                          && (z->out.size - z->out.pos > 0));
146         if (ret > 0 && (z->in.size - z->in.pos > 0)) {
147             error_setg(errp, "multifd %d: compressStream buffer too small",
148                        p->id);
149             return -1;
150         }
151         if (ZSTD_isError(ret)) {
152             error_setg(errp, "multifd %d: compressStream error %s",
153                        p->id, ZSTD_getErrorName(ret));
154             return -1;
155         }
156     }
157     p->next_packet_size = z->out.pos;
158     p->flags |= MULTIFD_FLAG_ZSTD;
159 
160     return 0;
161 }
162 
163 /**
164  * zstd_send_write: do the actual write of the data
165  *
166  * Do the actual write of the comprresed buffer.
167  *
168  * Returns 0 for success or -1 for error
169  *
170  * @p: Params for the channel that we are using
171  * @used: number of pages used
172  * @errp: pointer to an error
173  */
174 static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
175 {
176     struct zstd_data *z = p->data;
177 
178     return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
179                                  errp);
180 }
181 
182 /**
183  * zstd_recv_setup: setup receive side
184  *
185  * Create the compressed channel and buffer.
186  *
187  * Returns 0 for success or -1 for error
188  *
189  * @p: Params for the channel that we are using
190  * @errp: pointer to an error
191  */
192 static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
193 {
194     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
195     struct zstd_data *z = g_new0(struct zstd_data, 1);
196     int ret;
197 
198     p->data = z;
199     z->zds = ZSTD_createDStream();
200     if (!z->zds) {
201         g_free(z);
202         error_setg(errp, "multifd %d: zstd createDStream failed", p->id);
203         return -1;
204     }
205 
206     ret = ZSTD_initDStream(z->zds);
207     if (ZSTD_isError(ret)) {
208         ZSTD_freeDStream(z->zds);
209         g_free(z);
210         error_setg(errp, "multifd %d: initDStream failed with error %s",
211                    p->id, ZSTD_getErrorName(ret));
212         return -1;
213     }
214 
215     /* We will never have more than page_count pages */
216     z->zbuff_len = page_count * qemu_target_page_size();
217     /* We know compression "could" use more space */
218     z->zbuff_len *= 2;
219     z->zbuff = g_try_malloc(z->zbuff_len);
220     if (!z->zbuff) {
221         ZSTD_freeDStream(z->zds);
222         g_free(z);
223         error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
224         return -1;
225     }
226     return 0;
227 }
228 
229 /**
230  * zstd_recv_cleanup: setup receive side
231  *
232  * For no compression this function does nothing.
233  *
234  * @p: Params for the channel that we are using
235  */
236 static void zstd_recv_cleanup(MultiFDRecvParams *p)
237 {
238     struct zstd_data *z = p->data;
239 
240     ZSTD_freeDStream(z->zds);
241     z->zds = NULL;
242     g_free(z->zbuff);
243     z->zbuff = NULL;
244     g_free(p->data);
245     p->data = NULL;
246 }
247 
248 /**
249  * zstd_recv_pages: read the data from the channel into actual pages
250  *
251  * Read the compressed buffer, and uncompress it into the actual
252  * pages.
253  *
254  * Returns 0 for success or -1 for error
255  *
256  * @p: Params for the channel that we are using
257  * @used: number of pages used
258  * @errp: pointer to an error
259  */
260 static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
261 {
262     uint32_t in_size = p->next_packet_size;
263     uint32_t out_size = 0;
264     uint32_t expected_size = used * qemu_target_page_size();
265     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
266     struct zstd_data *z = p->data;
267     int ret;
268     int i;
269 
270     if (flags != MULTIFD_FLAG_ZSTD) {
271         error_setg(errp, "multifd %d: flags received %x flags expected %x",
272                    p->id, flags, MULTIFD_FLAG_ZSTD);
273         return -1;
274     }
275     ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
276 
277     if (ret != 0) {
278         return ret;
279     }
280 
281     z->in.src = z->zbuff;
282     z->in.size = in_size;
283     z->in.pos = 0;
284 
285     for (i = 0; i < used; i++) {
286         struct iovec *iov = &p->pages->iov[i];
287 
288         z->out.dst = iov->iov_base;
289         z->out.size = iov->iov_len;
290         z->out.pos = 0;
291 
292         /*
293          * Welcome to decompressStream semantics
294          *
295          * We need to loop while:
296          * - return is > 0
297          * - there is input available
298          * - we haven't put out a full page
299          */
300         do {
301             ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
302         } while (ret > 0 && (z->in.size - z->in.pos > 0)
303                          && (z->out.pos < iov->iov_len));
304         if (ret > 0 && (z->out.pos < iov->iov_len)) {
305             error_setg(errp, "multifd %d: decompressStream buffer too small",
306                        p->id);
307             return -1;
308         }
309         if (ZSTD_isError(ret)) {
310             error_setg(errp, "multifd %d: decompressStream returned %s",
311                        p->id, ZSTD_getErrorName(ret));
312             return ret;
313         }
314         out_size += z->out.pos;
315     }
316     if (out_size != expected_size) {
317         error_setg(errp, "multifd %d: packet size received %d size expected %d",
318                    p->id, out_size, expected_size);
319         return -1;
320     }
321     return 0;
322 }
323 
324 static MultiFDMethods multifd_zstd_ops = {
325     .send_setup = zstd_send_setup,
326     .send_cleanup = zstd_send_cleanup,
327     .send_prepare = zstd_send_prepare,
328     .send_write = zstd_send_write,
329     .recv_setup = zstd_recv_setup,
330     .recv_cleanup = zstd_recv_cleanup,
331     .recv_pages = zstd_recv_pages
332 };
333 
334 static void multifd_zstd_register(void)
335 {
336     multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
337 }
338 
339 migration_init(multifd_zstd_register);
340