xref: /qemu/migration/ram-compress.c (revision 7c1f51bf)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
31 
32 #include "ram-compress.h"
33 
34 #include "qemu/error-report.h"
35 #include "migration.h"
36 #include "options.h"
37 #include "io/channel-null.h"
38 #include "exec/target_page.h"
39 #include "exec/ramblock.h"
40 
41 CompressionStats compression_counters;
42 
43 static CompressParam *comp_param;
44 static QemuThread *compress_threads;
45 /* comp_done_cond is used to wake up the migration thread when
46  * one of the compression threads has finished the compression.
47  * comp_done_lock is used to co-work with comp_done_cond.
48  */
49 static QemuMutex comp_done_lock;
50 static QemuCond comp_done_cond;
51 
52 struct DecompressParam {
53     bool done;
54     bool quit;
55     QemuMutex mutex;
56     QemuCond cond;
57     void *des;
58     uint8_t *compbuf;
59     int len;
60     z_stream stream;
61 };
62 typedef struct DecompressParam DecompressParam;
63 
64 static QEMUFile *decomp_file;
65 static DecompressParam *decomp_param;
66 static QemuThread *decompress_threads;
67 static QemuMutex decomp_done_lock;
68 static QemuCond decomp_done_cond;
69 
70 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
71                                            RAMBlock *block, ram_addr_t offset,
72                                            uint8_t *source_buf);
73 
74 static void *do_data_compress(void *opaque)
75 {
76     CompressParam *param = opaque;
77     RAMBlock *block;
78     ram_addr_t offset;
79     CompressResult result;
80 
81     qemu_mutex_lock(&param->mutex);
82     while (!param->quit) {
83         if (param->trigger) {
84             block = param->block;
85             offset = param->offset;
86             param->trigger = false;
87             qemu_mutex_unlock(&param->mutex);
88 
89             result = do_compress_ram_page(param->file, &param->stream,
90                                           block, offset, param->originbuf);
91 
92             qemu_mutex_lock(&comp_done_lock);
93             param->done = true;
94             param->result = result;
95             qemu_cond_signal(&comp_done_cond);
96             qemu_mutex_unlock(&comp_done_lock);
97 
98             qemu_mutex_lock(&param->mutex);
99         } else {
100             qemu_cond_wait(&param->cond, &param->mutex);
101         }
102     }
103     qemu_mutex_unlock(&param->mutex);
104 
105     return NULL;
106 }
107 
108 void compress_threads_save_cleanup(void)
109 {
110     int i, thread_count;
111 
112     if (!migrate_compress() || !comp_param) {
113         return;
114     }
115 
116     thread_count = migrate_compress_threads();
117     for (i = 0; i < thread_count; i++) {
118         /*
119          * we use it as a indicator which shows if the thread is
120          * properly init'd or not
121          */
122         if (!comp_param[i].file) {
123             break;
124         }
125 
126         qemu_mutex_lock(&comp_param[i].mutex);
127         comp_param[i].quit = true;
128         qemu_cond_signal(&comp_param[i].cond);
129         qemu_mutex_unlock(&comp_param[i].mutex);
130 
131         qemu_thread_join(compress_threads + i);
132         qemu_mutex_destroy(&comp_param[i].mutex);
133         qemu_cond_destroy(&comp_param[i].cond);
134         deflateEnd(&comp_param[i].stream);
135         g_free(comp_param[i].originbuf);
136         qemu_fclose(comp_param[i].file);
137         comp_param[i].file = NULL;
138     }
139     qemu_mutex_destroy(&comp_done_lock);
140     qemu_cond_destroy(&comp_done_cond);
141     g_free(compress_threads);
142     g_free(comp_param);
143     compress_threads = NULL;
144     comp_param = NULL;
145 }
146 
147 int compress_threads_save_setup(void)
148 {
149     int i, thread_count;
150 
151     if (!migrate_compress()) {
152         return 0;
153     }
154     thread_count = migrate_compress_threads();
155     compress_threads = g_new0(QemuThread, thread_count);
156     comp_param = g_new0(CompressParam, thread_count);
157     qemu_cond_init(&comp_done_cond);
158     qemu_mutex_init(&comp_done_lock);
159     for (i = 0; i < thread_count; i++) {
160         comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
161         if (!comp_param[i].originbuf) {
162             goto exit;
163         }
164 
165         if (deflateInit(&comp_param[i].stream,
166                         migrate_compress_level()) != Z_OK) {
167             g_free(comp_param[i].originbuf);
168             goto exit;
169         }
170 
171         /* comp_param[i].file is just used as a dummy buffer to save data,
172          * set its ops to empty.
173          */
174         comp_param[i].file = qemu_file_new_output(
175             QIO_CHANNEL(qio_channel_null_new()));
176         comp_param[i].done = true;
177         comp_param[i].quit = false;
178         qemu_mutex_init(&comp_param[i].mutex);
179         qemu_cond_init(&comp_param[i].cond);
180         qemu_thread_create(compress_threads + i, "compress",
181                            do_data_compress, comp_param + i,
182                            QEMU_THREAD_JOINABLE);
183     }
184     return 0;
185 
186 exit:
187     compress_threads_save_cleanup();
188     return -1;
189 }
190 
191 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
192                                            RAMBlock *block, ram_addr_t offset,
193                                            uint8_t *source_buf)
194 {
195     uint8_t *p = block->host + offset;
196     size_t page_size = qemu_target_page_size();
197     int ret;
198 
199     assert(qemu_file_buffer_empty(f));
200 
201     if (buffer_is_zero(p, page_size)) {
202         return RES_ZEROPAGE;
203     }
204 
205     /*
206      * copy it to a internal buffer to avoid it being modified by VM
207      * so that we can catch up the error during compression and
208      * decompression
209      */
210     memcpy(source_buf, p, page_size);
211     ret = qemu_put_compression_data(f, stream, source_buf, page_size);
212     if (ret < 0) {
213         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
214         error_report("compressed data failed!");
215         qemu_fflush(f);
216         return RES_NONE;
217     }
218     return RES_COMPRESS;
219 }
220 
221 static inline void compress_reset_result(CompressParam *param)
222 {
223     param->result = RES_NONE;
224     param->block = NULL;
225     param->offset = 0;
226 }
227 
228 void flush_compressed_data(int (send_queued_data(CompressParam *)))
229 {
230     int idx, thread_count;
231 
232     thread_count = migrate_compress_threads();
233 
234     qemu_mutex_lock(&comp_done_lock);
235     for (idx = 0; idx < thread_count; idx++) {
236         while (!comp_param[idx].done) {
237             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
238         }
239     }
240     qemu_mutex_unlock(&comp_done_lock);
241 
242     for (idx = 0; idx < thread_count; idx++) {
243         qemu_mutex_lock(&comp_param[idx].mutex);
244         if (!comp_param[idx].quit) {
245             CompressParam *param = &comp_param[idx];
246             send_queued_data(param);
247             assert(qemu_file_buffer_empty(param->file));
248             compress_reset_result(param);
249         }
250         qemu_mutex_unlock(&comp_param[idx].mutex);
251     }
252 }
253 
254 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
255                                        ram_addr_t offset)
256 {
257     param->block = block;
258     param->offset = offset;
259     param->trigger = true;
260 }
261 
262 int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
263                                 int (send_queued_data(CompressParam *)))
264 {
265     int idx, thread_count, pages = -1;
266     bool wait = migrate_compress_wait_thread();
267 
268     thread_count = migrate_compress_threads();
269     qemu_mutex_lock(&comp_done_lock);
270 retry:
271     for (idx = 0; idx < thread_count; idx++) {
272         if (comp_param[idx].done) {
273             CompressParam *param = &comp_param[idx];
274             qemu_mutex_lock(&param->mutex);
275             param->done = false;
276             send_queued_data(param);
277             assert(qemu_file_buffer_empty(param->file));
278             compress_reset_result(param);
279             set_compress_params(param, block, offset);
280 
281             qemu_cond_signal(&param->cond);
282             qemu_mutex_unlock(&param->mutex);
283             pages = 1;
284             break;
285         }
286     }
287 
288     /*
289      * wait for the free thread if the user specifies 'compress-wait-thread',
290      * otherwise we will post the page out in the main thread as normal page.
291      */
292     if (pages < 0 && wait) {
293         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
294         goto retry;
295     }
296     qemu_mutex_unlock(&comp_done_lock);
297 
298     return pages;
299 }
300 
301 /* return the size after decompression, or negative value on error */
302 static int
303 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
304                      const uint8_t *source, size_t source_len)
305 {
306     int err;
307 
308     err = inflateReset(stream);
309     if (err != Z_OK) {
310         return -1;
311     }
312 
313     stream->avail_in = source_len;
314     stream->next_in = (uint8_t *)source;
315     stream->avail_out = dest_len;
316     stream->next_out = dest;
317 
318     err = inflate(stream, Z_NO_FLUSH);
319     if (err != Z_STREAM_END) {
320         return -1;
321     }
322 
323     return stream->total_out;
324 }
325 
326 static void *do_data_decompress(void *opaque)
327 {
328     DecompressParam *param = opaque;
329     unsigned long pagesize;
330     uint8_t *des;
331     int len, ret;
332 
333     qemu_mutex_lock(&param->mutex);
334     while (!param->quit) {
335         if (param->des) {
336             des = param->des;
337             len = param->len;
338             param->des = 0;
339             qemu_mutex_unlock(&param->mutex);
340 
341             pagesize = qemu_target_page_size();
342 
343             ret = qemu_uncompress_data(&param->stream, des, pagesize,
344                                        param->compbuf, len);
345             if (ret < 0 && migrate_get_current()->decompress_error_check) {
346                 error_report("decompress data failed");
347                 qemu_file_set_error(decomp_file, ret);
348             }
349 
350             qemu_mutex_lock(&decomp_done_lock);
351             param->done = true;
352             qemu_cond_signal(&decomp_done_cond);
353             qemu_mutex_unlock(&decomp_done_lock);
354 
355             qemu_mutex_lock(&param->mutex);
356         } else {
357             qemu_cond_wait(&param->cond, &param->mutex);
358         }
359     }
360     qemu_mutex_unlock(&param->mutex);
361 
362     return NULL;
363 }
364 
365 int wait_for_decompress_done(void)
366 {
367     int idx, thread_count;
368 
369     if (!migrate_compress()) {
370         return 0;
371     }
372 
373     thread_count = migrate_decompress_threads();
374     qemu_mutex_lock(&decomp_done_lock);
375     for (idx = 0; idx < thread_count; idx++) {
376         while (!decomp_param[idx].done) {
377             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
378         }
379     }
380     qemu_mutex_unlock(&decomp_done_lock);
381     return qemu_file_get_error(decomp_file);
382 }
383 
384 void compress_threads_load_cleanup(void)
385 {
386     int i, thread_count;
387 
388     if (!migrate_compress()) {
389         return;
390     }
391     thread_count = migrate_decompress_threads();
392     for (i = 0; i < thread_count; i++) {
393         /*
394          * we use it as a indicator which shows if the thread is
395          * properly init'd or not
396          */
397         if (!decomp_param[i].compbuf) {
398             break;
399         }
400 
401         qemu_mutex_lock(&decomp_param[i].mutex);
402         decomp_param[i].quit = true;
403         qemu_cond_signal(&decomp_param[i].cond);
404         qemu_mutex_unlock(&decomp_param[i].mutex);
405     }
406     for (i = 0; i < thread_count; i++) {
407         if (!decomp_param[i].compbuf) {
408             break;
409         }
410 
411         qemu_thread_join(decompress_threads + i);
412         qemu_mutex_destroy(&decomp_param[i].mutex);
413         qemu_cond_destroy(&decomp_param[i].cond);
414         inflateEnd(&decomp_param[i].stream);
415         g_free(decomp_param[i].compbuf);
416         decomp_param[i].compbuf = NULL;
417     }
418     g_free(decompress_threads);
419     g_free(decomp_param);
420     decompress_threads = NULL;
421     decomp_param = NULL;
422     decomp_file = NULL;
423 }
424 
425 int compress_threads_load_setup(QEMUFile *f)
426 {
427     int i, thread_count;
428 
429     if (!migrate_compress()) {
430         return 0;
431     }
432 
433     thread_count = migrate_decompress_threads();
434     decompress_threads = g_new0(QemuThread, thread_count);
435     decomp_param = g_new0(DecompressParam, thread_count);
436     qemu_mutex_init(&decomp_done_lock);
437     qemu_cond_init(&decomp_done_cond);
438     decomp_file = f;
439     for (i = 0; i < thread_count; i++) {
440         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
441             goto exit;
442         }
443 
444         size_t compbuf_size = compressBound(qemu_target_page_size());
445         decomp_param[i].compbuf = g_malloc0(compbuf_size);
446         qemu_mutex_init(&decomp_param[i].mutex);
447         qemu_cond_init(&decomp_param[i].cond);
448         decomp_param[i].done = true;
449         decomp_param[i].quit = false;
450         qemu_thread_create(decompress_threads + i, "decompress",
451                            do_data_decompress, decomp_param + i,
452                            QEMU_THREAD_JOINABLE);
453     }
454     return 0;
455 exit:
456     compress_threads_load_cleanup();
457     return -1;
458 }
459 
460 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
461 {
462     int idx, thread_count;
463 
464     thread_count = migrate_decompress_threads();
465     QEMU_LOCK_GUARD(&decomp_done_lock);
466     while (true) {
467         for (idx = 0; idx < thread_count; idx++) {
468             if (decomp_param[idx].done) {
469                 decomp_param[idx].done = false;
470                 qemu_mutex_lock(&decomp_param[idx].mutex);
471                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
472                 decomp_param[idx].des = host;
473                 decomp_param[idx].len = len;
474                 qemu_cond_signal(&decomp_param[idx].cond);
475                 qemu_mutex_unlock(&decomp_param[idx].mutex);
476                 break;
477             }
478         }
479         if (idx < thread_count) {
480             break;
481         } else {
482             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
483         }
484     }
485 }
486