1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2011-2015 Red Hat Inc 6 * 7 * Authors: 8 * Juan Quintela <quintela@redhat.com> 9 * 10 * Permission is hereby granted, free of charge, to any person obtaining a copy 11 * of this software and associated documentation files (the "Software"), to deal 12 * in the Software without restriction, including without limitation the rights 13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 14 * copies of the Software, and to permit persons to whom the Software is 15 * furnished to do so, subject to the following conditions: 16 * 17 * The above copyright notice and this permission notice shall be included in 18 * all copies or substantial portions of the Software. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26 * THE SOFTWARE. 27 */ 28 29 #include "qemu/osdep.h" 30 #include "qemu/cutils.h" 31 32 #include "ram-compress.h" 33 34 #include "qemu/error-report.h" 35 #include "qemu/stats64.h" 36 #include "migration.h" 37 #include "options.h" 38 #include "io/channel-null.h" 39 #include "exec/target_page.h" 40 #include "exec/ramblock.h" 41 #include "ram.h" 42 #include "migration-stats.h" 43 44 static struct { 45 int64_t pages; 46 int64_t busy; 47 double busy_rate; 48 int64_t compressed_size; 49 double compression_rate; 50 /* compression statistics since the beginning of the period */ 51 /* amount of count that no free thread to compress data */ 52 uint64_t compress_thread_busy_prev; 53 /* amount bytes after compression */ 54 uint64_t compressed_size_prev; 55 /* amount of compressed pages */ 56 uint64_t compress_pages_prev; 57 } compression_counters; 58 59 static CompressParam *comp_param; 60 static QemuThread *compress_threads; 61 /* comp_done_cond is used to wake up the migration thread when 62 * one of the compression threads has finished the compression. 63 * comp_done_lock is used to co-work with comp_done_cond. 64 */ 65 static QemuMutex comp_done_lock; 66 static QemuCond comp_done_cond; 67 68 struct DecompressParam { 69 bool done; 70 bool quit; 71 QemuMutex mutex; 72 QemuCond cond; 73 void *des; 74 uint8_t *compbuf; 75 int len; 76 z_stream stream; 77 }; 78 typedef struct DecompressParam DecompressParam; 79 80 static QEMUFile *decomp_file; 81 static DecompressParam *decomp_param; 82 static QemuThread *decompress_threads; 83 static QemuMutex decomp_done_lock; 84 static QemuCond decomp_done_cond; 85 86 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, 87 RAMBlock *block, ram_addr_t offset, 88 uint8_t *source_buf); 89 90 static void *do_data_compress(void *opaque) 91 { 92 CompressParam *param = opaque; 93 RAMBlock *block; 94 ram_addr_t offset; 95 CompressResult result; 96 97 qemu_mutex_lock(¶m->mutex); 98 while (!param->quit) { 99 if (param->trigger) { 100 block = param->block; 101 offset = param->offset; 102 param->trigger = false; 103 qemu_mutex_unlock(¶m->mutex); 104 105 result = do_compress_ram_page(param->file, ¶m->stream, 106 block, offset, param->originbuf); 107 108 qemu_mutex_lock(&comp_done_lock); 109 param->done = true; 110 param->result = result; 111 qemu_cond_signal(&comp_done_cond); 112 qemu_mutex_unlock(&comp_done_lock); 113 114 qemu_mutex_lock(¶m->mutex); 115 } else { 116 qemu_cond_wait(¶m->cond, ¶m->mutex); 117 } 118 } 119 qemu_mutex_unlock(¶m->mutex); 120 121 return NULL; 122 } 123 124 void compress_threads_save_cleanup(void) 125 { 126 int i, thread_count; 127 128 if (!migrate_compress() || !comp_param) { 129 return; 130 } 131 132 thread_count = migrate_compress_threads(); 133 for (i = 0; i < thread_count; i++) { 134 /* 135 * we use it as a indicator which shows if the thread is 136 * properly init'd or not 137 */ 138 if (!comp_param[i].file) { 139 break; 140 } 141 142 qemu_mutex_lock(&comp_param[i].mutex); 143 comp_param[i].quit = true; 144 qemu_cond_signal(&comp_param[i].cond); 145 qemu_mutex_unlock(&comp_param[i].mutex); 146 147 qemu_thread_join(compress_threads + i); 148 qemu_mutex_destroy(&comp_param[i].mutex); 149 qemu_cond_destroy(&comp_param[i].cond); 150 deflateEnd(&comp_param[i].stream); 151 g_free(comp_param[i].originbuf); 152 qemu_fclose(comp_param[i].file); 153 comp_param[i].file = NULL; 154 } 155 qemu_mutex_destroy(&comp_done_lock); 156 qemu_cond_destroy(&comp_done_cond); 157 g_free(compress_threads); 158 g_free(comp_param); 159 compress_threads = NULL; 160 comp_param = NULL; 161 } 162 163 int compress_threads_save_setup(void) 164 { 165 int i, thread_count; 166 167 if (!migrate_compress()) { 168 return 0; 169 } 170 thread_count = migrate_compress_threads(); 171 compress_threads = g_new0(QemuThread, thread_count); 172 comp_param = g_new0(CompressParam, thread_count); 173 qemu_cond_init(&comp_done_cond); 174 qemu_mutex_init(&comp_done_lock); 175 for (i = 0; i < thread_count; i++) { 176 comp_param[i].originbuf = g_try_malloc(qemu_target_page_size()); 177 if (!comp_param[i].originbuf) { 178 goto exit; 179 } 180 181 if (deflateInit(&comp_param[i].stream, 182 migrate_compress_level()) != Z_OK) { 183 g_free(comp_param[i].originbuf); 184 goto exit; 185 } 186 187 /* comp_param[i].file is just used as a dummy buffer to save data, 188 * set its ops to empty. 189 */ 190 comp_param[i].file = qemu_file_new_output( 191 QIO_CHANNEL(qio_channel_null_new())); 192 comp_param[i].done = true; 193 comp_param[i].quit = false; 194 qemu_mutex_init(&comp_param[i].mutex); 195 qemu_cond_init(&comp_param[i].cond); 196 qemu_thread_create(compress_threads + i, "compress", 197 do_data_compress, comp_param + i, 198 QEMU_THREAD_JOINABLE); 199 } 200 return 0; 201 202 exit: 203 compress_threads_save_cleanup(); 204 return -1; 205 } 206 207 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, 208 RAMBlock *block, ram_addr_t offset, 209 uint8_t *source_buf) 210 { 211 uint8_t *p = block->host + offset; 212 size_t page_size = qemu_target_page_size(); 213 int ret; 214 215 assert(qemu_file_buffer_empty(f)); 216 217 if (buffer_is_zero(p, page_size)) { 218 return RES_ZEROPAGE; 219 } 220 221 /* 222 * copy it to a internal buffer to avoid it being modified by VM 223 * so that we can catch up the error during compression and 224 * decompression 225 */ 226 memcpy(source_buf, p, page_size); 227 ret = qemu_put_compression_data(f, stream, source_buf, page_size); 228 if (ret < 0) { 229 qemu_file_set_error(migrate_get_current()->to_dst_file, ret); 230 error_report("compressed data failed!"); 231 qemu_fflush(f); 232 return RES_NONE; 233 } 234 return RES_COMPRESS; 235 } 236 237 static inline void compress_reset_result(CompressParam *param) 238 { 239 param->result = RES_NONE; 240 param->block = NULL; 241 param->offset = 0; 242 } 243 244 void compress_flush_data(void) 245 { 246 int thread_count = migrate_compress_threads(); 247 248 if (!migrate_compress()) { 249 return; 250 } 251 252 qemu_mutex_lock(&comp_done_lock); 253 for (int i = 0; i < thread_count; i++) { 254 while (!comp_param[i].done) { 255 qemu_cond_wait(&comp_done_cond, &comp_done_lock); 256 } 257 } 258 qemu_mutex_unlock(&comp_done_lock); 259 260 for (int i = 0; i < thread_count; i++) { 261 qemu_mutex_lock(&comp_param[i].mutex); 262 if (!comp_param[i].quit) { 263 CompressParam *param = &comp_param[i]; 264 compress_send_queued_data(param); 265 assert(qemu_file_buffer_empty(param->file)); 266 compress_reset_result(param); 267 } 268 qemu_mutex_unlock(&comp_param[i].mutex); 269 } 270 } 271 272 static inline void set_compress_params(CompressParam *param, RAMBlock *block, 273 ram_addr_t offset) 274 { 275 param->block = block; 276 param->offset = offset; 277 param->trigger = true; 278 } 279 280 /* 281 * Return true when it compress a page 282 */ 283 bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, 284 int (send_queued_data(CompressParam *))) 285 { 286 int thread_count; 287 bool wait = migrate_compress_wait_thread(); 288 289 thread_count = migrate_compress_threads(); 290 qemu_mutex_lock(&comp_done_lock); 291 292 while (true) { 293 for (int i = 0; i < thread_count; i++) { 294 if (comp_param[i].done) { 295 CompressParam *param = &comp_param[i]; 296 qemu_mutex_lock(¶m->mutex); 297 param->done = false; 298 send_queued_data(param); 299 assert(qemu_file_buffer_empty(param->file)); 300 compress_reset_result(param); 301 set_compress_params(param, block, offset); 302 303 qemu_cond_signal(¶m->cond); 304 qemu_mutex_unlock(¶m->mutex); 305 qemu_mutex_unlock(&comp_done_lock); 306 return true; 307 } 308 } 309 if (!wait) { 310 qemu_mutex_unlock(&comp_done_lock); 311 compression_counters.busy++; 312 return false; 313 } 314 /* 315 * wait for a free thread if the user specifies 316 * 'compress-wait-thread', otherwise we will post the page out 317 * in the main thread as normal page. 318 */ 319 qemu_cond_wait(&comp_done_cond, &comp_done_lock); 320 } 321 } 322 323 /* return the size after decompression, or negative value on error */ 324 static int 325 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, 326 const uint8_t *source, size_t source_len) 327 { 328 int err; 329 330 err = inflateReset(stream); 331 if (err != Z_OK) { 332 return -1; 333 } 334 335 stream->avail_in = source_len; 336 stream->next_in = (uint8_t *)source; 337 stream->avail_out = dest_len; 338 stream->next_out = dest; 339 340 err = inflate(stream, Z_NO_FLUSH); 341 if (err != Z_STREAM_END) { 342 return -1; 343 } 344 345 return stream->total_out; 346 } 347 348 static void *do_data_decompress(void *opaque) 349 { 350 DecompressParam *param = opaque; 351 unsigned long pagesize; 352 uint8_t *des; 353 int len, ret; 354 355 qemu_mutex_lock(¶m->mutex); 356 while (!param->quit) { 357 if (param->des) { 358 des = param->des; 359 len = param->len; 360 param->des = 0; 361 qemu_mutex_unlock(¶m->mutex); 362 363 pagesize = qemu_target_page_size(); 364 365 ret = qemu_uncompress_data(¶m->stream, des, pagesize, 366 param->compbuf, len); 367 if (ret < 0 && migrate_get_current()->decompress_error_check) { 368 error_report("decompress data failed"); 369 qemu_file_set_error(decomp_file, ret); 370 } 371 372 qemu_mutex_lock(&decomp_done_lock); 373 param->done = true; 374 qemu_cond_signal(&decomp_done_cond); 375 qemu_mutex_unlock(&decomp_done_lock); 376 377 qemu_mutex_lock(¶m->mutex); 378 } else { 379 qemu_cond_wait(¶m->cond, ¶m->mutex); 380 } 381 } 382 qemu_mutex_unlock(¶m->mutex); 383 384 return NULL; 385 } 386 387 int wait_for_decompress_done(void) 388 { 389 if (!migrate_compress()) { 390 return 0; 391 } 392 393 int thread_count = migrate_decompress_threads(); 394 qemu_mutex_lock(&decomp_done_lock); 395 for (int i = 0; i < thread_count; i++) { 396 while (!decomp_param[i].done) { 397 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); 398 } 399 } 400 qemu_mutex_unlock(&decomp_done_lock); 401 return qemu_file_get_error(decomp_file); 402 } 403 404 void compress_threads_load_cleanup(void) 405 { 406 int i, thread_count; 407 408 if (!migrate_compress()) { 409 return; 410 } 411 thread_count = migrate_decompress_threads(); 412 for (i = 0; i < thread_count; i++) { 413 /* 414 * we use it as a indicator which shows if the thread is 415 * properly init'd or not 416 */ 417 if (!decomp_param[i].compbuf) { 418 break; 419 } 420 421 qemu_mutex_lock(&decomp_param[i].mutex); 422 decomp_param[i].quit = true; 423 qemu_cond_signal(&decomp_param[i].cond); 424 qemu_mutex_unlock(&decomp_param[i].mutex); 425 } 426 for (i = 0; i < thread_count; i++) { 427 if (!decomp_param[i].compbuf) { 428 break; 429 } 430 431 qemu_thread_join(decompress_threads + i); 432 qemu_mutex_destroy(&decomp_param[i].mutex); 433 qemu_cond_destroy(&decomp_param[i].cond); 434 inflateEnd(&decomp_param[i].stream); 435 g_free(decomp_param[i].compbuf); 436 decomp_param[i].compbuf = NULL; 437 } 438 g_free(decompress_threads); 439 g_free(decomp_param); 440 decompress_threads = NULL; 441 decomp_param = NULL; 442 decomp_file = NULL; 443 } 444 445 int compress_threads_load_setup(QEMUFile *f) 446 { 447 int i, thread_count; 448 449 if (!migrate_compress()) { 450 return 0; 451 } 452 453 /* 454 * set compression_counters memory to zero for a new migration 455 */ 456 memset(&compression_counters, 0, sizeof(compression_counters)); 457 458 thread_count = migrate_decompress_threads(); 459 decompress_threads = g_new0(QemuThread, thread_count); 460 decomp_param = g_new0(DecompressParam, thread_count); 461 qemu_mutex_init(&decomp_done_lock); 462 qemu_cond_init(&decomp_done_cond); 463 decomp_file = f; 464 for (i = 0; i < thread_count; i++) { 465 if (inflateInit(&decomp_param[i].stream) != Z_OK) { 466 goto exit; 467 } 468 469 size_t compbuf_size = compressBound(qemu_target_page_size()); 470 decomp_param[i].compbuf = g_malloc0(compbuf_size); 471 qemu_mutex_init(&decomp_param[i].mutex); 472 qemu_cond_init(&decomp_param[i].cond); 473 decomp_param[i].done = true; 474 decomp_param[i].quit = false; 475 qemu_thread_create(decompress_threads + i, "decompress", 476 do_data_decompress, decomp_param + i, 477 QEMU_THREAD_JOINABLE); 478 } 479 return 0; 480 exit: 481 compress_threads_load_cleanup(); 482 return -1; 483 } 484 485 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len) 486 { 487 int thread_count = migrate_decompress_threads(); 488 QEMU_LOCK_GUARD(&decomp_done_lock); 489 while (true) { 490 for (int i = 0; i < thread_count; i++) { 491 if (decomp_param[i].done) { 492 decomp_param[i].done = false; 493 qemu_mutex_lock(&decomp_param[i].mutex); 494 qemu_get_buffer(f, decomp_param[i].compbuf, len); 495 decomp_param[i].des = host; 496 decomp_param[i].len = len; 497 qemu_cond_signal(&decomp_param[i].cond); 498 qemu_mutex_unlock(&decomp_param[i].mutex); 499 return; 500 } 501 } 502 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); 503 } 504 } 505 506 void populate_compress(MigrationInfo *info) 507 { 508 if (!migrate_compress()) { 509 return; 510 } 511 info->compression = g_malloc0(sizeof(*info->compression)); 512 info->compression->pages = compression_counters.pages; 513 info->compression->busy = compression_counters.busy; 514 info->compression->busy_rate = compression_counters.busy_rate; 515 info->compression->compressed_size = compression_counters.compressed_size; 516 info->compression->compression_rate = compression_counters.compression_rate; 517 } 518 519 uint64_t compress_ram_pages(void) 520 { 521 return compression_counters.pages; 522 } 523 524 void update_compress_thread_counts(const CompressParam *param, int bytes_xmit) 525 { 526 ram_transferred_add(bytes_xmit); 527 528 if (param->result == RES_ZEROPAGE) { 529 stat64_add(&mig_stats.zero_pages, 1); 530 return; 531 } 532 533 /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */ 534 compression_counters.compressed_size += bytes_xmit - 8; 535 compression_counters.pages++; 536 } 537 538 void compress_update_rates(uint64_t page_count) 539 { 540 if (!migrate_compress()) { 541 return; 542 } 543 compression_counters.busy_rate = (double)(compression_counters.busy - 544 compression_counters.compress_thread_busy_prev) / page_count; 545 compression_counters.compress_thread_busy_prev = 546 compression_counters.busy; 547 548 double compressed_size = compression_counters.compressed_size - 549 compression_counters.compressed_size_prev; 550 if (compressed_size) { 551 double uncompressed_size = (compression_counters.pages - 552 compression_counters.compress_pages_prev) * 553 qemu_target_page_size(); 554 555 /* Compression-Ratio = Uncompressed-size / Compressed-size */ 556 compression_counters.compression_rate = 557 uncompressed_size / compressed_size; 558 559 compression_counters.compress_pages_prev = 560 compression_counters.pages; 561 compression_counters.compressed_size_prev = 562 compression_counters.compressed_size; 563 } 564 } 565