xref: /qemu/migration/ram-compress.c (revision 5db05230)
/*
 * QEMU System Emulator
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2011-2015 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"

#include "ram-compress.h"

#include "qemu/error-report.h"
#include "qemu/stats64.h"
#include "migration.h"
#include "options.h"
#include "io/channel-null.h"
#include "exec/target_page.h"
#include "exec/ramblock.h"
#include "ram.h"
#include "migration-stats.h"

static struct {
    int64_t pages;
    int64_t busy;
    double busy_rate;
    int64_t compressed_size;
    double compression_rate;
    /* compression statistics since the beginning of the period */
    /* number of times no compression thread was free */
    uint64_t compress_thread_busy_prev;
    /* number of bytes produced by compression */
    uint64_t compressed_size_prev;
    /* number of compressed pages */
    uint64_t compress_pages_prev;
} compression_counters;

static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished compressing a page.
 * comp_done_lock is the mutex used together with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
struct DecompressParam {
    bool done;
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    void *des;
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;

static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;

static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf);

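/*
 * Compression worker thread: wait for the migration thread to trigger a
 * request, compress the requested page into this thread's QEMUFile buffer,
 * then mark itself done and signal comp_done_cond.
 */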
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    CompressResult result;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->trigger) {
            block = param->block;
            offset = param->offset;
            param->trigger = false;
            qemu_mutex_unlock(&param->mutex);

            result = do_compress_ram_page(param->file, &param->stream,
                                          block, offset, param->originbuf);

            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->result = result;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}

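/*
 * Stop and join all compression threads and free the resources they use.
 * Also used to roll back a partially failed setup.
 */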
void compress_threads_save_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress() || !comp_param) {
        return;
    }

    thread_count = migrate_compress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * We use comp_param[i].file as an indicator of whether the thread
         * was properly initialized or not.
         */
        if (!comp_param[i].file) {
            break;
        }

        qemu_mutex_lock(&comp_param[i].mutex);
        comp_param[i].quit = true;
        qemu_cond_signal(&comp_param[i].cond);
        qemu_mutex_unlock(&comp_param[i].mutex);

        qemu_thread_join(compress_threads + i);
        qemu_mutex_destroy(&comp_param[i].mutex);
        qemu_cond_destroy(&comp_param[i].cond);
        deflateEnd(&comp_param[i].stream);
        g_free(comp_param[i].originbuf);
        qemu_fclose(comp_param[i].file);
        comp_param[i].file = NULL;
    }
    qemu_mutex_destroy(&comp_done_lock);
    qemu_cond_destroy(&comp_done_cond);
    g_free(compress_threads);
    g_free(comp_param);
    compress_threads = NULL;
    comp_param = NULL;
}

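/*
 * Allocate and start the compression threads on the source side.
 * Returns 0 on success, -1 on failure (any partial setup is undone).
 */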
int compress_threads_save_setup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }
    thread_count = migrate_compress_threads();
    compress_threads = g_new0(QemuThread, thread_count);
    comp_param = g_new0(CompressParam, thread_count);
    qemu_cond_init(&comp_done_cond);
    qemu_mutex_init(&comp_done_lock);
    for (i = 0; i < thread_count; i++) {
        comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
        if (!comp_param[i].originbuf) {
            goto exit;
        }

        if (deflateInit(&comp_param[i].stream,
                        migrate_compress_level()) != Z_OK) {
            g_free(comp_param[i].originbuf);
            goto exit;
        }

        /*
         * comp_param[i].file is only used as a buffer to hold compressed
         * data, so back it with a null channel.
         */
        comp_param[i].file = qemu_file_new_output(
            QIO_CHANNEL(qio_channel_null_new()));
        comp_param[i].done = true;
        comp_param[i].quit = false;
        qemu_mutex_init(&comp_param[i].mutex);
        qemu_cond_init(&comp_param[i].cond);
        qemu_thread_create(compress_threads + i, "compress",
                           do_data_compress, comp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;

exit:
    compress_threads_save_cleanup();
    return -1;
}

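/*
 * Compress one target page at @offset in @block into @f.  Returns
 * RES_ZEROPAGE if the page is all zeroes, RES_COMPRESS on success and
 * RES_NONE on error.
 */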
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf)
{
    uint8_t *p = block->host + offset;
    size_t page_size = qemu_target_page_size();
    int ret;

    assert(qemu_file_buffer_empty(f));

    if (buffer_is_zero(p, page_size)) {
        return RES_ZEROPAGE;
    }

    /*
     * Copy the page to an internal buffer so that the VM cannot modify it
     * while it is being compressed; this lets us catch any error during
     * compression and decompression.
     */
    memcpy(source_buf, p, page_size);
    ret = qemu_put_compression_data(f, stream, source_buf, page_size);
    if (ret < 0) {
        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
        error_report("compressed data failed!");
        qemu_fflush(f);
        return RES_NONE;
    }
    return RES_COMPRESS;
}

static inline void compress_reset_result(CompressParam *param)
{
    param->result = RES_NONE;
    param->block = NULL;
    param->offset = 0;
}

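/*
 * Wait for all compression threads to finish their in-flight work, then
 * send any queued compressed data and reset the per-thread state.
 */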
void compress_flush_data(void)
{
    int thread_count = migrate_compress_threads();

    if (!migrate_compress()) {
        return;
    }

    qemu_mutex_lock(&comp_done_lock);
    for (int i = 0; i < thread_count; i++) {
        while (!comp_param[i].done) {
            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        }
    }
    qemu_mutex_unlock(&comp_done_lock);

    for (int i = 0; i < thread_count; i++) {
        qemu_mutex_lock(&comp_param[i].mutex);
        if (!comp_param[i].quit) {
            CompressParam *param = &comp_param[i];
            compress_send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&comp_param[i].mutex);
    }
}

static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->block = block;
    param->offset = offset;
    param->trigger = true;
}

/*
 * Return true if the page was handed to a compression thread.
 */
bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
                                     int (send_queued_data(CompressParam *)))
{
    int thread_count;
    bool wait = migrate_compress_wait_thread();

    thread_count = migrate_compress_threads();
    qemu_mutex_lock(&comp_done_lock);

    while (true) {
        for (int i = 0; i < thread_count; i++) {
            if (comp_param[i].done) {
                CompressParam *param = &comp_param[i];
                qemu_mutex_lock(&param->mutex);
                param->done = false;
                send_queued_data(param);
                assert(qemu_file_buffer_empty(param->file));
                compress_reset_result(param);
                set_compress_params(param, block, offset);

                qemu_cond_signal(&param->cond);
                qemu_mutex_unlock(&param->mutex);
                qemu_mutex_unlock(&comp_done_lock);
                return true;
            }
        }
        if (!wait) {
            qemu_mutex_unlock(&comp_done_lock);
            compression_counters.busy++;
            return false;
        }
        /*
         * Wait for a free thread if the user specified
         * 'compress-wait-thread'; otherwise the page is sent out from the
         * main thread as a normal (uncompressed) page.
         */
        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
    }
}

/* Return the size after decompression, or a negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    int err;

    err = inflateReset(stream);
    if (err != Z_OK) {
        return -1;
    }

    stream->avail_in = source_len;
    stream->next_in = (uint8_t *)source;
    stream->avail_out = dest_len;
    stream->next_out = dest;

    err = inflate(stream, Z_NO_FLUSH);
    if (err != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}

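/*
 * Decompression worker thread: wait for a compressed buffer to be handed
 * over, inflate it directly into guest memory, then mark itself done and
 * signal decomp_done_cond.
 */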
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            des = param->des;
            len = param->len;
            param->des = 0;
            qemu_mutex_unlock(&param->mutex);

            pagesize = qemu_target_page_size();

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}

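/*
 * Wait until every decompression thread has finished its outstanding work
 * and return the error status of the incoming migration stream, if any.
 */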
int wait_for_decompress_done(void)
{
    if (!migrate_compress()) {
        return 0;
    }

    int thread_count = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (int i = 0; i < thread_count; i++) {
        while (!decomp_param[i].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);
    return qemu_file_get_error(decomp_file);
}

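/*
 * Stop and join all decompression threads on the destination side and
 * free the resources they use.
 */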
void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }
    thread_count = migrate_decompress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * We use decomp_param[i].compbuf as an indicator of whether the
         * thread was properly initialized or not.
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
    decomp_file = NULL;
}

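/*
 * Allocate and start the decompression threads on the destination side,
 * using @f for error reporting.  Returns 0 on success, -1 on failure.
 */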
int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    /*
     * Zero compression_counters at the start of a new migration.
     */
    memset(&compression_counters, 0, sizeof(compression_counters));

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        size_t compbuf_size = compressBound(qemu_target_page_size());
        decomp_param[i].compbuf = g_malloc0(compbuf_size);
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}

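/*
 * Read @len bytes of compressed data from @f and hand them to a free
 * decompression thread, which will inflate them into @host.  Blocks until
 * a thread becomes available.
 */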
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
    int thread_count = migrate_decompress_threads();
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (int i = 0; i < thread_count; i++) {
            if (decomp_param[i].done) {
                decomp_param[i].done = false;
                qemu_mutex_lock(&decomp_param[i].mutex);
                qemu_get_buffer(f, decomp_param[i].compbuf, len);
                decomp_param[i].des = host;
                decomp_param[i].len = len;
                qemu_cond_signal(&decomp_param[i].cond);
                qemu_mutex_unlock(&decomp_param[i].mutex);
                return;
            }
        }
        qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
    }
}

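/*
 * Fill in the compression statistics of MigrationInfo when compression
 * is enabled.
 */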
void populate_compress(MigrationInfo *info)
{
    if (!migrate_compress()) {
        return;
    }
    info->compression = g_malloc0(sizeof(*info->compression));
    info->compression->pages = compression_counters.pages;
    info->compression->busy = compression_counters.busy;
    info->compression->busy_rate = compression_counters.busy_rate;
    info->compression->compressed_size = compression_counters.compressed_size;
    info->compression->compression_rate = compression_counters.compression_rate;
}

uint64_t compress_ram_pages(void)
{
    return compression_counters.pages;
}

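/*
 * Account for one page handled by a compression thread: add @bytes_xmit to
 * the transferred counter and update the zero-page or compression stats
 * depending on the result.
 */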
void update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
    ram_transferred_add(bytes_xmit);

    if (param->result == RES_ZEROPAGE) {
        stat64_add(&mig_stats.zero_pages, 1);
        return;
    }

    /* bytes_xmit includes the 8-byte page header sent with RAM_SAVE_FLAG_CONTINUE */
    compression_counters.compressed_size += bytes_xmit - 8;
    compression_counters.pages++;
}

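/*
 * Recompute the busy rate and compression ratio over the pages processed
 * since the previous call, and remember the current totals for next time.
 */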
void compress_update_rates(uint64_t page_count)
{
    if (!migrate_compress()) {
        return;
    }
    compression_counters.busy_rate = (double)(compression_counters.busy -
            compression_counters.compress_thread_busy_prev) / page_count;
    compression_counters.compress_thread_busy_prev =
            compression_counters.busy;

    double compressed_size = compression_counters.compressed_size -
        compression_counters.compressed_size_prev;
    if (compressed_size) {
        double uncompressed_size = (compression_counters.pages -
                                    compression_counters.compress_pages_prev) *
            qemu_target_page_size();

        /* Compression-Ratio = Uncompressed-size / Compressed-size */
        compression_counters.compression_rate =
            uncompressed_size / compressed_size;

        compression_counters.compress_pages_prev =
            compression_counters.pages;
        compression_counters.compressed_size_prev =
            compression_counters.compressed_size;
    }
}
565