1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2011-2015 Red Hat Inc 6 * 7 * Authors: 8 * Juan Quintela <quintela@redhat.com> 9 * 10 * Permission is hereby granted, free of charge, to any person obtaining a copy 11 * of this software and associated documentation files (the "Software"), to deal 12 * in the Software without restriction, including without limitation the rights 13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 14 * copies of the Software, and to permit persons to whom the Software is 15 * furnished to do so, subject to the following conditions: 16 * 17 * The above copyright notice and this permission notice shall be included in 18 * all copies or substantial portions of the Software. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26 * THE SOFTWARE. 27 */ 28 29 #include "qemu/osdep.h" 30 #include "qemu/cutils.h" 31 32 #include "ram-compress.h" 33 34 #include "qemu/error-report.h" 35 #include "migration.h" 36 #include "options.h" 37 #include "io/channel-null.h" 38 #include "exec/target_page.h" 39 #include "exec/ramblock.h" 40 41 CompressionStats compression_counters; 42 43 static CompressParam *comp_param; 44 static QemuThread *compress_threads; 45 /* comp_done_cond is used to wake up the migration thread when 46 * one of the compression threads has finished the compression. 47 * comp_done_lock is used to co-work with comp_done_cond. 48 */ 49 static QemuMutex comp_done_lock; 50 static QemuCond comp_done_cond; 51 52 struct DecompressParam { 53 bool done; 54 bool quit; 55 QemuMutex mutex; 56 QemuCond cond; 57 void *des; 58 uint8_t *compbuf; 59 int len; 60 z_stream stream; 61 }; 62 typedef struct DecompressParam DecompressParam; 63 64 static QEMUFile *decomp_file; 65 static DecompressParam *decomp_param; 66 static QemuThread *decompress_threads; 67 static QemuMutex decomp_done_lock; 68 static QemuCond decomp_done_cond; 69 70 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, 71 RAMBlock *block, ram_addr_t offset, 72 uint8_t *source_buf); 73 74 static void *do_data_compress(void *opaque) 75 { 76 CompressParam *param = opaque; 77 RAMBlock *block; 78 ram_addr_t offset; 79 CompressResult result; 80 81 qemu_mutex_lock(¶m->mutex); 82 while (!param->quit) { 83 if (param->trigger) { 84 block = param->block; 85 offset = param->offset; 86 param->trigger = false; 87 qemu_mutex_unlock(¶m->mutex); 88 89 result = do_compress_ram_page(param->file, ¶m->stream, 90 block, offset, param->originbuf); 91 92 qemu_mutex_lock(&comp_done_lock); 93 param->done = true; 94 param->result = result; 95 qemu_cond_signal(&comp_done_cond); 96 qemu_mutex_unlock(&comp_done_lock); 97 98 qemu_mutex_lock(¶m->mutex); 99 } else { 100 qemu_cond_wait(¶m->cond, ¶m->mutex); 101 } 102 } 103 qemu_mutex_unlock(¶m->mutex); 104 105 return NULL; 106 } 107 108 void compress_threads_save_cleanup(void) 109 { 110 int i, thread_count; 111 112 if (!migrate_compress() || !comp_param) { 113 return; 114 } 115 116 thread_count = migrate_compress_threads(); 117 for (i = 0; i < thread_count; i++) { 118 /* 119 * we use it as a indicator which shows if the thread is 120 * properly init'd or not 121 */ 122 if (!comp_param[i].file) { 123 break; 124 } 125 126 qemu_mutex_lock(&comp_param[i].mutex); 127 comp_param[i].quit = true; 128 qemu_cond_signal(&comp_param[i].cond); 129 qemu_mutex_unlock(&comp_param[i].mutex); 130 131 qemu_thread_join(compress_threads + i); 132 qemu_mutex_destroy(&comp_param[i].mutex); 133 qemu_cond_destroy(&comp_param[i].cond); 134 deflateEnd(&comp_param[i].stream); 135 g_free(comp_param[i].originbuf); 136 qemu_fclose(comp_param[i].file); 137 comp_param[i].file = NULL; 138 } 139 qemu_mutex_destroy(&comp_done_lock); 140 qemu_cond_destroy(&comp_done_cond); 141 g_free(compress_threads); 142 g_free(comp_param); 143 compress_threads = NULL; 144 comp_param = NULL; 145 } 146 147 int compress_threads_save_setup(void) 148 { 149 int i, thread_count; 150 151 if (!migrate_compress()) { 152 return 0; 153 } 154 thread_count = migrate_compress_threads(); 155 compress_threads = g_new0(QemuThread, thread_count); 156 comp_param = g_new0(CompressParam, thread_count); 157 qemu_cond_init(&comp_done_cond); 158 qemu_mutex_init(&comp_done_lock); 159 for (i = 0; i < thread_count; i++) { 160 comp_param[i].originbuf = g_try_malloc(qemu_target_page_size()); 161 if (!comp_param[i].originbuf) { 162 goto exit; 163 } 164 165 if (deflateInit(&comp_param[i].stream, 166 migrate_compress_level()) != Z_OK) { 167 g_free(comp_param[i].originbuf); 168 goto exit; 169 } 170 171 /* comp_param[i].file is just used as a dummy buffer to save data, 172 * set its ops to empty. 173 */ 174 comp_param[i].file = qemu_file_new_output( 175 QIO_CHANNEL(qio_channel_null_new())); 176 comp_param[i].done = true; 177 comp_param[i].quit = false; 178 qemu_mutex_init(&comp_param[i].mutex); 179 qemu_cond_init(&comp_param[i].cond); 180 qemu_thread_create(compress_threads + i, "compress", 181 do_data_compress, comp_param + i, 182 QEMU_THREAD_JOINABLE); 183 } 184 return 0; 185 186 exit: 187 compress_threads_save_cleanup(); 188 return -1; 189 } 190 191 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, 192 RAMBlock *block, ram_addr_t offset, 193 uint8_t *source_buf) 194 { 195 uint8_t *p = block->host + offset; 196 size_t page_size = qemu_target_page_size(); 197 int ret; 198 199 assert(qemu_file_buffer_empty(f)); 200 201 if (buffer_is_zero(p, page_size)) { 202 return RES_ZEROPAGE; 203 } 204 205 /* 206 * copy it to a internal buffer to avoid it being modified by VM 207 * so that we can catch up the error during compression and 208 * decompression 209 */ 210 memcpy(source_buf, p, page_size); 211 ret = qemu_put_compression_data(f, stream, source_buf, page_size); 212 if (ret < 0) { 213 qemu_file_set_error(migrate_get_current()->to_dst_file, ret); 214 error_report("compressed data failed!"); 215 qemu_fflush(f); 216 return RES_NONE; 217 } 218 return RES_COMPRESS; 219 } 220 221 static inline void compress_reset_result(CompressParam *param) 222 { 223 param->result = RES_NONE; 224 param->block = NULL; 225 param->offset = 0; 226 } 227 228 void flush_compressed_data(int (send_queued_data(CompressParam *))) 229 { 230 int idx, thread_count; 231 232 thread_count = migrate_compress_threads(); 233 234 qemu_mutex_lock(&comp_done_lock); 235 for (idx = 0; idx < thread_count; idx++) { 236 while (!comp_param[idx].done) { 237 qemu_cond_wait(&comp_done_cond, &comp_done_lock); 238 } 239 } 240 qemu_mutex_unlock(&comp_done_lock); 241 242 for (idx = 0; idx < thread_count; idx++) { 243 qemu_mutex_lock(&comp_param[idx].mutex); 244 if (!comp_param[idx].quit) { 245 CompressParam *param = &comp_param[idx]; 246 send_queued_data(param); 247 assert(qemu_file_buffer_empty(param->file)); 248 compress_reset_result(param); 249 } 250 qemu_mutex_unlock(&comp_param[idx].mutex); 251 } 252 } 253 254 static inline void set_compress_params(CompressParam *param, RAMBlock *block, 255 ram_addr_t offset) 256 { 257 param->block = block; 258 param->offset = offset; 259 param->trigger = true; 260 } 261 262 int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, 263 int (send_queued_data(CompressParam *))) 264 { 265 int idx, thread_count, pages = -1; 266 bool wait = migrate_compress_wait_thread(); 267 268 thread_count = migrate_compress_threads(); 269 qemu_mutex_lock(&comp_done_lock); 270 retry: 271 for (idx = 0; idx < thread_count; idx++) { 272 if (comp_param[idx].done) { 273 CompressParam *param = &comp_param[idx]; 274 qemu_mutex_lock(¶m->mutex); 275 param->done = false; 276 send_queued_data(param); 277 assert(qemu_file_buffer_empty(param->file)); 278 compress_reset_result(param); 279 set_compress_params(param, block, offset); 280 281 qemu_cond_signal(¶m->cond); 282 qemu_mutex_unlock(¶m->mutex); 283 pages = 1; 284 break; 285 } 286 } 287 288 /* 289 * wait for the free thread if the user specifies 'compress-wait-thread', 290 * otherwise we will post the page out in the main thread as normal page. 291 */ 292 if (pages < 0 && wait) { 293 qemu_cond_wait(&comp_done_cond, &comp_done_lock); 294 goto retry; 295 } 296 qemu_mutex_unlock(&comp_done_lock); 297 298 return pages; 299 } 300 301 /* return the size after decompression, or negative value on error */ 302 static int 303 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, 304 const uint8_t *source, size_t source_len) 305 { 306 int err; 307 308 err = inflateReset(stream); 309 if (err != Z_OK) { 310 return -1; 311 } 312 313 stream->avail_in = source_len; 314 stream->next_in = (uint8_t *)source; 315 stream->avail_out = dest_len; 316 stream->next_out = dest; 317 318 err = inflate(stream, Z_NO_FLUSH); 319 if (err != Z_STREAM_END) { 320 return -1; 321 } 322 323 return stream->total_out; 324 } 325 326 static void *do_data_decompress(void *opaque) 327 { 328 DecompressParam *param = opaque; 329 unsigned long pagesize; 330 uint8_t *des; 331 int len, ret; 332 333 qemu_mutex_lock(¶m->mutex); 334 while (!param->quit) { 335 if (param->des) { 336 des = param->des; 337 len = param->len; 338 param->des = 0; 339 qemu_mutex_unlock(¶m->mutex); 340 341 pagesize = qemu_target_page_size(); 342 343 ret = qemu_uncompress_data(¶m->stream, des, pagesize, 344 param->compbuf, len); 345 if (ret < 0 && migrate_get_current()->decompress_error_check) { 346 error_report("decompress data failed"); 347 qemu_file_set_error(decomp_file, ret); 348 } 349 350 qemu_mutex_lock(&decomp_done_lock); 351 param->done = true; 352 qemu_cond_signal(&decomp_done_cond); 353 qemu_mutex_unlock(&decomp_done_lock); 354 355 qemu_mutex_lock(¶m->mutex); 356 } else { 357 qemu_cond_wait(¶m->cond, ¶m->mutex); 358 } 359 } 360 qemu_mutex_unlock(¶m->mutex); 361 362 return NULL; 363 } 364 365 int wait_for_decompress_done(void) 366 { 367 int idx, thread_count; 368 369 if (!migrate_compress()) { 370 return 0; 371 } 372 373 thread_count = migrate_decompress_threads(); 374 qemu_mutex_lock(&decomp_done_lock); 375 for (idx = 0; idx < thread_count; idx++) { 376 while (!decomp_param[idx].done) { 377 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); 378 } 379 } 380 qemu_mutex_unlock(&decomp_done_lock); 381 return qemu_file_get_error(decomp_file); 382 } 383 384 void compress_threads_load_cleanup(void) 385 { 386 int i, thread_count; 387 388 if (!migrate_compress()) { 389 return; 390 } 391 thread_count = migrate_decompress_threads(); 392 for (i = 0; i < thread_count; i++) { 393 /* 394 * we use it as a indicator which shows if the thread is 395 * properly init'd or not 396 */ 397 if (!decomp_param[i].compbuf) { 398 break; 399 } 400 401 qemu_mutex_lock(&decomp_param[i].mutex); 402 decomp_param[i].quit = true; 403 qemu_cond_signal(&decomp_param[i].cond); 404 qemu_mutex_unlock(&decomp_param[i].mutex); 405 } 406 for (i = 0; i < thread_count; i++) { 407 if (!decomp_param[i].compbuf) { 408 break; 409 } 410 411 qemu_thread_join(decompress_threads + i); 412 qemu_mutex_destroy(&decomp_param[i].mutex); 413 qemu_cond_destroy(&decomp_param[i].cond); 414 inflateEnd(&decomp_param[i].stream); 415 g_free(decomp_param[i].compbuf); 416 decomp_param[i].compbuf = NULL; 417 } 418 g_free(decompress_threads); 419 g_free(decomp_param); 420 decompress_threads = NULL; 421 decomp_param = NULL; 422 decomp_file = NULL; 423 } 424 425 int compress_threads_load_setup(QEMUFile *f) 426 { 427 int i, thread_count; 428 429 if (!migrate_compress()) { 430 return 0; 431 } 432 433 thread_count = migrate_decompress_threads(); 434 decompress_threads = g_new0(QemuThread, thread_count); 435 decomp_param = g_new0(DecompressParam, thread_count); 436 qemu_mutex_init(&decomp_done_lock); 437 qemu_cond_init(&decomp_done_cond); 438 decomp_file = f; 439 for (i = 0; i < thread_count; i++) { 440 if (inflateInit(&decomp_param[i].stream) != Z_OK) { 441 goto exit; 442 } 443 444 size_t compbuf_size = compressBound(qemu_target_page_size()); 445 decomp_param[i].compbuf = g_malloc0(compbuf_size); 446 qemu_mutex_init(&decomp_param[i].mutex); 447 qemu_cond_init(&decomp_param[i].cond); 448 decomp_param[i].done = true; 449 decomp_param[i].quit = false; 450 qemu_thread_create(decompress_threads + i, "decompress", 451 do_data_decompress, decomp_param + i, 452 QEMU_THREAD_JOINABLE); 453 } 454 return 0; 455 exit: 456 compress_threads_load_cleanup(); 457 return -1; 458 } 459 460 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len) 461 { 462 int idx, thread_count; 463 464 thread_count = migrate_decompress_threads(); 465 QEMU_LOCK_GUARD(&decomp_done_lock); 466 while (true) { 467 for (idx = 0; idx < thread_count; idx++) { 468 if (decomp_param[idx].done) { 469 decomp_param[idx].done = false; 470 qemu_mutex_lock(&decomp_param[idx].mutex); 471 qemu_get_buffer(f, decomp_param[idx].compbuf, len); 472 decomp_param[idx].des = host; 473 decomp_param[idx].len = len; 474 qemu_cond_signal(&decomp_param[idx].cond); 475 qemu_mutex_unlock(&decomp_param[idx].mutex); 476 break; 477 } 478 } 479 if (idx < thread_count) { 480 break; 481 } else { 482 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); 483 } 484 } 485 } 486