xref: /qemu/migration/ram.c (revision 65a109ab)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "cpu.h"
31 #include <zlib.h>
32 #include "qemu/cutils.h"
33 #include "qemu/bitops.h"
34 #include "qemu/bitmap.h"
35 #include "qemu/main-loop.h"
36 #include "qemu/pmem.h"
37 #include "xbzrle.h"
38 #include "ram.h"
39 #include "migration.h"
40 #include "socket.h"
41 #include "migration/register.h"
42 #include "migration/misc.h"
43 #include "qemu-file.h"
44 #include "postcopy-ram.h"
45 #include "page_cache.h"
46 #include "qemu/error-report.h"
47 #include "qapi/error.h"
48 #include "qapi/qapi-events-migration.h"
49 #include "qapi/qmp/qerror.h"
50 #include "trace.h"
51 #include "exec/ram_addr.h"
52 #include "exec/target_page.h"
53 #include "qemu/rcu_queue.h"
54 #include "migration/colo.h"
55 #include "block.h"
56 #include "sysemu/sysemu.h"
57 #include "qemu/uuid.h"
58 #include "savevm.h"
59 #include "qemu/iov.h"
60 
61 /***********************************************************/
62 /* ram save/restore */
63 
64 /* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS; it
65  * worked for pages that were filled with the same char.  We switched
66  * it to only search for the zero value, and renamed it to avoid
67  * confusion with RAM_SAVE_FLAG_COMPRESS_PAGE.
68  */
69 
70 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
71 #define RAM_SAVE_FLAG_ZERO     0x02
72 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
73 #define RAM_SAVE_FLAG_PAGE     0x08
74 #define RAM_SAVE_FLAG_EOS      0x10
75 #define RAM_SAVE_FLAG_CONTINUE 0x20
76 #define RAM_SAVE_FLAG_XBZRLE   0x40
77 /* 0x80 is reserved in migration.h; start with 0x100 next */
78 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
79 
80 static inline bool is_zero_range(uint8_t *p, uint64_t size)
81 {
82     return buffer_is_zero(p, size);
83 }
84 
85 XBZRLECacheStats xbzrle_counters;
86 
87 /* struct contains XBZRLE cache and a static page
88    used by the compression */
89 static struct {
90     /* buffer used for XBZRLE encoding */
91     uint8_t *encoded_buf;
92     /* buffer for storing page content */
93     uint8_t *current_buf;
94     /* Cache for XBZRLE, Protected by lock. */
95     PageCache *cache;
96     QemuMutex lock;
97     /* it will store a page full of zeros */
98     uint8_t *zero_target_page;
99     /* buffer used for XBZRLE decoding */
100     uint8_t *decoded_buf;
101 } XBZRLE;
102 
103 static void XBZRLE_cache_lock(void)
104 {
105     if (migrate_use_xbzrle())
106         qemu_mutex_lock(&XBZRLE.lock);
107 }
108 
109 static void XBZRLE_cache_unlock(void)
110 {
111     if (migrate_use_xbzrle())
112         qemu_mutex_unlock(&XBZRLE.lock);
113 }
114 
115 /**
116  * xbzrle_cache_resize: resize the xbzrle cache
117  *
118  * This function is called from qmp_migrate_set_cache_size in the main
119  * thread, possibly while a migration is in progress.  A running
120  * migration may be using the cache and might finish during this call,
121  * hence changes to the cache are protected by the XBZRLE.lock mutex.
122  *
123  * Returns 0 for success or -1 for error
124  *
125  * @new_size: new cache size
126  * @errp: set *errp if the check failed, with reason
127  */
128 int xbzrle_cache_resize(int64_t new_size, Error **errp)
129 {
130     PageCache *new_cache;
131     int64_t ret = 0;
132 
133     /* Check for truncation */
134     if (new_size != (size_t)new_size) {
135         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
136                    "exceeding address space");
137         return -1;
138     }
139 
140     if (new_size == migrate_xbzrle_cache_size()) {
141         /* nothing to do */
142         return 0;
143     }
144 
145     XBZRLE_cache_lock();
146 
147     if (XBZRLE.cache != NULL) {
148         new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
149         if (!new_cache) {
150             ret = -1;
151             goto out;
152         }
153 
154         cache_fini(XBZRLE.cache);
155         XBZRLE.cache = new_cache;
156     }
157 out:
158     XBZRLE_cache_unlock();
159     return ret;
160 }
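
/*
 * Illustrative sketch: a minimal caller of xbzrle_cache_resize().  The
 * helper below is hypothetical and not part of QEMU; it only shows the
 * calling convention: on failure the cache is left untouched and the
 * reason is returned through *errp.
 */
#if 0
static void example_resize_xbzrle_cache(int64_t new_size)
{
    Error *err = NULL;

    if (xbzrle_cache_resize(new_size, &err) < 0) {
        /* hypothetical example: report and drop the error */
        error_report_err(err);
    }
}
#endif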
161 
162 static bool ramblock_is_ignored(RAMBlock *block)
163 {
164     return !qemu_ram_is_migratable(block) ||
165            (migrate_ignore_shared() && qemu_ram_is_shared(block));
166 }
167 
168 /* Should be holding either ram_list.mutex, or the RCU lock. */
169 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
170     INTERNAL_RAMBLOCK_FOREACH(block)                   \
171         if (ramblock_is_ignored(block)) {} else
172 
173 #define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
174     INTERNAL_RAMBLOCK_FOREACH(block)                   \
175         if (!qemu_ram_is_migratable(block)) {} else
176 
177 #undef RAMBLOCK_FOREACH
178 
179 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
180 {
181     RAMBlock *block;
182     int ret = 0;
183 
184     rcu_read_lock();
185     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
186         ret = func(block, opaque);
187         if (ret) {
188             break;
189         }
190     }
191     rcu_read_unlock();
192     return ret;
193 }
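
/*
 * Illustrative sketch: iterating over migratable RAM with
 * foreach_not_ignored_block().  The callback and wrapper names are
 * hypothetical; they only demonstrate the contract that returning
 * non-zero from the callback stops the iteration early.
 */
#if 0
static int example_add_block_size(RAMBlock *rb, void *opaque)
{
    uint64_t *total = opaque;

    *total += rb->used_length;
    return 0;   /* zero means: keep visiting the remaining blocks */
}

static uint64_t example_total_migratable_bytes(void)
{
    uint64_t total = 0;

    foreach_not_ignored_block(example_add_block_size, &total);
    return total;
}
#endif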
194 
195 static void ramblock_recv_map_init(void)
196 {
197     RAMBlock *rb;
198 
199     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
200         assert(!rb->receivedmap);
201         rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
202     }
203 }
204 
205 int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
206 {
207     return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
208                     rb->receivedmap);
209 }
210 
211 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
212 {
213     return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
214 }
215 
216 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
217 {
218     set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
219 }
220 
221 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
222                                     size_t nr)
223 {
224     bitmap_set_atomic(rb->receivedmap,
225                       ramblock_recv_bitmap_offset(host_addr, rb),
226                       nr);
227 }
228 
229 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
230 
231 /*
232  * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
233  *
234  * Returns >0 if success with sent bytes, or <0 if error.
235  */
236 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
237                                   const char *block_name)
238 {
239     RAMBlock *block = qemu_ram_block_by_name(block_name);
240     unsigned long *le_bitmap, nbits;
241     uint64_t size;
242 
243     if (!block) {
244         error_report("%s: invalid block name: %s", __func__, block_name);
245         return -1;
246     }
247 
248     nbits = block->used_length >> TARGET_PAGE_BITS;
249 
250     /*
251      * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
252      * machines we may need 4 more bytes for padding (see the comment
253      * below), so extend it a bit beforehand.
254      */
255     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
256 
257     /*
258      * Always use little endian when sending the bitmap. This is
259      * required so that it works even when the source and destination
260      * VMs do not use the same endianness. (Note: big endian won't work.)
261      */
262     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
263 
264     /* Size of the bitmap, in bytes */
265     size = DIV_ROUND_UP(nbits, 8);
266 
267     /*
268      * size is always aligned to 8 bytes for 64bit machines, but it
269      * may not be true for 32bit machines. We need this padding to
270      * make sure the migration can survive even between 32bit and
271      * 64bit machines.
272      */
273     size = ROUND_UP(size, 8);
274 
275     qemu_put_be64(file, size);
276     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
277     /*
278      * Mark the end, in case the middle part got corrupted for
279      * some "mysterious" reason.
280      */
281     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
282     qemu_fflush(file);
283 
284     g_free(le_bitmap);
285 
286     if (qemu_file_get_error(file)) {
287         return qemu_file_get_error(file);
288     }
289 
290     return size + sizeof(size);
291 }
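
/*
 * Illustrative sketch: parsing the stream produced above.  The real
 * counterpart lives elsewhere in this file; this hypothetical reader only
 * mirrors the documented format: size (be64) + bitmap (size bytes) +
 * RAMBLOCK_RECV_BITMAP_ENDING (be64).
 */
#if 0
static int example_read_recv_bitmap(QEMUFile *file, unsigned long *le_bitmap,
                                    uint64_t expected_size)
{
    uint64_t size = qemu_get_be64(file);
    uint64_t end_mark;

    if (size != expected_size) {
        return -EINVAL;             /* bitmap size mismatch */
    }
    qemu_get_buffer(file, (uint8_t *)le_bitmap, size);

    end_mark = qemu_get_be64(file);
    if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
        return -EINVAL;             /* the end marker got corrupted */
    }
    return qemu_file_get_error(file);
}
#endif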
292 
293 /*
294  * An outstanding page request, on the source, having been received
295  * and queued
296  */
297 struct RAMSrcPageRequest {
298     RAMBlock *rb;
299     hwaddr    offset;
300     hwaddr    len;
301 
302     QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
303 };
304 
305 /* State of RAM for migration */
306 struct RAMState {
307     /* QEMUFile used for this migration */
308     QEMUFile *f;
309     /* Last block that we have visited searching for dirty pages */
310     RAMBlock *last_seen_block;
311     /* Last block from where we have sent data */
312     RAMBlock *last_sent_block;
313     /* Last dirty target page we have sent */
314     ram_addr_t last_page;
315     /* last ram version we have seen */
316     uint32_t last_version;
317     /* We are in the first round */
318     bool ram_bulk_stage;
319     /* The free page optimization is enabled */
320     bool fpo_enabled;
321     /* How many times we have dirtied too many pages */
322     int dirty_rate_high_cnt;
323     /* these variables are used for bitmap sync */
324     /* last time we did a full bitmap_sync */
325     int64_t time_last_bitmap_sync;
326     /* bytes transferred at start_time */
327     uint64_t bytes_xfer_prev;
328     /* number of dirty pages since start_time */
329     uint64_t num_dirty_pages_period;
330     /* xbzrle misses since the beginning of the period */
331     uint64_t xbzrle_cache_miss_prev;
332 
333     /* compression statistics since the beginning of the period */
334     /* number of times no free thread was available to compress data */
335     uint64_t compress_thread_busy_prev;
336     /* amount of bytes after compression */
337     uint64_t compressed_size_prev;
338     /* amount of compressed pages */
339     uint64_t compress_pages_prev;
340 
341     /* total handled target pages at the beginning of period */
342     uint64_t target_page_count_prev;
343     /* total handled target pages since start */
344     uint64_t target_page_count;
345     /* number of dirty bits in the bitmap */
346     uint64_t migration_dirty_pages;
347     /* Protects modification of the bitmap and migration dirty pages */
348     QemuMutex bitmap_mutex;
349     /* The RAMBlock used in the last src_page_requests */
350     RAMBlock *last_req_rb;
351     /* Queue of outstanding page requests from the destination */
352     QemuMutex src_page_req_mutex;
353     QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
354 };
355 typedef struct RAMState RAMState;
356 
357 static RAMState *ram_state;
358 
359 static NotifierWithReturnList precopy_notifier_list;
360 
361 void precopy_infrastructure_init(void)
362 {
363     notifier_with_return_list_init(&precopy_notifier_list);
364 }
365 
366 void precopy_add_notifier(NotifierWithReturn *n)
367 {
368     notifier_with_return_list_add(&precopy_notifier_list, n);
369 }
370 
371 void precopy_remove_notifier(NotifierWithReturn *n)
372 {
373     notifier_with_return_remove(n);
374 }
375 
376 int precopy_notify(PrecopyNotifyReason reason, Error **errp)
377 {
378     PrecopyNotifyData pnd;
379     pnd.reason = reason;
380     pnd.errp = errp;
381 
382     return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
383 }
384 
385 void precopy_enable_free_page_optimization(void)
386 {
387     if (!ram_state) {
388         return;
389     }
390 
391     ram_state->fpo_enabled = true;
392 }
393 
394 uint64_t ram_bytes_remaining(void)
395 {
396     return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
397                        0;
398 }
399 
400 MigrationStats ram_counters;
401 
402 /* used by the search for pages to send */
403 struct PageSearchStatus {
404     /* Current block being searched */
405     RAMBlock    *block;
406     /* Current page to search from */
407     unsigned long page;
408     /* Set once we wrap around */
409     bool         complete_round;
410 };
411 typedef struct PageSearchStatus PageSearchStatus;
412 
413 CompressionStats compression_counters;
414 
415 struct CompressParam {
416     bool done;
417     bool quit;
418     bool zero_page;
419     QEMUFile *file;
420     QemuMutex mutex;
421     QemuCond cond;
422     RAMBlock *block;
423     ram_addr_t offset;
424 
425     /* internally used fields */
426     z_stream stream;
427     uint8_t *originbuf;
428 };
429 typedef struct CompressParam CompressParam;
430 
431 struct DecompressParam {
432     bool done;
433     bool quit;
434     QemuMutex mutex;
435     QemuCond cond;
436     void *des;
437     uint8_t *compbuf;
438     int len;
439     z_stream stream;
440 };
441 typedef struct DecompressParam DecompressParam;
442 
443 static CompressParam *comp_param;
444 static QemuThread *compress_threads;
445 /* comp_done_cond is used to wake up the migration thread when
446  * one of the compression threads has finished the compression.
447  * comp_done_lock is used together with comp_done_cond.
448  */
449 static QemuMutex comp_done_lock;
450 static QemuCond comp_done_cond;
451 /* The empty QEMUFileOps will be used by file in CompressParam */
452 static const QEMUFileOps empty_ops = { };
453 
454 static QEMUFile *decomp_file;
455 static DecompressParam *decomp_param;
456 static QemuThread *decompress_threads;
457 static QemuMutex decomp_done_lock;
458 static QemuCond decomp_done_cond;
459 
460 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
461                                  ram_addr_t offset, uint8_t *source_buf);
462 
463 static void *do_data_compress(void *opaque)
464 {
465     CompressParam *param = opaque;
466     RAMBlock *block;
467     ram_addr_t offset;
468     bool zero_page;
469 
470     qemu_mutex_lock(&param->mutex);
471     while (!param->quit) {
472         if (param->block) {
473             block = param->block;
474             offset = param->offset;
475             param->block = NULL;
476             qemu_mutex_unlock(&param->mutex);
477 
478             zero_page = do_compress_ram_page(param->file, &param->stream,
479                                              block, offset, param->originbuf);
480 
481             qemu_mutex_lock(&comp_done_lock);
482             param->done = true;
483             param->zero_page = zero_page;
484             qemu_cond_signal(&comp_done_cond);
485             qemu_mutex_unlock(&comp_done_lock);
486 
487             qemu_mutex_lock(&param->mutex);
488         } else {
489             qemu_cond_wait(&param->cond, &param->mutex);
490         }
491     }
492     qemu_mutex_unlock(&param->mutex);
493 
494     return NULL;
495 }
496 
497 static void compress_threads_save_cleanup(void)
498 {
499     int i, thread_count;
500 
501     if (!migrate_use_compression() || !comp_param) {
502         return;
503     }
504 
505     thread_count = migrate_compress_threads();
506     for (i = 0; i < thread_count; i++) {
507         /*
508          * we use it as an indicator of whether the thread has been
509          * properly initialized
510          */
511         if (!comp_param[i].file) {
512             break;
513         }
514 
515         qemu_mutex_lock(&comp_param[i].mutex);
516         comp_param[i].quit = true;
517         qemu_cond_signal(&comp_param[i].cond);
518         qemu_mutex_unlock(&comp_param[i].mutex);
519 
520         qemu_thread_join(compress_threads + i);
521         qemu_mutex_destroy(&comp_param[i].mutex);
522         qemu_cond_destroy(&comp_param[i].cond);
523         deflateEnd(&comp_param[i].stream);
524         g_free(comp_param[i].originbuf);
525         qemu_fclose(comp_param[i].file);
526         comp_param[i].file = NULL;
527     }
528     qemu_mutex_destroy(&comp_done_lock);
529     qemu_cond_destroy(&comp_done_cond);
530     g_free(compress_threads);
531     g_free(comp_param);
532     compress_threads = NULL;
533     comp_param = NULL;
534 }
535 
536 static int compress_threads_save_setup(void)
537 {
538     int i, thread_count;
539 
540     if (!migrate_use_compression()) {
541         return 0;
542     }
543     thread_count = migrate_compress_threads();
544     compress_threads = g_new0(QemuThread, thread_count);
545     comp_param = g_new0(CompressParam, thread_count);
546     qemu_cond_init(&comp_done_cond);
547     qemu_mutex_init(&comp_done_lock);
548     for (i = 0; i < thread_count; i++) {
549         comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
550         if (!comp_param[i].originbuf) {
551             goto exit;
552         }
553 
554         if (deflateInit(&comp_param[i].stream,
555                         migrate_compress_level()) != Z_OK) {
556             g_free(comp_param[i].originbuf);
557             goto exit;
558         }
559 
560         /* comp_param[i].file is just used as a dummy buffer to save data,
561          * set its ops to empty.
562          */
563         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
564         comp_param[i].done = true;
565         comp_param[i].quit = false;
566         qemu_mutex_init(&comp_param[i].mutex);
567         qemu_cond_init(&comp_param[i].cond);
568         qemu_thread_create(compress_threads + i, "compress",
569                            do_data_compress, comp_param + i,
570                            QEMU_THREAD_JOINABLE);
571     }
572     return 0;
573 
574 exit:
575     compress_threads_save_cleanup();
576     return -1;
577 }
578 
579 /* Multiple fd's */
580 
581 #define MULTIFD_MAGIC 0x11223344U
582 #define MULTIFD_VERSION 1
583 
584 #define MULTIFD_FLAG_SYNC (1 << 0)
585 
586 /* This value needs to be a multiple of qemu_target_page_size() */
587 #define MULTIFD_PACKET_SIZE (512 * 1024)
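/*
 * For the common 4 KiB target page size this corresponds to
 * 512 KiB / 4 KiB = 128 pages per multifd packet; other target page sizes
 * yield correspondingly different page counts.
 */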
588 
589 typedef struct {
590     uint32_t magic;
591     uint32_t version;
592     unsigned char uuid[16]; /* QemuUUID */
593     uint8_t id;
594     uint8_t unused1[7];     /* Reserved for future use */
595     uint64_t unused2[4];    /* Reserved for future use */
596 } __attribute__((packed)) MultiFDInit_t;
597 
598 typedef struct {
599     uint32_t magic;
600     uint32_t version;
601     uint32_t flags;
602     /* maximum number of allocated pages */
603     uint32_t pages_alloc;
604     uint32_t pages_used;
605     /* size of the next packet that contains pages */
606     uint32_t next_packet_size;
607     uint64_t packet_num;
608     uint64_t unused[4];    /* Reserved for future use */
609     char ramblock[256];
610     uint64_t offset[];
611 } __attribute__((packed)) MultiFDPacket_t;
612 
613 typedef struct {
614     /* number of used pages */
615     uint32_t used;
616     /* number of allocated pages */
617     uint32_t allocated;
618     /* global number of generated multifd packets */
619     uint64_t packet_num;
620     /* offset of each page */
621     ram_addr_t *offset;
622     /* pointer to each page */
623     struct iovec *iov;
624     RAMBlock *block;
625 } MultiFDPages_t;
626 
627 typedef struct {
628     /* these fields are not changed once the thread is created */
629     /* channel number */
630     uint8_t id;
631     /* channel thread name */
632     char *name;
633     /* channel thread id */
634     QemuThread thread;
635     /* communication channel */
636     QIOChannel *c;
637     /* sem where to wait for more work */
638     QemuSemaphore sem;
639     /* this mutex protects the following parameters */
640     QemuMutex mutex;
641     /* is this channel thread running */
642     bool running;
643     /* should this thread finish */
644     bool quit;
645     /* thread has work to do */
646     int pending_job;
647     /* array of pages to send */
648     MultiFDPages_t *pages;
649     /* packet allocated len */
650     uint32_t packet_len;
651     /* pointer to the packet */
652     MultiFDPacket_t *packet;
653     /* multifd flags for each packet */
654     uint32_t flags;
655     /* size of the next packet that contains pages */
656     uint32_t next_packet_size;
657     /* global number of generated multifd packets */
658     uint64_t packet_num;
659     /* thread local variables */
660     /* packets sent through this channel */
661     uint64_t num_packets;
662     /* pages sent through this channel */
663     uint64_t num_pages;
664     /* syncs main thread and channels */
665     QemuSemaphore sem_sync;
666 }  MultiFDSendParams;
667 
668 typedef struct {
669     /* these fields are not changed once the thread is created */
670     /* channel number */
671     uint8_t id;
672     /* channel thread name */
673     char *name;
674     /* channel thread id */
675     QemuThread thread;
676     /* communication channel */
677     QIOChannel *c;
678     /* this mutex protects the following parameters */
679     QemuMutex mutex;
680     /* is this channel thread running */
681     bool running;
682     /* array of pages to receive */
683     MultiFDPages_t *pages;
684     /* packet allocated len */
685     uint32_t packet_len;
686     /* pointer to the packet */
687     MultiFDPacket_t *packet;
688     /* multifd flags for each packet */
689     uint32_t flags;
690     /* global number of generated multifd packets */
691     uint64_t packet_num;
692     /* thread local variables */
693     /* size of the next packet that contains pages */
694     uint32_t next_packet_size;
695     /* packets sent through this channel */
696     uint64_t num_packets;
697     /* pages sent through this channel */
698     uint64_t num_pages;
699     /* syncs main thread and channels */
700     QemuSemaphore sem_sync;
701 } MultiFDRecvParams;
702 
703 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
704 {
705     MultiFDInit_t msg;
706     int ret;
707 
708     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
709     msg.version = cpu_to_be32(MULTIFD_VERSION);
710     msg.id = p->id;
711     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
712 
713     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
714     if (ret != 0) {
715         return -1;
716     }
717     return 0;
718 }
719 
720 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
721 {
722     MultiFDInit_t msg;
723     int ret;
724 
725     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
726     if (ret != 0) {
727         return -1;
728     }
729 
730     msg.magic = be32_to_cpu(msg.magic);
731     msg.version = be32_to_cpu(msg.version);
732 
733     if (msg.magic != MULTIFD_MAGIC) {
734         error_setg(errp, "multifd: received packet magic %x "
735                    "expected %x", msg.magic, MULTIFD_MAGIC);
736         return -1;
737     }
738 
739     if (msg.version != MULTIFD_VERSION) {
740         error_setg(errp, "multifd: received packet version %d "
741                    "expected %d", msg.version, MULTIFD_VERSION);
742         return -1;
743     }
744 
745     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
746         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
747         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
748 
749         error_setg(errp, "multifd: received uuid '%s' and expected "
750                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
751         g_free(uuid);
752         g_free(msg_uuid);
753         return -1;
754     }
755 
756     if (msg.id > migrate_multifd_channels()) {
757         error_setg(errp, "multifd: received channel id %d "
758                    "expected a value <= %d", msg.id, migrate_multifd_channels());
759         return -1;
760     }
761 
762     return msg.id;
763 }
764 
765 static MultiFDPages_t *multifd_pages_init(size_t size)
766 {
767     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
768 
769     pages->allocated = size;
770     pages->iov = g_new0(struct iovec, size);
771     pages->offset = g_new0(ram_addr_t, size);
772 
773     return pages;
774 }
775 
776 static void multifd_pages_clear(MultiFDPages_t *pages)
777 {
778     pages->used = 0;
779     pages->allocated = 0;
780     pages->packet_num = 0;
781     pages->block = NULL;
782     g_free(pages->iov);
783     pages->iov = NULL;
784     g_free(pages->offset);
785     pages->offset = NULL;
786     g_free(pages);
787 }
788 
789 static void multifd_send_fill_packet(MultiFDSendParams *p)
790 {
791     MultiFDPacket_t *packet = p->packet;
792     uint32_t page_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
793     int i;
794 
795     packet->magic = cpu_to_be32(MULTIFD_MAGIC);
796     packet->version = cpu_to_be32(MULTIFD_VERSION);
797     packet->flags = cpu_to_be32(p->flags);
798     packet->pages_alloc = cpu_to_be32(page_max);
799     packet->pages_used = cpu_to_be32(p->pages->used);
800     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
801     packet->packet_num = cpu_to_be64(p->packet_num);
802 
803     if (p->pages->block) {
804         strncpy(packet->ramblock, p->pages->block->idstr, 256);
805     }
806 
807     for (i = 0; i < p->pages->used; i++) {
808         packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
809     }
810 }
811 
812 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
813 {
814     MultiFDPacket_t *packet = p->packet;
815     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
816     RAMBlock *block;
817     int i;
818 
819     packet->magic = be32_to_cpu(packet->magic);
820     if (packet->magic != MULTIFD_MAGIC) {
821         error_setg(errp, "multifd: received packet "
822                    "magic %x and expected magic %x",
823                    packet->magic, MULTIFD_MAGIC);
824         return -1;
825     }
826 
827     packet->version = be32_to_cpu(packet->version);
828     if (packet->version != MULTIFD_VERSION) {
829         error_setg(errp, "multifd: received packet "
830                    "version %d and expected version %d",
831                    packet->version, MULTIFD_VERSION);
832         return -1;
833     }
834 
835     p->flags = be32_to_cpu(packet->flags);
836 
837     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
838     /*
839      * If we received a packet that is 100 times bigger than expected
840      * just stop migration.  The factor of 100 is an arbitrary limit.
841      */
842     if (packet->pages_alloc > pages_max * 100) {
843         error_setg(errp, "multifd: received packet "
844                    "with size %d and expected a maximum size of %d",
845                    packet->pages_alloc, pages_max * 100) ;
846         return -1;
847     }
848     /*
849      * We received a packet that is bigger than expected but inside
850      * reasonable limits (see previous comment).  Just reallocate.
851      */
852     if (packet->pages_alloc > p->pages->allocated) {
853         multifd_pages_clear(p->pages);
854         p->pages = multifd_pages_init(packet->pages_alloc);
855     }
856 
857     p->pages->used = be32_to_cpu(packet->pages_used);
858     if (p->pages->used > packet->pages_alloc) {
859         error_setg(errp, "multifd: received packet "
860                    "with %d pages and expected maximum pages are %d",
861                    p->pages->used, packet->pages_alloc) ;
862         return -1;
863     }
864 
865     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
866     p->packet_num = be64_to_cpu(packet->packet_num);
867 
868     if (p->pages->used) {
869         /* make sure that the ramblock name is NUL terminated */
870         packet->ramblock[255] = 0;
871         block = qemu_ram_block_by_name(packet->ramblock);
872         if (!block) {
873             error_setg(errp, "multifd: unknown ram block %s",
874                        packet->ramblock);
875             return -1;
876         }
877     }
878 
879     for (i = 0; i < p->pages->used; i++) {
880         ram_addr_t offset = be64_to_cpu(packet->offset[i]);
881 
882         if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
883             error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
884                        " (max " RAM_ADDR_FMT ")",
885                        offset, block->used_length - TARGET_PAGE_SIZE);
886             return -1;
887         }
888         p->pages->iov[i].iov_base = block->host + offset;
889         p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
890     }
891 
892     return 0;
893 }
894 
895 struct {
896     MultiFDSendParams *params;
897     /* number of created threads */
898     int count;
899     /* array of pages to send */
900     MultiFDPages_t *pages;
901     /* syncs main thread and channels */
902     QemuSemaphore sem_sync;
903     /* global number of generated multifd packets */
904     uint64_t packet_num;
905     /* send channels ready */
906     QemuSemaphore channels_ready;
907 } *multifd_send_state;
908 
909 /*
910  * How do we use multifd_send_state->pages and channel->pages?
911  *
912  * We create a "pages" struct for each channel, plus a main one.  Each
913  * time we need to send a batch of pages we swap the main one with the
914  * one owned by the channel that is going to send it.  There are
915  * two reasons for that:
916  *    - to avoid doing so many mallocs during migration
917  *    - to make it easier to know what to free at the end of migration
918  *
919  * This way we always know who owns each "pages" struct, and we don't
920  * need any locking.  It belongs either to the migration thread or to
921  * the channel thread.  Switching is safe because the migration thread
922  * holds the channel mutex when changing it, and the channel thread
923  * must have finished with its own, otherwise pending_job can't be
924  * false.
925  */
926 
927 static void multifd_send_pages(void)
928 {
929     int i;
930     static int next_channel;
931     MultiFDSendParams *p = NULL; /* make gcc happy */
932     MultiFDPages_t *pages = multifd_send_state->pages;
933     uint64_t transferred;
934 
935     qemu_sem_wait(&multifd_send_state->channels_ready);
936     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
937         p = &multifd_send_state->params[i];
938 
939         qemu_mutex_lock(&p->mutex);
940         if (!p->pending_job) {
941             p->pending_job++;
942             next_channel = (i + 1) % migrate_multifd_channels();
943             break;
944         }
945         qemu_mutex_unlock(&p->mutex);
946     }
947     p->pages->used = 0;
948 
949     p->packet_num = multifd_send_state->packet_num++;
950     p->pages->block = NULL;
951     multifd_send_state->pages = p->pages;
952     p->pages = pages;
953     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
954     ram_counters.multifd_bytes += transferred;
955     ram_counters.transferred += transferred;
956     qemu_mutex_unlock(&p->mutex);
957     qemu_sem_post(&p->sem);
958 }
959 
960 static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
961 {
962     MultiFDPages_t *pages = multifd_send_state->pages;
963 
964     if (!pages->block) {
965         pages->block = block;
966     }
967 
968     if (pages->block == block) {
969         pages->offset[pages->used] = offset;
970         pages->iov[pages->used].iov_base = block->host + offset;
971         pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
972         pages->used++;
973 
974         if (pages->used < pages->allocated) {
975             return;
976         }
977     }
978 
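    /*
     * Either the shared buffer is full or the new page belongs to a
     * different RAMBlock than the queued ones: flush what we have so far.
     */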
979     multifd_send_pages();
980 
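    /* The block changed, so the page above was not queued; queue it now. */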
981     if (pages->block != block) {
982         multifd_queue_page(block, offset);
983     }
984 }
985 
986 static void multifd_send_terminate_threads(Error *err)
987 {
988     int i;
989 
990     if (err) {
991         MigrationState *s = migrate_get_current();
992         migrate_set_error(s, err);
993         if (s->state == MIGRATION_STATUS_SETUP ||
994             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
995             s->state == MIGRATION_STATUS_DEVICE ||
996             s->state == MIGRATION_STATUS_ACTIVE) {
997             migrate_set_state(&s->state, s->state,
998                               MIGRATION_STATUS_FAILED);
999         }
1000     }
1001 
1002     for (i = 0; i < migrate_multifd_channels(); i++) {
1003         MultiFDSendParams *p = &multifd_send_state->params[i];
1004 
1005         qemu_mutex_lock(&p->mutex);
1006         p->quit = true;
1007         qemu_sem_post(&p->sem);
1008         qemu_mutex_unlock(&p->mutex);
1009     }
1010 }
1011 
1012 void multifd_save_cleanup(void)
1013 {
1014     int i;
1015 
1016     if (!migrate_use_multifd()) {
1017         return;
1018     }
1019     multifd_send_terminate_threads(NULL);
1020     for (i = 0; i < migrate_multifd_channels(); i++) {
1021         MultiFDSendParams *p = &multifd_send_state->params[i];
1022 
1023         if (p->running) {
1024             qemu_thread_join(&p->thread);
1025         }
1026         socket_send_channel_destroy(p->c);
1027         p->c = NULL;
1028         qemu_mutex_destroy(&p->mutex);
1029         qemu_sem_destroy(&p->sem);
1030         qemu_sem_destroy(&p->sem_sync);
1031         g_free(p->name);
1032         p->name = NULL;
1033         multifd_pages_clear(p->pages);
1034         p->pages = NULL;
1035         p->packet_len = 0;
1036         g_free(p->packet);
1037         p->packet = NULL;
1038     }
1039     qemu_sem_destroy(&multifd_send_state->channels_ready);
1040     qemu_sem_destroy(&multifd_send_state->sem_sync);
1041     g_free(multifd_send_state->params);
1042     multifd_send_state->params = NULL;
1043     multifd_pages_clear(multifd_send_state->pages);
1044     multifd_send_state->pages = NULL;
1045     g_free(multifd_send_state);
1046     multifd_send_state = NULL;
1047 }
1048 
1049 static void multifd_send_sync_main(void)
1050 {
1051     int i;
1052 
1053     if (!migrate_use_multifd()) {
1054         return;
1055     }
1056     if (multifd_send_state->pages->used) {
1057         multifd_send_pages();
1058     }
1059     for (i = 0; i < migrate_multifd_channels(); i++) {
1060         MultiFDSendParams *p = &multifd_send_state->params[i];
1061 
1062         trace_multifd_send_sync_main_signal(p->id);
1063 
1064         qemu_mutex_lock(&p->mutex);
1065 
1066         p->packet_num = multifd_send_state->packet_num++;
1067         p->flags |= MULTIFD_FLAG_SYNC;
1068         p->pending_job++;
1069         qemu_mutex_unlock(&p->mutex);
1070         qemu_sem_post(&p->sem);
1071     }
1072     for (i = 0; i < migrate_multifd_channels(); i++) {
1073         MultiFDSendParams *p = &multifd_send_state->params[i];
1074 
1075         trace_multifd_send_sync_main_wait(p->id);
1076         qemu_sem_wait(&multifd_send_state->sem_sync);
1077     }
1078     trace_multifd_send_sync_main(multifd_send_state->packet_num);
1079 }
1080 
1081 static void *multifd_send_thread(void *opaque)
1082 {
1083     MultiFDSendParams *p = opaque;
1084     Error *local_err = NULL;
1085     int ret;
1086 
1087     trace_multifd_send_thread_start(p->id);
1088     rcu_register_thread();
1089 
1090     if (multifd_send_initial_packet(p, &local_err) < 0) {
1091         goto out;
1092     }
1093     /* initial packet */
1094     p->num_packets = 1;
1095 
1096     while (true) {
1097         qemu_sem_wait(&p->sem);
1098         qemu_mutex_lock(&p->mutex);
1099 
1100         if (p->pending_job) {
1101             uint32_t used = p->pages->used;
1102             uint64_t packet_num = p->packet_num;
1103             uint32_t flags = p->flags;
1104 
1105             p->next_packet_size = used * qemu_target_page_size();
1106             multifd_send_fill_packet(p);
1107             p->flags = 0;
1108             p->num_packets++;
1109             p->num_pages += used;
1110             p->pages->used = 0;
1111             qemu_mutex_unlock(&p->mutex);
1112 
1113             trace_multifd_send(p->id, packet_num, used, flags,
1114                                p->next_packet_size);
1115 
1116             ret = qio_channel_write_all(p->c, (void *)p->packet,
1117                                         p->packet_len, &local_err);
1118             if (ret != 0) {
1119                 break;
1120             }
1121 
1122             if (used) {
1123                 ret = qio_channel_writev_all(p->c, p->pages->iov,
1124                                              used, &local_err);
1125                 if (ret != 0) {
1126                     break;
1127                 }
1128             }
1129 
1130             qemu_mutex_lock(&p->mutex);
1131             p->pending_job--;
1132             qemu_mutex_unlock(&p->mutex);
1133 
1134             if (flags & MULTIFD_FLAG_SYNC) {
1135                 qemu_sem_post(&multifd_send_state->sem_sync);
1136             }
1137             qemu_sem_post(&multifd_send_state->channels_ready);
1138         } else if (p->quit) {
1139             qemu_mutex_unlock(&p->mutex);
1140             break;
1141         } else {
1142             qemu_mutex_unlock(&p->mutex);
1143             /* sometimes there are spurious wakeups */
1144         }
1145     }
1146 
1147 out:
1148     if (local_err) {
1149         multifd_send_terminate_threads(local_err);
1150     }
1151 
1152     qemu_mutex_lock(&p->mutex);
1153     p->running = false;
1154     qemu_mutex_unlock(&p->mutex);
1155 
1156     rcu_unregister_thread();
1157     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1158 
1159     return NULL;
1160 }
1161 
1162 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1163 {
1164     MultiFDSendParams *p = opaque;
1165     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1166     Error *local_err = NULL;
1167 
1168     if (qio_task_propagate_error(task, &local_err)) {
1169         migrate_set_error(migrate_get_current(), local_err);
1170         multifd_save_cleanup();
1171     } else {
1172         p->c = QIO_CHANNEL(sioc);
1173         qio_channel_set_delay(p->c, false);
1174         p->running = true;
1175         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1176                            QEMU_THREAD_JOINABLE);
1177 
1178         atomic_inc(&multifd_send_state->count);
1179     }
1180 }
1181 
1182 int multifd_save_setup(void)
1183 {
1184     int thread_count;
1185     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1186     uint8_t i;
1187 
1188     if (!migrate_use_multifd()) {
1189         return 0;
1190     }
1191     thread_count = migrate_multifd_channels();
1192     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1193     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1194     atomic_set(&multifd_send_state->count, 0);
1195     multifd_send_state->pages = multifd_pages_init(page_count);
1196     qemu_sem_init(&multifd_send_state->sem_sync, 0);
1197     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1198 
1199     for (i = 0; i < thread_count; i++) {
1200         MultiFDSendParams *p = &multifd_send_state->params[i];
1201 
1202         qemu_mutex_init(&p->mutex);
1203         qemu_sem_init(&p->sem, 0);
1204         qemu_sem_init(&p->sem_sync, 0);
1205         p->quit = false;
1206         p->pending_job = 0;
1207         p->id = i;
1208         p->pages = multifd_pages_init(page_count);
1209         p->packet_len = sizeof(MultiFDPacket_t)
1210                       + sizeof(ram_addr_t) * page_count;
1211         p->packet = g_malloc0(p->packet_len);
1212         p->name = g_strdup_printf("multifdsend_%d", i);
1213         socket_send_channel_create(multifd_new_send_channel_async, p);
1214     }
1215     return 0;
1216 }
1217 
1218 struct {
1219     MultiFDRecvParams *params;
1220     /* number of created threads */
1221     int count;
1222     /* syncs main thread and channels */
1223     QemuSemaphore sem_sync;
1224     /* global number of generated multifd packets */
1225     uint64_t packet_num;
1226 } *multifd_recv_state;
1227 
1228 static void multifd_recv_terminate_threads(Error *err)
1229 {
1230     int i;
1231 
1232     if (err) {
1233         MigrationState *s = migrate_get_current();
1234         migrate_set_error(s, err);
1235         if (s->state == MIGRATION_STATUS_SETUP ||
1236             s->state == MIGRATION_STATUS_ACTIVE) {
1237             migrate_set_state(&s->state, s->state,
1238                               MIGRATION_STATUS_FAILED);
1239         }
1240     }
1241 
1242     for (i = 0; i < migrate_multifd_channels(); i++) {
1243         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1244 
1245         qemu_mutex_lock(&p->mutex);
1246         /* We could arrive here for two reasons:
1247            - normal quit, i.e. everything went fine, just finished
1248            - error quit: We close the channels so the channel threads
1249              finish their qio_channel_read_all_eof() calls */
1250         qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1251         qemu_mutex_unlock(&p->mutex);
1252     }
1253 }
1254 
1255 int multifd_load_cleanup(Error **errp)
1256 {
1257     int i;
1258     int ret = 0;
1259 
1260     if (!migrate_use_multifd()) {
1261         return 0;
1262     }
1263     multifd_recv_terminate_threads(NULL);
1264     for (i = 0; i < migrate_multifd_channels(); i++) {
1265         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1266 
1267         if (p->running) {
1268             qemu_thread_join(&p->thread);
1269         }
1270         object_unref(OBJECT(p->c));
1271         p->c = NULL;
1272         qemu_mutex_destroy(&p->mutex);
1273         qemu_sem_destroy(&p->sem_sync);
1274         g_free(p->name);
1275         p->name = NULL;
1276         multifd_pages_clear(p->pages);
1277         p->pages = NULL;
1278         p->packet_len = 0;
1279         g_free(p->packet);
1280         p->packet = NULL;
1281     }
1282     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1283     g_free(multifd_recv_state->params);
1284     multifd_recv_state->params = NULL;
1285     g_free(multifd_recv_state);
1286     multifd_recv_state = NULL;
1287 
1288     return ret;
1289 }
1290 
1291 static void multifd_recv_sync_main(void)
1292 {
1293     int i;
1294 
1295     if (!migrate_use_multifd()) {
1296         return;
1297     }
1298     for (i = 0; i < migrate_multifd_channels(); i++) {
1299         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1300 
1301         trace_multifd_recv_sync_main_wait(p->id);
1302         qemu_sem_wait(&multifd_recv_state->sem_sync);
1303         qemu_mutex_lock(&p->mutex);
1304         if (multifd_recv_state->packet_num < p->packet_num) {
1305             multifd_recv_state->packet_num = p->packet_num;
1306         }
1307         qemu_mutex_unlock(&p->mutex);
1308     }
1309     for (i = 0; i < migrate_multifd_channels(); i++) {
1310         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1311 
1312         trace_multifd_recv_sync_main_signal(p->id);
1313         qemu_sem_post(&p->sem_sync);
1314     }
1315     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1316 }
1317 
1318 static void *multifd_recv_thread(void *opaque)
1319 {
1320     MultiFDRecvParams *p = opaque;
1321     Error *local_err = NULL;
1322     int ret;
1323 
1324     trace_multifd_recv_thread_start(p->id);
1325     rcu_register_thread();
1326 
1327     while (true) {
1328         uint32_t used;
1329         uint32_t flags;
1330 
1331         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1332                                        p->packet_len, &local_err);
1333         if (ret == 0) {   /* EOF */
1334             break;
1335         }
1336         if (ret == -1) {   /* Error */
1337             break;
1338         }
1339 
1340         qemu_mutex_lock(&p->mutex);
1341         ret = multifd_recv_unfill_packet(p, &local_err);
1342         if (ret) {
1343             qemu_mutex_unlock(&p->mutex);
1344             break;
1345         }
1346 
1347         used = p->pages->used;
1348         flags = p->flags;
1349         trace_multifd_recv(p->id, p->packet_num, used, flags,
1350                            p->next_packet_size);
1351         p->num_packets++;
1352         p->num_pages += used;
1353         qemu_mutex_unlock(&p->mutex);
1354 
1355         if (used) {
1356             ret = qio_channel_readv_all(p->c, p->pages->iov,
1357                                         used, &local_err);
1358             if (ret != 0) {
1359                 break;
1360             }
1361         }
1362 
1363         if (flags & MULTIFD_FLAG_SYNC) {
1364             qemu_sem_post(&multifd_recv_state->sem_sync);
1365             qemu_sem_wait(&p->sem_sync);
1366         }
1367     }
1368 
1369     if (local_err) {
1370         multifd_recv_terminate_threads(local_err);
1371     }
1372     qemu_mutex_lock(&p->mutex);
1373     p->running = false;
1374     qemu_mutex_unlock(&p->mutex);
1375 
1376     rcu_unregister_thread();
1377     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1378 
1379     return NULL;
1380 }
1381 
1382 int multifd_load_setup(void)
1383 {
1384     int thread_count;
1385     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1386     uint8_t i;
1387 
1388     if (!migrate_use_multifd()) {
1389         return 0;
1390     }
1391     thread_count = migrate_multifd_channels();
1392     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1393     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1394     atomic_set(&multifd_recv_state->count, 0);
1395     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1396 
1397     for (i = 0; i < thread_count; i++) {
1398         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1399 
1400         qemu_mutex_init(&p->mutex);
1401         qemu_sem_init(&p->sem_sync, 0);
1402         p->id = i;
1403         p->pages = multifd_pages_init(page_count);
1404         p->packet_len = sizeof(MultiFDPacket_t)
1405                       + sizeof(ram_addr_t) * page_count;
1406         p->packet = g_malloc0(p->packet_len);
1407         p->name = g_strdup_printf("multifdrecv_%d", i);
1408     }
1409     return 0;
1410 }
1411 
1412 bool multifd_recv_all_channels_created(void)
1413 {
1414     int thread_count = migrate_multifd_channels();
1415 
1416     if (!migrate_use_multifd()) {
1417         return true;
1418     }
1419 
1420     return thread_count == atomic_read(&multifd_recv_state->count);
1421 }
1422 
1423 /*
1424  * Try to receive all multifd channels to get ready for the migration.
1425  * - Return true and do not set @errp when correctly receiving all channels;
1426  * - Return false and do not set @errp when correctly receiving the current one;
1427  * - Return false and set @errp when failing to receive the current channel.
1428  */
1429 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1430 {
1431     MultiFDRecvParams *p;
1432     Error *local_err = NULL;
1433     int id;
1434 
1435     id = multifd_recv_initial_packet(ioc, &local_err);
1436     if (id < 0) {
1437         multifd_recv_terminate_threads(local_err);
1438         error_propagate_prepend(errp, local_err,
1439                                 "failed to receive packet"
1440                                 " via multifd channel %d: ",
1441                                 atomic_read(&multifd_recv_state->count));
1442         return false;
1443     }
1444 
1445     p = &multifd_recv_state->params[id];
1446     if (p->c != NULL) {
1447         error_setg(&local_err, "multifd: received id '%d' which is already set up",
1448                    id);
1449         multifd_recv_terminate_threads(local_err);
1450         error_propagate(errp, local_err);
1451         return false;
1452     }
1453     p->c = ioc;
1454     object_ref(OBJECT(ioc));
1455     /* initial packet */
1456     p->num_packets = 1;
1457 
1458     p->running = true;
1459     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1460                        QEMU_THREAD_JOINABLE);
1461     atomic_inc(&multifd_recv_state->count);
1462     return atomic_read(&multifd_recv_state->count) ==
1463            migrate_multifd_channels();
1464 }
1465 
1466 /**
1467  * save_page_header: write page header to wire
1468  *
1469  * If this is the 1st block, it also writes the block identification
1470  *
1471  * Returns the number of bytes written
1472  *
1473  * @f: QEMUFile where to send the data
1474  * @block: block that contains the page we want to send
1475  * @offset: offset inside the block for the page
1476  *          in the lower bits, it contains flags
1477  */
1478 static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1479                                ram_addr_t offset)
1480 {
1481     size_t size, len;
1482 
1483     if (block == rs->last_sent_block) {
1484         offset |= RAM_SAVE_FLAG_CONTINUE;
1485     }
1486     qemu_put_be64(f, offset);
1487     size = 8;
1488 
1489     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1490         len = strlen(block->idstr);
1491         qemu_put_byte(f, len);
1492         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1493         size += 1 + len;
1494         rs->last_sent_block = block;
1495     }
1496     return size;
1497 }
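
/*
 * Illustrative example of the resulting wire layout, for a hypothetical
 * page at offset 0x2000 of a block named "pc.ram" that does not continue
 * the previously sent block:
 *
 *   8 bytes  be64: 0x2000 | flags (e.g. RAM_SAVE_FLAG_PAGE)
 *   1 byte   idstr length, strlen("pc.ram") == 6
 *   6 bytes  "pc.ram" (not NUL terminated)
 *
 * so the returned size would be 8 + 1 + 6 = 15.  With RAM_SAVE_FLAG_CONTINUE
 * set, only the first 8 bytes are written.
 */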
1498 
1499 /**
1500  * mig_throttle_guest_down: throotle down the guest
1501  *
1502  * Reduce amount of guest cpu execution to hopefully slow down memory
1503  * writes. If guest dirty memory rate is reduced below the rate at
1504  * which we can transfer pages to the destination then we should be
1505  * able to complete migration. Some workloads dirty memory way too
1506  * fast and will not effectively converge, even with auto-converge.
1507  */
1508 static void mig_throttle_guest_down(void)
1509 {
1510     MigrationState *s = migrate_get_current();
1511     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1512     uint64_t pct_increment = s->parameters.cpu_throttle_increment;
1513     int pct_max = s->parameters.max_cpu_throttle;
1514 
1515     /* We have not started throttling yet. Let's start it. */
1516     if (!cpu_throttle_active()) {
1517         cpu_throttle_set(pct_initial);
1518     } else {
1519         /* Throttling already on, just increase the rate */
1520         cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_increment,
1521                          pct_max));
1522     }
1523 }
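
/*
 * Illustrative example: assuming the default parameters
 * (cpu_throttle_initial = 20, cpu_throttle_increment = 10,
 * max_cpu_throttle = 99), successive calls throttle the guest at
 * 20%, 30%, 40%, ... with the percentage capped at 99%.
 */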
1524 
1525 /**
1526  * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1527  *
1528  * @rs: current RAM state
1529  * @current_addr: address for the zero page
1530  *
1531  * Update the xbzrle cache to reflect a page that's been sent as all 0.
1532  * The important thing is that a stale (not-yet-0'd) page be replaced
1533  * by the new data.
1534  * As a bonus, if the page wasn't in the cache it gets added so that
1535  * when a small write is made into the 0'd page it gets XBZRLE sent.
1536  */
1537 static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1538 {
1539     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1540         return;
1541     }
1542 
1543     /* We don't care if this fails to allocate a new cache page
1544      * as long as it updates an old one */
1545     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1546                  ram_counters.dirty_sync_count);
1547 }
1548 
1549 #define ENCODING_FLAG_XBZRLE 0x1
1550 
1551 /**
1552  * save_xbzrle_page: compress and send current page
1553  *
1554  * Returns: 1 means that we wrote the page
1555  *          0 means that page is identical to the one already sent
1556  *          -1 means that xbzrle would be longer than normal
1557  *
1558  * @rs: current RAM state
1559  * @current_data: pointer to the address of the page contents
1560  * @current_addr: addr of the page
1561  * @block: block that contains the page we want to send
1562  * @offset: offset inside the block for the page
1563  * @last_stage: if we are at the completion stage
1564  */
1565 static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1566                             ram_addr_t current_addr, RAMBlock *block,
1567                             ram_addr_t offset, bool last_stage)
1568 {
1569     int encoded_len = 0, bytes_xbzrle;
1570     uint8_t *prev_cached_page;
1571 
1572     if (!cache_is_cached(XBZRLE.cache, current_addr,
1573                          ram_counters.dirty_sync_count)) {
1574         xbzrle_counters.cache_miss++;
1575         if (!last_stage) {
1576             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1577                              ram_counters.dirty_sync_count) == -1) {
1578                 return -1;
1579             } else {
1580                 /* update *current_data when the page has been
1581                    inserted into cache */
1582                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
1583             }
1584         }
1585         return -1;
1586     }
1587 
1588     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1589 
1590     /* save current buffer into memory */
1591     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1592 
1593     /* XBZRLE encoding (if there is no overflow) */
1594     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1595                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1596                                        TARGET_PAGE_SIZE);
1597     if (encoded_len == 0) {
1598         trace_save_xbzrle_page_skipping();
1599         return 0;
1600     } else if (encoded_len == -1) {
1601         trace_save_xbzrle_page_overflow();
1602         xbzrle_counters.overflow++;
1603         /* update data in the cache */
1604         if (!last_stage) {
1605             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
1606             *current_data = prev_cached_page;
1607         }
1608         return -1;
1609     }
1610 
1611     /* update the cache, so later encodings are against the data the destination has */
1612     if (!last_stage) {
1613         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1614     }
1615 
1616     /* Send XBZRLE based compressed page */
1617     bytes_xbzrle = save_page_header(rs, rs->f, block,
1618                                     offset | RAM_SAVE_FLAG_XBZRLE);
1619     qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1620     qemu_put_be16(rs->f, encoded_len);
1621     qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1622     bytes_xbzrle += encoded_len + 1 + 2;
1623     xbzrle_counters.pages++;
1624     xbzrle_counters.bytes += bytes_xbzrle;
1625     ram_counters.transferred += bytes_xbzrle;
1626 
1627     return 1;
1628 }
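
/*
 * Illustrative summary of the stream layout emitted above for one XBZRLE
 * page, matching the bytes_xbzrle accounting:
 *
 *   save_page_header(...)                    (header bytes)
 *   1 byte   ENCODING_FLAG_XBZRLE
 *   2 bytes  be16 encoded_len
 *   N bytes  XBZRLE-encoded delta (encoded_len bytes)
 *
 * hence bytes_xbzrle = header + encoded_len + 1 + 2.
 */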
1629 
1630 /**
1631  * migration_bitmap_find_dirty: find the next dirty page from start
1632  *
1633  * Called with rcu_read_lock() to protect migration_bitmap
1634  *
1635  * Returns the byte offset within memory region of the start of a dirty page
1636  *
1637  * @rs: current RAM state
1638  * @rb: RAMBlock where to search for dirty pages
1639  * @start: page where we start the search
1640  */
1641 static inline
1642 unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1643                                           unsigned long start)
1644 {
1645     unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1646     unsigned long *bitmap = rb->bmap;
1647     unsigned long next;
1648 
1649     if (ramblock_is_ignored(rb)) {
1650         return size;
1651     }
1652 
1653     /*
1654      * When the free page optimization is enabled, we need to check the bitmap
1655      * to send the non-free pages rather than all the pages in the bulk stage.
1656      */
1657     if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1658         next = start + 1;
1659     } else {
1660         next = find_next_bit(bitmap, size, start);
1661     }
1662 
1663     return next;
1664 }
1665 
1666 static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1667                                                 RAMBlock *rb,
1668                                                 unsigned long page)
1669 {
1670     bool ret;
1671 
1672     qemu_mutex_lock(&rs->bitmap_mutex);
1673     ret = test_and_clear_bit(page, rb->bmap);
1674 
1675     if (ret) {
1676         rs->migration_dirty_pages--;
1677     }
1678     qemu_mutex_unlock(&rs->bitmap_mutex);
1679 
1680     return ret;
1681 }
1682 
1683 static void migration_bitmap_sync_range(RAMState *rs, RAMBlock *rb,
1684                                         ram_addr_t start, ram_addr_t length)
1685 {
1686     rs->migration_dirty_pages +=
1687         cpu_physical_memory_sync_dirty_bitmap(rb, start, length,
1688                                               &rs->num_dirty_pages_period);
1689 }
1690 
1691 /**
1692  * ram_pagesize_summary: calculate all the pagesizes of a VM
1693  *
1694  * Returns a summary bitmap of the page sizes of all RAMBlocks
1695  *
1696  * For VMs with just normal pages this is equivalent to the host page
1697  * size. If it's got some huge pages then it's the OR of all the
1698  * different page sizes.
1699  */
1700 uint64_t ram_pagesize_summary(void)
1701 {
1702     RAMBlock *block;
1703     uint64_t summary = 0;
1704 
1705     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1706         summary |= block->page_size;
1707     }
1708 
1709     return summary;
1710 }
1711 
1712 uint64_t ram_get_total_transferred_pages(void)
1713 {
1714     return  ram_counters.normal + ram_counters.duplicate +
1715                 compression_counters.pages + xbzrle_counters.pages;
1716 }
1717 
1718 static void migration_update_rates(RAMState *rs, int64_t end_time)
1719 {
1720     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1721     double compressed_size;
1722 
1723     /* calculate period counters */
1724     ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1725                 / (end_time - rs->time_last_bitmap_sync);
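         /*
          * num_dirty_pages_period accumulates between bitmap syncs and the
          * timestamps are in milliseconds, so the * 1000 above turns the
          * per-period count into a pages-per-second rate.
          */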
1726 
1727     if (!page_count) {
1728         return;
1729     }
1730 
1731     if (migrate_use_xbzrle()) {
1732         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1733             rs->xbzrle_cache_miss_prev) / page_count;
1734         rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1735     }
1736 
1737     if (migrate_use_compression()) {
1738         compression_counters.busy_rate = (double)(compression_counters.busy -
1739             rs->compress_thread_busy_prev) / page_count;
1740         rs->compress_thread_busy_prev = compression_counters.busy;
1741 
1742         compressed_size = compression_counters.compressed_size -
1743                           rs->compressed_size_prev;
1744         if (compressed_size) {
1745             double uncompressed_size = (compression_counters.pages -
1746                                     rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1747 
1748             /* Compression-Ratio = Uncompressed-size / Compressed-size */
1749             compression_counters.compression_rate =
1750                                         uncompressed_size / compressed_size;
1751 
1752             rs->compress_pages_prev = compression_counters.pages;
1753             rs->compressed_size_prev = compression_counters.compressed_size;
1754         }
1755     }
1756 }
1757 
1758 static void migration_bitmap_sync(RAMState *rs)
1759 {
1760     RAMBlock *block;
1761     int64_t end_time;
1762     uint64_t bytes_xfer_now;
1763 
1764     ram_counters.dirty_sync_count++;
1765 
1766     if (!rs->time_last_bitmap_sync) {
1767         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1768     }
1769 
1770     trace_migration_bitmap_sync_start();
1771     memory_global_dirty_log_sync();
1772 
1773     qemu_mutex_lock(&rs->bitmap_mutex);
1774     rcu_read_lock();
1775     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1776         migration_bitmap_sync_range(rs, block, 0, block->used_length);
1777     }
1778     ram_counters.remaining = ram_bytes_remaining();
1779     rcu_read_unlock();
1780     qemu_mutex_unlock(&rs->bitmap_mutex);
1781 
1782     trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1783 
1784     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1785 
1786     /* more than 1 second = 1000 milliseconds */
1787     if (end_time > rs->time_last_bitmap_sync + 1000) {
1788         bytes_xfer_now = ram_counters.transferred;
1789 
1790         /* During block migration the auto-converge logic incorrectly detects
1791          * that ram migration makes no progress. Avoid this by disabling the
1792          * throttling logic during the bulk phase of block migration. */
1793         if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1794             /* The following detection logic can be refined later. For now:
1795                Check to see if the dirtied bytes are 50% more than the approx.
1796                amount of bytes that just got transferred since the last time we
1797                were in this routine. If that happens twice, start or increase
1798                throttling */
1799 
1800             if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1801                    (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1802                 (++rs->dirty_rate_high_cnt >= 2)) {
1803                     trace_migration_throttle();
1804                     rs->dirty_rate_high_cnt = 0;
1805                     mig_throttle_guest_down();
1806             }
1807         }
1808 
1809         migration_update_rates(rs, end_time);
1810 
1811         rs->target_page_count_prev = rs->target_page_count;
1812 
1813         /* reset period counters */
1814         rs->time_last_bitmap_sync = end_time;
1815         rs->num_dirty_pages_period = 0;
1816         rs->bytes_xfer_prev = bytes_xfer_now;
1817     }
1818     if (migrate_use_events()) {
1819         qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1820     }
1821 }
1822 
1823 static void migration_bitmap_sync_precopy(RAMState *rs)
1824 {
1825     Error *local_err = NULL;
1826 
1827     /*
1828      * The current notifier usage is just an optimization for migration, so we
1829      * don't stop the normal migration process in the error case.
1830      */
1831     if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1832         error_report_err(local_err);
1833     }
1834 
1835     migration_bitmap_sync(rs);
1836 
1837     if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1838         error_report_err(local_err);
1839     }
1840 }
1841 
1842 /**
1843  * save_zero_page_to_file: send the zero page to the file
1844  *
1845  * Returns the size of data written to the file, 0 means the page is not
1846  * a zero page
1847  *
1848  * @rs: current RAM state
1849  * @file: the file where the data is saved
1850  * @block: block that contains the page we want to send
1851  * @offset: offset inside the block for the page
1852  */
1853 static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1854                                   RAMBlock *block, ram_addr_t offset)
1855 {
1856     uint8_t *p = block->host + offset;
1857     int len = 0;
1858 
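         /*
          * A zero page is sent as just a page header carrying
          * RAM_SAVE_FLAG_ZERO plus a single zero byte; the page contents
          * themselves are never put on the wire.
          */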
1859     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1860         len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1861         qemu_put_byte(file, 0);
1862         len += 1;
1863     }
1864     return len;
1865 }
1866 
1867 /**
1868  * save_zero_page: send the zero page to the stream
1869  *
1870  * Returns the number of pages written.
1871  *
1872  * @rs: current RAM state
1873  * @block: block that contains the page we want to send
1874  * @offset: offset inside the block for the page
1875  */
1876 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1877 {
1878     int len = save_zero_page_to_file(rs, rs->f, block, offset);
1879 
1880     if (len) {
1881         ram_counters.duplicate++;
1882         ram_counters.transferred += len;
1883         return 1;
1884     }
1885     return -1;
1886 }
1887 
1888 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1889 {
1890     if (!migrate_release_ram() || !migration_in_postcopy()) {
1891         return;
1892     }
1893 
1894     ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1895 }
1896 
1897 /*
1898  * @pages: the number of pages written by the control path,
1899  *        < 0 - error
1900  *        > 0 - number of pages written
1901  *
1902  * Return true if the page has been saved, otherwise false is returned.
1903  */
1904 static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1905                               int *pages)
1906 {
1907     uint64_t bytes_xmit = 0;
1908     int ret;
1909 
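         /*
          * Give a registered control transport (the RDMA path, when in use)
          * a chance to take over this page; RAM_SAVE_CONTROL_NOT_SUPP means
          * no transport claimed it and the caller falls back to the normal
          * save path.
          */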
1910     *pages = -1;
1911     ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1912                                 &bytes_xmit);
1913     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1914         return false;
1915     }
1916 
1917     if (bytes_xmit) {
1918         ram_counters.transferred += bytes_xmit;
1919         *pages = 1;
1920     }
1921 
1922     if (ret == RAM_SAVE_CONTROL_DELAYED) {
1923         return true;
1924     }
1925 
1926     if (bytes_xmit > 0) {
1927         ram_counters.normal++;
1928     } else if (bytes_xmit == 0) {
1929         ram_counters.duplicate++;
1930     }
1931 
1932     return true;
1933 }
1934 
1935 /*
1936  * directly send the page to the stream
1937  *
1938  * Returns the number of pages written.
1939  *
1940  * @rs: current RAM state
1941  * @block: block that contains the page we want to send
1942  * @offset: offset inside the block for the page
1943  * @buf: the page to be sent
1944  * @async: send the page asynchronously
1945  */
1946 static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1947                             uint8_t *buf, bool async)
1948 {
1949     ram_counters.transferred += save_page_header(rs, rs->f, block,
1950                                                  offset | RAM_SAVE_FLAG_PAGE);
1951     if (async) {
1952         qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
1953                               migrate_release_ram() &&
1954                               migration_in_postcopy());
1955     } else {
1956         qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
1957     }
1958     ram_counters.transferred += TARGET_PAGE_SIZE;
1959     ram_counters.normal++;
1960     return 1;
1961 }
1962 
1963 /**
1964  * ram_save_page: send the given page to the stream
1965  *
1966  * Returns the number of pages written.
1967  *          < 0 - error
1968  *          >=0 - Number of pages written - this might legally be 0
1969  *                if xbzrle noticed the page was the same.
1970  *
1971  * @rs: current RAM state
1972  * @block: block that contains the page we want to send
1973  * @offset: offset inside the block for the page
1974  * @last_stage: if we are at the completion stage
1975  */
1976 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
1977 {
1978     int pages = -1;
1979     uint8_t *p;
1980     bool send_async = true;
1981     RAMBlock *block = pss->block;
1982     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
1983     ram_addr_t current_addr = block->offset + offset;
1984 
1985     p = block->host + offset;
1986     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
1987 
1988     XBZRLE_cache_lock();
1989     if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
1990         migrate_use_xbzrle()) {
1991         pages = save_xbzrle_page(rs, &p, current_addr, block,
1992                                  offset, last_stage);
1993         if (!last_stage) {
1994             /* Can't send this cached data async, since the cache page
1995              * might get updated before it gets to the wire
1996              */
1997             send_async = false;
1998         }
1999     }
2000 
2001     /* XBZRLE overflow or normal page */
2002     if (pages == -1) {
2003         pages = save_normal_page(rs, block, offset, p, send_async);
2004     }
2005 
2006     XBZRLE_cache_unlock();
2007 
2008     return pages;
2009 }
2010 
2011 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2012                                  ram_addr_t offset)
2013 {
2014     multifd_queue_page(block, offset);
2015     ram_counters.normal++;
2016 
2017     return 1;
2018 }
2019 
2020 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2021                                  ram_addr_t offset, uint8_t *source_buf)
2022 {
2023     RAMState *rs = ram_state;
2024     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2025     bool zero_page = false;
2026     int ret;
2027 
2028     if (save_zero_page_to_file(rs, f, block, offset)) {
2029         zero_page = true;
2030         goto exit;
2031     }
2032 
2033     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2034 
2035     /*
2036      * Copy the page to an internal buffer so the VM cannot modify it
2037      * while it is being compressed; this way any error caught during
2038      * compression or decompression refers to a stable copy.
2039      */
2040     memcpy(source_buf, p, TARGET_PAGE_SIZE);
2041     ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2042     if (ret < 0) {
2043         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2044         error_report("compressed data failed!");
2045         return false;
2046     }
2047 
2048 exit:
2049     ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2050     return zero_page;
2051 }
2052 
2053 static void
2054 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2055 {
2056     ram_counters.transferred += bytes_xmit;
2057 
2058     if (param->zero_page) {
2059         ram_counters.duplicate++;
2060         return;
2061     }
2062 
2063     /* 8 is the size (in bytes) of a page header with RAM_SAVE_FLAG_CONTINUE. */
2064     compression_counters.compressed_size += bytes_xmit - 8;
2065     compression_counters.pages++;
2066 }
2067 
2068 static bool save_page_use_compression(RAMState *rs);
2069 
2070 static void flush_compressed_data(RAMState *rs)
2071 {
2072     int idx, len, thread_count;
2073 
2074     if (!save_page_use_compression(rs)) {
2075         return;
2076     }
2077     thread_count = migrate_compress_threads();
2078 
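         /*
          * Two phases: first wait until every compression thread has
          * finished its in-flight page, then drain each thread's buffered
          * output into the migration stream and update the counters.
          */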
2079     qemu_mutex_lock(&comp_done_lock);
2080     for (idx = 0; idx < thread_count; idx++) {
2081         while (!comp_param[idx].done) {
2082             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2083         }
2084     }
2085     qemu_mutex_unlock(&comp_done_lock);
2086 
2087     for (idx = 0; idx < thread_count; idx++) {
2088         qemu_mutex_lock(&comp_param[idx].mutex);
2089         if (!comp_param[idx].quit) {
2090             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2091             /*
2092              * it's safe to fetch zero_page without holding comp_done_lock
2093              * as there is no further request submitted to the thread,
2094              * i.e, the thread should be waiting for a request at this point.
2095              * i.e., the thread should be waiting for a request at this point.
2096             update_compress_thread_counts(&comp_param[idx], len);
2097         }
2098         qemu_mutex_unlock(&comp_param[idx].mutex);
2099     }
2100 }
2101 
2102 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2103                                        ram_addr_t offset)
2104 {
2105     param->block = block;
2106     param->offset = offset;
2107 }
2108 
2109 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2110                                            ram_addr_t offset)
2111 {
2112     int idx, thread_count, bytes_xmit = -1, pages = -1;
2113     bool wait = migrate_compress_wait_thread();
2114 
2115     thread_count = migrate_compress_threads();
2116     qemu_mutex_lock(&comp_done_lock);
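         /*
          * Look for an idle compression thread: flush the output it
          * produced for its previous page, hand it the new (block, offset)
          * and wake it up.  If no thread is idle, either wait for one or
          * fall back to sending the page uncompressed, depending on
          * 'compress-wait-thread'.
          */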
2117 retry:
2118     for (idx = 0; idx < thread_count; idx++) {
2119         if (comp_param[idx].done) {
2120             comp_param[idx].done = false;
2121             bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2122             qemu_mutex_lock(&comp_param[idx].mutex);
2123             set_compress_params(&comp_param[idx], block, offset);
2124             qemu_cond_signal(&comp_param[idx].cond);
2125             qemu_mutex_unlock(&comp_param[idx].mutex);
2126             pages = 1;
2127             update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2128             break;
2129         }
2130     }
2131 
2132     /*
2133      * Wait for a free thread if the user specifies 'compress-wait-thread',
2134      * otherwise post the page out in the main thread as a normal page.
2135      */
2136     if (pages < 0 && wait) {
2137         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2138         goto retry;
2139     }
2140     qemu_mutex_unlock(&comp_done_lock);
2141 
2142     return pages;
2143 }
2144 
2145 /**
2146  * find_dirty_block: find the next dirty page and update any state
2147  * associated with the search process.
2148  *
2149  * Returns true if a dirty page is found
2150  *
2151  * @rs: current RAM state
2152  * @pss: data about the state of the current dirty page scan
2153  * @again: set to false if the search has scanned the whole of RAM
2154  */
2155 static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2156 {
2157     pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2158     if (pss->complete_round && pss->block == rs->last_seen_block &&
2159         pss->page >= rs->last_page) {
2160         /*
2161          * We've been once around the RAM and haven't found anything.
2162          * Give up.
2163          */
2164         *again = false;
2165         return false;
2166     }
2167     if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2168         /* Didn't find anything in this RAM Block */
2169         pss->page = 0;
2170         pss->block = QLIST_NEXT_RCU(pss->block, next);
2171         if (!pss->block) {
2172             /*
2173              * If memory migration starts over, we will meet a dirtied page
2174              * which may still exist in the compression threads' ring, so we
2175              * should flush the compressed data to make sure the new page
2176              * is not overwritten by the old one in the destination.
2177              *
2178              * Also, if xbzrle is on, stop using the data compression at this
2179              * point. In theory, xbzrle can do better than compression.
2180              */
2181             flush_compressed_data(rs);
2182 
2183             /* Hit the end of the list */
2184             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2185             /* Flag that we've looped */
2186             pss->complete_round = true;
2187             rs->ram_bulk_stage = false;
2188         }
2189         /* Didn't find anything this time, but try again on the new block */
2190         *again = true;
2191         return false;
2192     } else {
2193         /* Can go around again, but... */
2194         *again = true;
2195         /* We've found something so probably don't need to */
2196         return true;
2197     }
2198 }
2199 
2200 /**
2201  * unqueue_page: gets a page from the queue
2202  *
2203  * Helper for 'get_queued_page' - gets a page off the queue
2204  *
2205  * Returns the block of the page (or NULL if none available)
2206  *
2207  * @rs: current RAM state
2208  * @offset: used to return the offset within the RAMBlock
2209  */
2210 static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2211 {
2212     RAMBlock *block = NULL;
2213 
2214     if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2215         return NULL;
2216     }
2217 
2218     qemu_mutex_lock(&rs->src_page_req_mutex);
2219     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2220         struct RAMSrcPageRequest *entry =
2221                                 QSIMPLEQ_FIRST(&rs->src_page_requests);
2222         block = entry->rb;
2223         *offset = entry->offset;
2224 
2225         if (entry->len > TARGET_PAGE_SIZE) {
2226             entry->len -= TARGET_PAGE_SIZE;
2227             entry->offset += TARGET_PAGE_SIZE;
2228         } else {
2229             memory_region_unref(block->mr);
2230             QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2231             g_free(entry);
2232             migration_consume_urgent_request();
2233         }
2234     }
2235     qemu_mutex_unlock(&rs->src_page_req_mutex);
2236 
2237     return block;
2238 }
2239 
2240 /**
2241  * get_queued_page: unqueue a page from the postcopy requests
2242  *
2243  * Skips pages that are already sent (!dirty)
2244  *
2245  * Returns true if a queued page is found
2246  *
2247  * @rs: current RAM state
2248  * @pss: data about the state of the current dirty page scan
2249  */
2250 static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2251 {
2252     RAMBlock  *block;
2253     ram_addr_t offset;
2254     bool dirty;
2255 
2256     do {
2257         block = unqueue_page(rs, &offset);
2258         /*
2259          * We're sending this page, and since it's postcopy nothing else
2260          * will dirty it, and we must make sure it doesn't get sent again
2261          * even if this queue request was received after the background
2262          * search already sent it.
2263          */
2264         if (block) {
2265             unsigned long page;
2266 
2267             page = offset >> TARGET_PAGE_BITS;
2268             dirty = test_bit(page, block->bmap);
2269             if (!dirty) {
2270                 trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2271                        page, test_bit(page, block->unsentmap));
2272             } else {
2273                 trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2274             }
2275         }
2276 
2277     } while (block && !dirty);
2278 
2279     if (block) {
2280         /*
2281          * As soon as we start servicing pages out of order, then we have
2282          * to kill the bulk stage, since the bulk stage assumes
2283          * in (migration_bitmap_find_dirty) that every page is
2284          * dirty, that's no longer true.
2285          */
2286         rs->ram_bulk_stage = false;
2287 
2288         /*
2289          * We want the background search to continue from the queued page
2290          * since the guest is likely to want other pages near to the page
2291          * it just requested.
2292          */
2293         pss->block = block;
2294         pss->page = offset >> TARGET_PAGE_BITS;
2295     }
2296 
2297     return !!block;
2298 }
2299 
2300 /**
2301  * migration_page_queue_free: drop any remaining pages in the ram
2302  * request queue
2303  *
2304  * It should be empty at the end anyway, but in error cases there may
2305  * be some left.  In case any pages are left, we drop them.
2306  *
2307  */
2308 static void migration_page_queue_free(RAMState *rs)
2309 {
2310     struct RAMSrcPageRequest *mspr, *next_mspr;
2311     /* This queue generally should be empty - but in the case of a failed
2312      * migration it might have some droppings in.
2313      */
2314     rcu_read_lock();
2315     QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2316         memory_region_unref(mspr->rb->mr);
2317         QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2318         g_free(mspr);
2319     }
2320     rcu_read_unlock();
2321 }
2322 
2323 /**
2324  * ram_save_queue_pages: queue the page for transmission
2325  *
2326  * A request from postcopy destination for example.
2327  *
2328  * Returns zero on success or negative on error
2329  *
2330  * @rbname: Name of the RAMBlock of the request. NULL means the
2331  *          same as the last one.
2332  * @start: starting address from the start of the RAMBlock
2333  * @len: length (in bytes) to send
2334  */
2335 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2336 {
2337     RAMBlock *ramblock;
2338     RAMState *rs = ram_state;
2339 
2340     ram_counters.postcopy_requests++;
2341     rcu_read_lock();
2342     if (!rbname) {
2343         /* Reuse last RAMBlock */
2344         ramblock = rs->last_req_rb;
2345 
2346         if (!ramblock) {
2347             /*
2348              * Shouldn't happen, we can't reuse the last RAMBlock if
2349              * it's the 1st request.
2350              */
2351             error_report("ram_save_queue_pages no previous block");
2352             goto err;
2353         }
2354     } else {
2355         ramblock = qemu_ram_block_by_name(rbname);
2356 
2357         if (!ramblock) {
2358             /* We shouldn't be asked for a non-existent RAMBlock */
2359             error_report("ram_save_queue_pages no block '%s'", rbname);
2360             goto err;
2361         }
2362         rs->last_req_rb = ramblock;
2363     }
2364     trace_ram_save_queue_pages(ramblock->idstr, start, len);
2365     if (start+len > ramblock->used_length) {
2366         error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2367                      RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2368                      __func__, start, len, ramblock->used_length);
2369         goto err;
2370     }
2371 
2372     struct RAMSrcPageRequest *new_entry =
2373         g_malloc0(sizeof(struct RAMSrcPageRequest));
2374     new_entry->rb = ramblock;
2375     new_entry->offset = start;
2376     new_entry->len = len;
2377 
2378     memory_region_ref(ramblock->mr);
2379     qemu_mutex_lock(&rs->src_page_req_mutex);
2380     QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2381     migration_make_urgent_request();
2382     qemu_mutex_unlock(&rs->src_page_req_mutex);
2383     rcu_read_unlock();
2384 
2385     return 0;
2386 
2387 err:
2388     rcu_read_unlock();
2389     return -1;
2390 }
2391 
2392 static bool save_page_use_compression(RAMState *rs)
2393 {
2394     if (!migrate_use_compression()) {
2395         return false;
2396     }
2397 
2398     /*
2399      * If xbzrle is on, stop using the data compression after the first
2400      * round of migration even if compression is enabled. In theory,
2401      * xbzrle can do better than compression.
2402      */
2403     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2404         return true;
2405     }
2406 
2407     return false;
2408 }
2409 
2410 /*
2411  * try to compress the page before posting it out, return true if the page
2412  * has been properly handled by compression, otherwise it needs other
2413  * paths to handle it
2414  */
2415 static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2416 {
2417     if (!save_page_use_compression(rs)) {
2418         return false;
2419     }
2420 
2421     /*
2422      * When starting the process of a new block, the first page of
2423      * the block should be sent out before other pages in the same
2424      * block, and all the pages in the last block should have been sent
2425      * out.  Keeping this order is important, because the 'cont' flag
2426      * is used to avoid resending the block name.
2427      *
2428      * We post the first page as a normal page because compression
2429      * takes a lot of CPU.
2430      */
2431     if (block != rs->last_sent_block) {
2432         flush_compressed_data(rs);
2433         return false;
2434     }
2435 
2436     if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2437         return true;
2438     }
2439 
2440     compression_counters.busy++;
2441     return false;
2442 }
2443 
2444 /**
2445  * ram_save_target_page: save one target page
2446  *
2447  * Returns the number of pages written
2448  *
2449  * @rs: current RAM state
2450  * @pss: data about the page we want to send
2451  * @last_stage: if we are at the completion stage
2452  */
2453 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2454                                 bool last_stage)
2455 {
2456     RAMBlock *block = pss->block;
2457     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2458     int res;
2459 
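         /*
          * The page is tried against each path in turn: the control
          * transport, the compression threads, the zero-page check,
          * multifd, and finally the regular (possibly XBZRLE) page path.
          */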
2460     if (control_save_page(rs, block, offset, &res)) {
2461         return res;
2462     }
2463 
2464     if (save_compress_page(rs, block, offset)) {
2465         return 1;
2466     }
2467 
2468     res = save_zero_page(rs, block, offset);
2469     if (res > 0) {
2470         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2471          * page would be stale
2472          */
2473         if (!save_page_use_compression(rs)) {
2474             XBZRLE_cache_lock();
2475             xbzrle_cache_zero_page(rs, block->offset + offset);
2476             XBZRLE_cache_unlock();
2477         }
2478         ram_release_pages(block->idstr, offset, res);
2479         return res;
2480     }
2481 
2482     /*
2483      * Do not use multifd together with compression, as the first page in
2484      * a new block must be posted out before the compressed pages are sent.
2485      */
2486     if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2487         return ram_save_multifd_page(rs, block, offset);
2488     }
2489 
2490     return ram_save_page(rs, pss, last_stage);
2491 }
2492 
2493 /**
2494  * ram_save_host_page: save a whole host page
2495  *
2496  * Starting at *offset send pages up to the end of the current host
2497  * page. It's valid for the initial offset to point into the middle of
2498  * a host page in which case the remainder of the hostpage is sent.
2499  * Only dirty target pages are sent. Note that the host page size may
2500  * be a huge page for this block.
2501  * The saving stops at the boundary of the used_length of the block
2502  * if the RAMBlock isn't a multiple of the host page size.
2503  *
2504  * Returns the number of pages written or negative on error
2505  *
2506  * @rs: current RAM state
2508  * @pss: data about the page we want to send
2509  * @last_stage: if we are at the completion stage
2510  */
2511 static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2512                               bool last_stage)
2513 {
2514     int tmppages, pages = 0;
2515     size_t pagesize_bits =
2516         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2517 
2518     if (ramblock_is_ignored(pss->block)) {
2519         error_report("block %s should not be migrated !", pss->block->idstr);
2520         return 0;
2521     }
2522 
2523     do {
2524         /* Check whether the page is dirty and, if so, send it */
2525         if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2526             pss->page++;
2527             continue;
2528         }
2529 
2530         tmppages = ram_save_target_page(rs, pss, last_stage);
2531         if (tmppages < 0) {
2532             return tmppages;
2533         }
2534 
2535         pages += tmppages;
2536         if (pss->block->unsentmap) {
2537             clear_bit(pss->page, pss->block->unsentmap);
2538         }
2539 
2540         pss->page++;
2541     } while ((pss->page & (pagesize_bits - 1)) &&
2542              offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
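         /*
          * The loop above stops either at a host-page boundary
          * (pagesize_bits target pages) or at the end of the block's
          * used_length, whichever comes first.
          */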
2543 
2544     /* The offset we leave with is the last one we looked at */
2545     pss->page--;
2546     return pages;
2547 }
2548 
2549 /**
2550  * ram_find_and_save_block: finds a dirty page and sends it to f
2551  *
2552  * Called within an RCU critical section.
2553  *
2554  * Returns the number of pages written where zero means no dirty pages,
2555  * or negative on error
2556  *
2557  * @rs: current RAM state
2558  * @last_stage: if we are at the completion stage
2559  *
2560  * On systems where host-page-size > target-page-size it will send all the
2561  * pages in a host page that are dirty.
2562  */
2563 
2564 static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2565 {
2566     PageSearchStatus pss;
2567     int pages = 0;
2568     bool again, found;
2569 
2570     /* No dirty page as there is zero RAM */
2571     if (!ram_bytes_total()) {
2572         return pages;
2573     }
2574 
2575     pss.block = rs->last_seen_block;
2576     pss.page = rs->last_page;
2577     pss.complete_round = false;
2578 
2579     if (!pss.block) {
2580         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2581     }
2582 
2583     do {
2584         again = true;
2585         found = get_queued_page(rs, &pss);
2586 
2587         if (!found) {
2588             /* priority queue empty, so just search for something dirty */
2589             found = find_dirty_block(rs, &pss, &again);
2590         }
2591 
2592         if (found) {
2593             pages = ram_save_host_page(rs, &pss, last_stage);
2594         }
2595     } while (!pages && again);
2596 
2597     rs->last_seen_block = pss.block;
2598     rs->last_page = pss.page;
2599 
2600     return pages;
2601 }
2602 
2603 void acct_update_position(QEMUFile *f, size_t size, bool zero)
2604 {
2605     uint64_t pages = size / TARGET_PAGE_SIZE;
2606 
2607     if (zero) {
2608         ram_counters.duplicate += pages;
2609     } else {
2610         ram_counters.normal += pages;
2611         ram_counters.transferred += size;
2612         qemu_update_position(f, size);
2613     }
2614 }
2615 
2616 static uint64_t ram_bytes_total_common(bool count_ignored)
2617 {
2618     RAMBlock *block;
2619     uint64_t total = 0;
2620 
2621     rcu_read_lock();
2622     if (count_ignored) {
2623         RAMBLOCK_FOREACH_MIGRATABLE(block) {
2624             total += block->used_length;
2625         }
2626     } else {
2627         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2628             total += block->used_length;
2629         }
2630     }
2631     rcu_read_unlock();
2632     return total;
2633 }
2634 
2635 uint64_t ram_bytes_total(void)
2636 {
2637     return ram_bytes_total_common(false);
2638 }
2639 
2640 static void xbzrle_load_setup(void)
2641 {
2642     XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2643 }
2644 
2645 static void xbzrle_load_cleanup(void)
2646 {
2647     g_free(XBZRLE.decoded_buf);
2648     XBZRLE.decoded_buf = NULL;
2649 }
2650 
2651 static void ram_state_cleanup(RAMState **rsp)
2652 {
2653     if (*rsp) {
2654         migration_page_queue_free(*rsp);
2655         qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2656         qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2657         g_free(*rsp);
2658         *rsp = NULL;
2659     }
2660 }
2661 
2662 static void xbzrle_cleanup(void)
2663 {
2664     XBZRLE_cache_lock();
2665     if (XBZRLE.cache) {
2666         cache_fini(XBZRLE.cache);
2667         g_free(XBZRLE.encoded_buf);
2668         g_free(XBZRLE.current_buf);
2669         g_free(XBZRLE.zero_target_page);
2670         XBZRLE.cache = NULL;
2671         XBZRLE.encoded_buf = NULL;
2672         XBZRLE.current_buf = NULL;
2673         XBZRLE.zero_target_page = NULL;
2674     }
2675     XBZRLE_cache_unlock();
2676 }
2677 
2678 static void ram_save_cleanup(void *opaque)
2679 {
2680     RAMState **rsp = opaque;
2681     RAMBlock *block;
2682 
2683     /* the caller must hold the iothread lock or be in a bh, so there is
2684      * no write race against this migration_bitmap
2685      */
2686     memory_global_dirty_log_stop();
2687 
2688     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2689         g_free(block->bmap);
2690         block->bmap = NULL;
2691         g_free(block->unsentmap);
2692         block->unsentmap = NULL;
2693     }
2694 
2695     xbzrle_cleanup();
2696     compress_threads_save_cleanup();
2697     ram_state_cleanup(rsp);
2698 }
2699 
2700 static void ram_state_reset(RAMState *rs)
2701 {
2702     rs->last_seen_block = NULL;
2703     rs->last_sent_block = NULL;
2704     rs->last_page = 0;
2705     rs->last_version = ram_list.version;
2706     rs->ram_bulk_stage = true;
2707     rs->fpo_enabled = false;
2708 }
2709 
2710 #define MAX_WAIT 50 /* ms, half buffered_file limit */
2711 
2712 /*
2713  * 'expected' is the value you expect the bitmap mostly to be full
2714  * of; it won't bother printing lines that are all this value.
2715  * 'todump' is the bitmap to dump; 'pages' is its length in pages.
2716  */
2717 void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2718                            unsigned long pages)
2719 {
2720     int64_t cur;
2721     int64_t linelen = 128;
2722     char linebuf[129];
2723 
2724     for (cur = 0; cur < pages; cur += linelen) {
2725         int64_t curb;
2726         bool found = false;
2727         /*
2728          * Last line; catch the case where the line length
2729          * is longer than remaining ram
2730          */
2731         if (cur + linelen > pages) {
2732             linelen = pages - cur;
2733         }
2734         for (curb = 0; curb < linelen; curb++) {
2735             bool thisbit = test_bit(cur + curb, todump);
2736             linebuf[curb] = thisbit ? '1' : '.';
2737             found = found || (thisbit != expected);
2738         }
2739         if (found) {
2740             linebuf[curb] = '\0';
2741             fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2742         }
2743     }
2744 }
2745 
2746 /* **** functions for postcopy ***** */
2747 
2748 void ram_postcopy_migrated_memory_release(MigrationState *ms)
2749 {
2750     struct RAMBlock *block;
2751 
2752     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2753         unsigned long *bitmap = block->bmap;
2754         unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2755         unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2756 
2757         while (run_start < range) {
2758             unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2759             ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2760                               (run_end - run_start) << TARGET_PAGE_BITS);
2761             run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2762         }
2763     }
2764 }
2765 
2766 /**
2767  * postcopy_send_discard_bm_ram: discard a RAMBlock
2768  *
2769  * Returns zero on success
2770  *
2771  * Callback from postcopy_each_ram_send_discard for each RAMBlock
2772  * Note: At this point the 'unsentmap' is the processed bitmap combined
2773  *       with the dirtymap; so a '1' means it's either dirty or unsent.
2774  *
2775  * @ms: current migration state
2776  * @pds: state for postcopy
2777  * @block: RAMBlock that we are processing
2778  *         (discards are sent for its unsent/dirty ranges)
2779  */
2780 static int postcopy_send_discard_bm_ram(MigrationState *ms,
2781                                         PostcopyDiscardState *pds,
2782                                         RAMBlock *block)
2783 {
2784     unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2785     unsigned long current;
2786     unsigned long *unsentmap = block->unsentmap;
2787 
2788     for (current = 0; current < end; ) {
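         /*
          * Walk the unsentmap as runs of set bits: each iteration finds the
          * start of the next unsent/dirty run, then its end, and sends the
          * whole run as one discard range.
          */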
2789         unsigned long one = find_next_bit(unsentmap, end, current);
2790 
2791         if (one <= end) {
2792             unsigned long zero = find_next_zero_bit(unsentmap, end, one + 1);
2793             unsigned long discard_length;
2794 
2795             if (zero >= end) {
2796                 discard_length = end - one;
2797             } else {
2798                 discard_length = zero - one;
2799             }
2800             if (discard_length) {
2801                 postcopy_discard_send_range(ms, pds, one, discard_length);
2802             }
2803             current = one + discard_length;
2804         } else {
2805             current = one;
2806         }
2807     }
2808 
2809     return 0;
2810 }
2811 
2812 /**
2813  * postcopy_each_ram_send_discard: discard all RAMBlocks
2814  *
2815  * Returns 0 for success or negative for error
2816  *
2817  * Utility for the outgoing postcopy code.
2818  *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2819  *   passing it bitmap indexes and name.
2820  * (qemu_ram_foreach_block ends up passing unscaled lengths
2821  *  which would mean postcopy code would have to deal with target page)
2822  *
2823  * @ms: current migration state
2824  */
2825 static int postcopy_each_ram_send_discard(MigrationState *ms)
2826 {
2827     struct RAMBlock *block;
2828     int ret;
2829 
2830     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2831         PostcopyDiscardState *pds =
2832             postcopy_discard_send_init(ms, block->idstr);
2833 
2834         /*
2835          * Postcopy sends chunks of bitmap over the wire, but it
2836          * just needs indexes at this point, avoids it having
2837          * target page specific code.
2838          */
2839         ret = postcopy_send_discard_bm_ram(ms, pds, block);
2840         postcopy_discard_send_finish(ms, pds);
2841         if (ret) {
2842             return ret;
2843         }
2844     }
2845 
2846     return 0;
2847 }
2848 
2849 /**
2850  * postcopy_chunk_hostpages_pass: canonicalize bitmap in hostpages
2851  *
2852  * Helper for postcopy_chunk_hostpages; it's called twice to
2853  * canonicalize the two bitmaps, that are similar, but one is
2854  * inverted.
2855  *
2856  * Postcopy requires that all target pages in a hostpage are dirty or
2857  * clean, not a mix.  This function canonicalizes the bitmaps.
2858  *
2859  * @ms: current migration state
2860  * @unsent_pass: if true we need to canonicalize partially unsent host pages
2861  *               otherwise we need to canonicalize partially dirty host pages
2862  * @block: block that contains the page we want to canonicalize
2863  * @pds: state for postcopy
2864  */
2865 static void postcopy_chunk_hostpages_pass(MigrationState *ms, bool unsent_pass,
2866                                           RAMBlock *block,
2867                                           PostcopyDiscardState *pds)
2868 {
2869     RAMState *rs = ram_state;
2870     unsigned long *bitmap = block->bmap;
2871     unsigned long *unsentmap = block->unsentmap;
2872     unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2873     unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2874     unsigned long run_start;
2875 
2876     if (block->page_size == TARGET_PAGE_SIZE) {
2877         /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2878         return;
2879     }
2880 
2881     if (unsent_pass) {
2882         /* Find a sent page */
2883         run_start = find_next_zero_bit(unsentmap, pages, 0);
2884     } else {
2885         /* Find a dirty page */
2886         run_start = find_next_bit(bitmap, pages, 0);
2887     }
2888 
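         /*
          * Scan run by run.  A run that starts or ends in the middle of a
          * host page marks that host page as partially sent/dirty; such
          * host pages are discarded on the destination and re-marked here
          * as fully dirty and unsent, so every host page ends up uniform.
          */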
2889     while (run_start < pages) {
2890         bool do_fixup = false;
2891         unsigned long fixup_start_addr;
2892         unsigned long host_offset;
2893 
2894         /*
2895          * If the start of this run of pages is in the middle of a host
2896          * page, then we need to fixup this host page.
2897          */
2898         host_offset = run_start % host_ratio;
2899         if (host_offset) {
2900             do_fixup = true;
2901             run_start -= host_offset;
2902             fixup_start_addr = run_start;
2903             /* For the next pass */
2904             run_start = run_start + host_ratio;
2905         } else {
2906             /* Find the end of this run */
2907             unsigned long run_end;
2908             if (unsent_pass) {
2909                 run_end = find_next_bit(unsentmap, pages, run_start + 1);
2910             } else {
2911                 run_end = find_next_zero_bit(bitmap, pages, run_start + 1);
2912             }
2913             /*
2914              * If the end isn't at the start of a host page, then the
2915              * run doesn't finish at the end of a host page
2916              * and we need to discard.
2917              */
2918             host_offset = run_end % host_ratio;
2919             if (host_offset) {
2920                 do_fixup = true;
2921                 fixup_start_addr = run_end - host_offset;
2922                 /*
2923                  * This host page has gone, the next loop iteration starts
2924                  * from after the fixup
2925                  */
2926                 run_start = fixup_start_addr + host_ratio;
2927             } else {
2928                 /*
2929                  * No discards on this iteration, next loop starts from
2930                  * next sent/dirty page
2931                  */
2932                 run_start = run_end + 1;
2933             }
2934         }
2935 
2936         if (do_fixup) {
2937             unsigned long page;
2938 
2939             /* Tell the destination to discard this page */
2940             if (unsent_pass || !test_bit(fixup_start_addr, unsentmap)) {
2941                 /* For the unsent_pass we:
2942                  *     discard partially sent pages
2943                  * For the !unsent_pass (dirty) we:
2944                  *     discard partially dirty pages that were sent
2945                  *     (any partially sent pages were already discarded
2946                  *     by the previous unsent_pass)
2947                  */
2948                 postcopy_discard_send_range(ms, pds, fixup_start_addr,
2949                                             host_ratio);
2950             }
2951 
2952             /* Clean up the bitmap */
2953             for (page = fixup_start_addr;
2954                  page < fixup_start_addr + host_ratio; page++) {
2955                 /* All pages in this host page are now not sent */
2956                 set_bit(page, unsentmap);
2957 
2958                 /*
2959                  * Remark them as dirty, updating the count for any pages
2960                  * that weren't previously dirty.
2961                  */
2962                 rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
2963             }
2964         }
2965 
2966         if (unsent_pass) {
2967             /* Find the next sent page for the next iteration */
2968             run_start = find_next_zero_bit(unsentmap, pages, run_start);
2969         } else {
2970             /* Find the next dirty page for the next iteration */
2971             run_start = find_next_bit(bitmap, pages, run_start);
2972         }
2973     }
2974 }
2975 
2976 /**
2977  * postcopy_chunk_hostpages: discard any partially sent host page
2978  *
2979  * Utility for the outgoing postcopy code.
2980  *
2981  * Discard any partially sent host-page size chunks, mark any partially
2982  * dirty host-page size chunks as all dirty.  In this case the host-page
2983  * is the host-page for the particular RAMBlock, i.e. it might be a huge page
2984  *
2985  * Returns zero on success
2986  *
2987  * @ms: current migration state
2988  * @block: block we want to work with
2989  */
2990 static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
2991 {
2992     PostcopyDiscardState *pds =
2993         postcopy_discard_send_init(ms, block->idstr);
2994 
2995     /* First pass: Discard all partially sent host pages */
2996     postcopy_chunk_hostpages_pass(ms, true, block, pds);
2997     /*
2998      * Second pass: Ensure that all partially dirty host pages are made
2999      * fully dirty.
3000      */
3001     postcopy_chunk_hostpages_pass(ms, false, block, pds);
3002 
3003     postcopy_discard_send_finish(ms, pds);
3004     return 0;
3005 }
3006 
3007 /**
3008  * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3009  *
3010  * Returns zero on success
3011  *
3012  * Transmit the set of pages to be discarded after precopy to the target;
3013  * these are pages that:
3014  *     a) Have been previously transmitted but are now dirty again
3015  *     b) Pages that have never been transmitted, this ensures that
3016  *        any pages on the destination that have been mapped by background
3017  *        tasks get discarded (transparent huge pages is the specific concern)
3018  * Hopefully this is pretty sparse
3019  *
3020  * @ms: current migration state
3021  */
3022 int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3023 {
3024     RAMState *rs = ram_state;
3025     RAMBlock *block;
3026     int ret;
3027 
3028     rcu_read_lock();
3029 
3030     /* This should be our last sync, the src is now paused */
3031     migration_bitmap_sync(rs);
3032 
3033     /* Easiest way to make sure we don't resume in the middle of a host-page */
3034     rs->last_seen_block = NULL;
3035     rs->last_sent_block = NULL;
3036     rs->last_page = 0;
3037 
3038     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3039         unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
3040         unsigned long *bitmap = block->bmap;
3041         unsigned long *unsentmap = block->unsentmap;
3042 
3043         if (!unsentmap) {
3044             /* We don't have a safe way to resize the unsentmap, so
3045              * if the bitmap was resized it will be NULL at this
3046              * point.
3047              */
3048             error_report("migration ram resized during precopy phase");
3049             rcu_read_unlock();
3050             return -EINVAL;
3051         }
3052         /* Deal with TPS != HPS and huge pages */
3053         ret = postcopy_chunk_hostpages(ms, block);
3054         if (ret) {
3055             rcu_read_unlock();
3056             return ret;
3057         }
3058 
3059         /*
3060          * Update the unsentmap to be unsentmap = unsentmap | dirty
3061          */
3062         bitmap_or(unsentmap, unsentmap, bitmap, pages);
3063 #ifdef DEBUG_POSTCOPY
3064         ram_debug_dump_bitmap(unsentmap, true, pages);
3065 #endif
3066     }
3067     trace_ram_postcopy_send_discard_bitmap();
3068 
3069     ret = postcopy_each_ram_send_discard(ms);
3070     rcu_read_unlock();
3071 
3072     return ret;
3073 }
3074 
3075 /**
3076  * ram_discard_range: discard dirtied pages at the beginning of postcopy
3077  *
3078  * Returns zero on success
3079  *
3080  * @rbname: name of the RAMBlock of the request
3081  * @start: byte offset within the RAMBlock at which the discard
3082  *         starts
3083  * @length: length in bytes to discard
3084  */
3085 int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3086 {
3087     int ret = -1;
3088 
3089     trace_ram_discard_range(rbname, start, length);
3090 
3091     rcu_read_lock();
3092     RAMBlock *rb = qemu_ram_block_by_name(rbname);
3093 
3094     if (!rb) {
3095         error_report("ram_discard_range: Failed to find block '%s'", rbname);
3096         goto err;
3097     }
3098 
3099     /*
3100      * On source VM, we don't need to update the received bitmap since
3101      * we don't even have one.
3102      */
3103     if (rb->receivedmap) {
3104         bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3105                      length >> qemu_target_page_bits());
3106     }
3107 
3108     ret = ram_block_discard_range(rb, start, length);
3109 
3110 err:
3111     rcu_read_unlock();
3112 
3113     return ret;
3114 }
3115 
3116 /*
3117  * For every allocation, we will try not to crash the VM if the
3118  * allocation fails.
3119  */
3120 static int xbzrle_init(void)
3121 {
3122     Error *local_err = NULL;
3123 
3124     if (!migrate_use_xbzrle()) {
3125         return 0;
3126     }
3127 
3128     XBZRLE_cache_lock();
3129 
3130     XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3131     if (!XBZRLE.zero_target_page) {
3132         error_report("%s: Error allocating zero page", __func__);
3133         goto err_out;
3134     }
3135 
3136     XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3137                               TARGET_PAGE_SIZE, &local_err);
3138     if (!XBZRLE.cache) {
3139         error_report_err(local_err);
3140         goto free_zero_page;
3141     }
3142 
3143     XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3144     if (!XBZRLE.encoded_buf) {
3145         error_report("%s: Error allocating encoded_buf", __func__);
3146         goto free_cache;
3147     }
3148 
3149     XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3150     if (!XBZRLE.current_buf) {
3151         error_report("%s: Error allocating current_buf", __func__);
3152         goto free_encoded_buf;
3153     }
3154 
3155     /* We are all good */
3156     XBZRLE_cache_unlock();
3157     return 0;
3158 
3159 free_encoded_buf:
3160     g_free(XBZRLE.encoded_buf);
3161     XBZRLE.encoded_buf = NULL;
3162 free_cache:
3163     cache_fini(XBZRLE.cache);
3164     XBZRLE.cache = NULL;
3165 free_zero_page:
3166     g_free(XBZRLE.zero_target_page);
3167     XBZRLE.zero_target_page = NULL;
3168 err_out:
3169     XBZRLE_cache_unlock();
3170     return -ENOMEM;
3171 }
3172 
3173 static int ram_state_init(RAMState **rsp)
3174 {
3175     *rsp = g_try_new0(RAMState, 1);
3176 
3177     if (!*rsp) {
3178         error_report("%s: Init ramstate fail", __func__);
3179         return -1;
3180     }
3181 
3182     qemu_mutex_init(&(*rsp)->bitmap_mutex);
3183     qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3184     QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3185 
3186     /*
3187      * Count the total number of pages used by ram blocks not including any
3188      * gaps due to alignment or unplugs.
3189      */
3190     (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3191 
3192     ram_state_reset(*rsp);
3193 
3194     return 0;
3195 }
3196 
3197 static void ram_list_init_bitmaps(void)
3198 {
3199     RAMBlock *block;
3200     unsigned long pages;
3201 
3202     /* Skip setting bitmap if there is no RAM */
3203     if (ram_bytes_total()) {
3204         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3205             pages = block->max_length >> TARGET_PAGE_BITS;
3206             block->bmap = bitmap_new(pages);
3207             bitmap_set(block->bmap, 0, pages);
3208             if (migrate_postcopy_ram()) {
3209                 block->unsentmap = bitmap_new(pages);
3210                 bitmap_set(block->unsentmap, 0, pages);
3211             }
3212         }
3213     }
3214 }
3215 
3216 static void ram_init_bitmaps(RAMState *rs)
3217 {
3218     /* For memory_global_dirty_log_start below.  */
3219     qemu_mutex_lock_iothread();
3220     qemu_mutex_lock_ramlist();
3221     rcu_read_lock();
3222 
3223     ram_list_init_bitmaps();
3224     memory_global_dirty_log_start();
3225     migration_bitmap_sync_precopy(rs);
3226 
3227     rcu_read_unlock();
3228     qemu_mutex_unlock_ramlist();
3229     qemu_mutex_unlock_iothread();
3230 }
3231 
3232 static int ram_init_all(RAMState **rsp)
3233 {
3234     if (ram_state_init(rsp)) {
3235         return -1;
3236     }
3237 
3238     if (xbzrle_init()) {
3239         ram_state_cleanup(rsp);
3240         return -1;
3241     }
3242 
3243     ram_init_bitmaps(*rsp);
3244 
3245     return 0;
3246 }
3247 
3248 static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3249 {
3250     RAMBlock *block;
3251     uint64_t pages = 0;
3252 
3253     /*
3254      * Postcopy is not using xbzrle/compression, so no need for that.
3255      * Also, since the source is already halted, we don't need to care
3256      * about dirty page logging either.
3257      */
3258 
3259     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3260         pages += bitmap_count_one(block->bmap,
3261                                   block->used_length >> TARGET_PAGE_BITS);
3262     }
3263 
3264     /* This may not be aligned with current bitmaps. Recalculate. */
3265     rs->migration_dirty_pages = pages;
3266 
3267     rs->last_seen_block = NULL;
3268     rs->last_sent_block = NULL;
3269     rs->last_page = 0;
3270     rs->last_version = ram_list.version;
3271     /*
3272      * Disable the bulk stage, otherwise we'll resend the whole RAM no
3273      * matter what we have sent.
3274      */
3275     rs->ram_bulk_stage = false;
3276 
3277     /* Update RAMState cache of output QEMUFile */
3278     rs->f = out;
3279 
3280     trace_ram_state_resume_prepare(pages);
3281 }
3282 
3283 /*
3284  * This function clears bits of the free pages reported by the caller from the
3285  * migration dirty bitmap. @addr is the host address corresponding to the
3286  * start of the continuous guest free pages, and @len is the total bytes of
3287  * those pages.
3288  */
3289 void qemu_guest_free_page_hint(void *addr, size_t len)
3290 {
3291     RAMBlock *block;
3292     ram_addr_t offset;
3293     size_t used_len, start, npages;
3294     MigrationState *s = migrate_get_current();
3295 
3296     /* This function is currently expected to be used during live migration */
3297     if (!migration_is_setup_or_active(s->state)) {
3298         return;
3299     }
3300 
3301     for (; len > 0; len -= used_len, addr += used_len) {
3302         block = qemu_ram_block_from_host(addr, false, &offset);
3303         if (unlikely(!block || offset >= block->used_length)) {
3304             /*
3305              * The implementation might not support RAMBlock resize during
3306              * live migration, but it could happen in theory with future
3307              * updates. So we add a check here to capture that case.
3308              */
3309             error_report_once("%s unexpected error", __func__);
3310             return;
3311         }
3312 
3313         if (len <= block->used_length - offset) {
3314             used_len = len;
3315         } else {
3316             used_len = block->used_length - offset;
3317         }
3318 
3319         start = offset >> TARGET_PAGE_BITS;
3320         npages = used_len >> TARGET_PAGE_BITS;
3321 
3322         qemu_mutex_lock(&ram_state->bitmap_mutex);
3323         ram_state->migration_dirty_pages -=
3324                       bitmap_count_one_with_offset(block->bmap, start, npages);
3325         bitmap_clear(block->bmap, start, npages);
3326         qemu_mutex_unlock(&ram_state->bitmap_mutex);
3327     }
3328 }
3329 
3330 /*
3331  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has a
3332  * long-running RCU critical section.  When RCU reclaims in the code
3333  * start to become numerous, it will be necessary to reduce the
3334  * granularity of these critical sections.
3335  */
3336 
3337 /**
3338  * ram_save_setup: Setup RAM for migration
3339  *
3340  * Returns zero to indicate success and negative for error
3341  *
3342  * @f: QEMUFile where to send the data
3343  * @opaque: RAMState pointer
3344  */
3345 static int ram_save_setup(QEMUFile *f, void *opaque)
3346 {
3347     RAMState **rsp = opaque;
3348     RAMBlock *block;
3349 
3350     if (compress_threads_save_setup()) {
3351         return -1;
3352     }
3353 
3354     /* migration has already set up the bitmap, reuse it. */
3355     if (!migration_in_colo_state()) {
3356         if (ram_init_all(rsp) != 0) {
3357             compress_threads_save_cleanup();
3358             return -1;
3359         }
3360     }
3361     (*rsp)->f = f;
3362 
3363     rcu_read_lock();
3364 
3365     qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3366 
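    /*
     * Describe every migratable RAMBlock: id string, used length, and,
     * when needed, its page size and ignore-shared information.
     */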
3367     RAMBLOCK_FOREACH_MIGRATABLE(block) {
3368         qemu_put_byte(f, strlen(block->idstr));
3369         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3370         qemu_put_be64(f, block->used_length);
3371         if (migrate_postcopy_ram() && block->page_size != qemu_host_page_size) {
3372             qemu_put_be64(f, block->page_size);
3373         }
3374         if (migrate_ignore_shared()) {
3375             qemu_put_be64(f, block->mr->addr);
3376             qemu_put_byte(f, ramblock_is_ignored(block) ? 1 : 0);
3377         }
3378     }
3379 
3380     rcu_read_unlock();
3381 
3382     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3383     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3384 
3385     multifd_send_sync_main();
3386     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3387     qemu_fflush(f);
3388 
3389     return 0;
3390 }
3391 
3392 /**
3393  * ram_save_iterate: iterative stage for migration
3394  *
3395  * Returns zero to indicate success and negative for error
3396  *
3397  * @f: QEMUFile where to send the data
3398  * @opaque: RAMState pointer
3399  */
3400 static int ram_save_iterate(QEMUFile *f, void *opaque)
3401 {
3402     RAMState **temp = opaque;
3403     RAMState *rs = *temp;
3404     int ret;
3405     int i;
3406     int64_t t0;
3407     int done = 0;
3408 
3409     if (blk_mig_bulk_active()) {
3410         /* Avoid transferring RAM during the bulk phase of block migration, as
3411          * the bulk phase will usually take a long time and transferring
3412          * RAM updates during that time is pointless. */
3413         goto out;
3414     }
3415 
3416     rcu_read_lock();
3417     if (ram_list.version != rs->last_version) {
3418         ram_state_reset(rs);
3419     }
3420 
3421     /* Read version before ram_list.blocks */
3422     smp_rmb();
3423 
3424     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3425 
3426     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3427     i = 0;
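    /*
     * Send pages while the rate limiter allows it or queued page requests
     * are pending; bail out early on errors or after MAX_WAIT milliseconds.
     */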
3428     while ((ret = qemu_file_rate_limit(f)) == 0 ||
3429             !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3430         int pages;
3431 
3432         if (qemu_file_get_error(f)) {
3433             break;
3434         }
3435 
3436         pages = ram_find_and_save_block(rs, false);
3437         /* no more pages to send */
3438         if (pages == 0) {
3439             done = 1;
3440             break;
3441         }
3442 
3443         if (pages < 0) {
3444             qemu_file_set_error(f, pages);
3445             break;
3446         }
3447 
3448         rs->target_page_count += pages;
3449 
3450         /* we want to check in the 1st loop, just in case it was the 1st time
3451            and we had to sync the dirty bitmap.
3452            qemu_clock_get_ns() is a bit expensive, so we only check every few
3453            iterations
3454         */
3455         if ((i & 63) == 0) {
3456             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
3457             if (t1 > MAX_WAIT) {
3458                 trace_ram_save_iterate_big_wait(t1, i);
3459                 break;
3460             }
3461         }
3462         i++;
3463     }
3464     rcu_read_unlock();
3465 
3466     /*
3467      * Must occur before EOS (or any QEMUFile operation)
3468      * because of RDMA protocol.
3469      */
3470     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3471 
3472     multifd_send_sync_main();
3473 out:
3474     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3475     qemu_fflush(f);
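    /* Account for the 8-byte EOS marker written just above */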
3476     ram_counters.transferred += 8;
3477 
3478     ret = qemu_file_get_error(f);
3479     if (ret < 0) {
3480         return ret;
3481     }
3482 
3483     return done;
3484 }
3485 
3486 /**
3487  * ram_save_complete: function called to send the remaining amount of RAM
3488  *
3489  * Returns zero to indicate success or negative on error
3490  *
3491  * Called with the iothread lock held
3492  *
3493  * @f: QEMUFile where to send the data
3494  * @opaque: RAMState pointer
3495  */
3496 static int ram_save_complete(QEMUFile *f, void *opaque)
3497 {
3498     RAMState **temp = opaque;
3499     RAMState *rs = *temp;
3500     int ret = 0;
3501 
3502     rcu_read_lock();
3503 
3504     if (!migration_in_postcopy()) {
3505         migration_bitmap_sync_precopy(rs);
3506     }
3507 
3508     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3509 
3510     /* try transferring iterative blocks of memory */
3511 
3512     /* flush all remaining blocks regardless of rate limiting */
3513     while (true) {
3514         int pages;
3515 
3516         pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3517         /* no more blocks to send */
3518         if (pages == 0) {
3519             break;
3520         }
3521         if (pages < 0) {
3522             ret = pages;
3523             break;
3524         }
3525     }
3526 
3527     flush_compressed_data(rs);
3528     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3529 
3530     rcu_read_unlock();
3531 
3532     multifd_send_sync_main();
3533     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3534     qemu_fflush(f);
3535 
3536     return ret;
3537 }
3538 
3539 static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3540                              uint64_t *res_precopy_only,
3541                              uint64_t *res_compatible,
3542                              uint64_t *res_postcopy_only)
3543 {
3544     RAMState **temp = opaque;
3545     RAMState *rs = *temp;
3546     uint64_t remaining_size;
3547 
3548     remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3549 
3550     if (!migration_in_postcopy() &&
3551         remaining_size < max_size) {
3552         qemu_mutex_lock_iothread();
3553         rcu_read_lock();
3554         migration_bitmap_sync_precopy(rs);
3555         rcu_read_unlock();
3556         qemu_mutex_unlock_iothread();
3557         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3558     }
3559 
3560     if (migrate_postcopy_ram()) {
3561         /* We can do postcopy, and all the data is postcopiable */
3562         *res_compatible += remaining_size;
3563     } else {
3564         *res_precopy_only += remaining_size;
3565     }
3566 }
3567 
3568 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3569 {
3570     unsigned int xh_len;
3571     int xh_flags;
3572     uint8_t *loaded_data;
3573 
3574     /* extract RLE header */
3575     xh_flags = qemu_get_byte(f);
3576     xh_len = qemu_get_be16(f);
3577 
3578     if (xh_flags != ENCODING_FLAG_XBZRLE) {
3579         error_report("Failed to load XBZRLE page - wrong compression!");
3580         return -1;
3581     }
3582 
3583     if (xh_len > TARGET_PAGE_SIZE) {
3584         error_report("Failed to load XBZRLE page - len overflow!");
3585         return -1;
3586     }
3587     loaded_data = XBZRLE.decoded_buf;
3588     /* load data and decode */
3589     /* it can change loaded_data to point to an internal buffer */
3590     qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3591 
3592     /* decode RLE */
3593     if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3594                              TARGET_PAGE_SIZE) == -1) {
3595         error_report("Failed to load XBZRLE page - decode error!");
3596         return -1;
3597     }
3598 
3599     return 0;
3600 }
3601 
3602 /**
3603  * ram_block_from_stream: read a RAMBlock id from the migration stream
3604  *
3605  * Must be called from within a rcu critical section.
3606  * Must be called from within an RCU critical section.
3607  * Returns a pointer from within the RCU-protected ram_list.
3608  *
3609  * @f: QEMUFile where to read the data from
3610  * @flags: Page flags (mostly to see if it's a continuation of previous block)
3611  */
3612 static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3613 {
3614     static RAMBlock *block = NULL;
3615     char id[256];
3616     uint8_t len;
3617 
3618     if (flags & RAM_SAVE_FLAG_CONTINUE) {
3619         if (!block) {
3620             error_report("Ack, bad migration stream!");
3621             return NULL;
3622         }
3623         return block;
3624     }
3625 
3626     len = qemu_get_byte(f);
3627     qemu_get_buffer(f, (uint8_t *)id, len);
3628     id[len] = 0;
3629 
3630     block = qemu_ram_block_by_name(id);
3631     if (!block) {
3632         error_report("Can't find block %s", id);
3633         return NULL;
3634     }
3635 
3636     if (ramblock_is_ignored(block)) {
3637         error_report("block %s should not be migrated !", id);
3638         return NULL;
3639     }
3640 
3641     return block;
3642 }
3643 
3644 static inline void *host_from_ram_block_offset(RAMBlock *block,
3645                                                ram_addr_t offset)
3646 {
3647     if (!offset_in_ramblock(block, offset)) {
3648         return NULL;
3649     }
3650 
3651     return block->host + offset;
3652 }
3653 
3654 static inline void *colo_cache_from_block_offset(RAMBlock *block,
3655                                                  ram_addr_t offset)
3656 {
3657     if (!offset_in_ramblock(block, offset)) {
3658         return NULL;
3659     }
3660     if (!block->colo_cache) {
3661         error_report("%s: colo_cache is NULL in block :%s",
3662                      __func__, block->idstr);
3663         return NULL;
3664     }
3665 
3666     /*
3667      * During a COLO checkpoint, we need a bitmap of these migrated pages.
3668      * It helps us to decide which pages in the RAM cache should be flushed
3669      * into the VM's RAM later.
3670      */
3671     if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3672         ram_state->migration_dirty_pages++;
3673     }
3674     return block->colo_cache + offset;
3675 }
3676 
3677 /**
3678  * ram_handle_compressed: handle the zero page case
3679  *
3680  * If a page (or a whole RDMA chunk) has been
3681  * determined to be zero, then zap it.
3682  *
3683  * @host: host address for the zero page
3684  * @ch: what the page is filled with.  We only support zero
3685  * @size: size of the zero page
3686  */
3687 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3688 {
3689     if (ch != 0 || !is_zero_range(host, size)) {
3690         memset(host, ch, size);
3691     }
3692 }
3693 
3694 /* return the size after decompression, or a negative value on error */
3695 static int
3696 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3697                      const uint8_t *source, size_t source_len)
3698 {
3699     int err;
3700 
3701     err = inflateReset(stream);
3702     if (err != Z_OK) {
3703         return -1;
3704     }
3705 
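    /* Point the reset zlib stream at the new input and output buffers */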
3706     stream->avail_in = source_len;
3707     stream->next_in = (uint8_t *)source;
3708     stream->avail_out = dest_len;
3709     stream->next_out = dest;
3710 
3711     err = inflate(stream, Z_NO_FLUSH);
3712     if (err != Z_STREAM_END) {
3713         return -1;
3714     }
3715 
3716     return stream->total_out;
3717 }
3718 
3719 static void *do_data_decompress(void *opaque)
3720 {
3721     DecompressParam *param = opaque;
3722     unsigned long pagesize;
3723     uint8_t *des;
3724     int len, ret;
3725 
3726     qemu_mutex_lock(&param->mutex);
3727     while (!param->quit) {
3728         if (param->des) {
3729             des = param->des;
3730             len = param->len;
3731             param->des = 0;
3732             qemu_mutex_unlock(&param->mutex);
3733 
3734             pagesize = TARGET_PAGE_SIZE;
3735 
3736             ret = qemu_uncompress_data(&param->stream, des, pagesize,
3737                                        param->compbuf, len);
3738             if (ret < 0 && migrate_get_current()->decompress_error_check) {
3739                 error_report("decompress data failed");
3740                 qemu_file_set_error(decomp_file, ret);
3741             }
3742 
3743             qemu_mutex_lock(&decomp_done_lock);
3744             param->done = true;
3745             qemu_cond_signal(&decomp_done_cond);
3746             qemu_mutex_unlock(&decomp_done_lock);
3747 
3748             qemu_mutex_lock(&param->mutex);
3749         } else {
3750             qemu_cond_wait(&param->cond, &param->mutex);
3751         }
3752     }
3753     qemu_mutex_unlock(&param->mutex);
3754 
3755     return NULL;
3756 }
3757 
3758 static int wait_for_decompress_done(void)
3759 {
3760     int idx, thread_count;
3761 
3762     if (!migrate_use_compression()) {
3763         return 0;
3764     }
3765 
3766     thread_count = migrate_decompress_threads();
3767     qemu_mutex_lock(&decomp_done_lock);
3768     for (idx = 0; idx < thread_count; idx++) {
3769         while (!decomp_param[idx].done) {
3770             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3771         }
3772     }
3773     qemu_mutex_unlock(&decomp_done_lock);
3774     return qemu_file_get_error(decomp_file);
3775 }
3776 
3777 static void compress_threads_load_cleanup(void)
3778 {
3779     int i, thread_count;
3780 
3781     if (!migrate_use_compression()) {
3782         return;
3783     }
3784     thread_count = migrate_decompress_threads();
3785     for (i = 0; i < thread_count; i++) {
3786         /*
3787          * we use it as an indicator which shows whether the thread is
3788          * properly initialized or not
3789          */
3790         if (!decomp_param[i].compbuf) {
3791             break;
3792         }
3793 
3794         qemu_mutex_lock(&decomp_param[i].mutex);
3795         decomp_param[i].quit = true;
3796         qemu_cond_signal(&decomp_param[i].cond);
3797         qemu_mutex_unlock(&decomp_param[i].mutex);
3798     }
3799     for (i = 0; i < thread_count; i++) {
3800         if (!decomp_param[i].compbuf) {
3801             break;
3802         }
3803 
3804         qemu_thread_join(decompress_threads + i);
3805         qemu_mutex_destroy(&decomp_param[i].mutex);
3806         qemu_cond_destroy(&decomp_param[i].cond);
3807         inflateEnd(&decomp_param[i].stream);
3808         g_free(decomp_param[i].compbuf);
3809         decomp_param[i].compbuf = NULL;
3810     }
3811     g_free(decompress_threads);
3812     g_free(decomp_param);
3813     decompress_threads = NULL;
3814     decomp_param = NULL;
3815     decomp_file = NULL;
3816 }
3817 
3818 static int compress_threads_load_setup(QEMUFile *f)
3819 {
3820     int i, thread_count;
3821 
3822     if (!migrate_use_compression()) {
3823         return 0;
3824     }
3825 
3826     thread_count = migrate_decompress_threads();
3827     decompress_threads = g_new0(QemuThread, thread_count);
3828     decomp_param = g_new0(DecompressParam, thread_count);
3829     qemu_mutex_init(&decomp_done_lock);
3830     qemu_cond_init(&decomp_done_cond);
3831     decomp_file = f;
3832     for (i = 0; i < thread_count; i++) {
3833         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3834             goto exit;
3835         }
3836 
3837         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3838         qemu_mutex_init(&decomp_param[i].mutex);
3839         qemu_cond_init(&decomp_param[i].cond);
3840         decomp_param[i].done = true;
3841         decomp_param[i].quit = false;
3842         qemu_thread_create(decompress_threads + i, "decompress",
3843                            do_data_decompress, decomp_param + i,
3844                            QEMU_THREAD_JOINABLE);
3845     }
3846     return 0;
3847 exit:
3848     compress_threads_load_cleanup();
3849     return -1;
3850 }
3851 
3852 static void decompress_data_with_multi_threads(QEMUFile *f,
3853                                                void *host, int len)
3854 {
3855     int idx, thread_count;
3856 
3857     thread_count = migrate_decompress_threads();
3858     qemu_mutex_lock(&decomp_done_lock);
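    /* Find an idle decompression thread; if all are busy, wait for one to finish */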
3859     while (true) {
3860         for (idx = 0; idx < thread_count; idx++) {
3861             if (decomp_param[idx].done) {
3862                 decomp_param[idx].done = false;
3863                 qemu_mutex_lock(&decomp_param[idx].mutex);
3864                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3865                 decomp_param[idx].des = host;
3866                 decomp_param[idx].len = len;
3867                 qemu_cond_signal(&decomp_param[idx].cond);
3868                 qemu_mutex_unlock(&decomp_param[idx].mutex);
3869                 break;
3870             }
3871         }
3872         if (idx < thread_count) {
3873             break;
3874         } else {
3875             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3876         }
3877     }
3878     qemu_mutex_unlock(&decomp_done_lock);
3879 }
3880 
3881 /*
3882  * colo cache: this is for the secondary VM.  We cache the whole
3883  * memory of the secondary VM; the global lock must be held
3884  * to call this helper.
3885  */
3886 int colo_init_ram_cache(void)
3887 {
3888     RAMBlock *block;
3889 
3890     rcu_read_lock();
3891     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3892         block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3893                                                 NULL,
3894                                                 false);
3895         if (!block->colo_cache) {
3896             error_report("%s: Can't alloc memory for COLO cache of block %s,"
3897                          "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3898                          block->used_length);
3899             goto out_locked;
3900         }
3901         memcpy(block->colo_cache, block->host, block->used_length);
3902     }
3903     rcu_read_unlock();
3904     /*
3905      * Record the dirty pages that are sent by the PVM.  We use this dirty
3906      * bitmap to decide which pages in the cache should be flushed into the
3907      * SVM's RAM.  Here we use the same name 'ram_bitmap' as for migration.
3908      */
3909     if (ram_bytes_total()) {
3910         RAMBlock *block;
3911 
3912         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3913             unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3914 
3915             block->bmap = bitmap_new(pages);
3916             bitmap_set(block->bmap, 0, pages);
3917         }
3918     }
3919     ram_state = g_new0(RAMState, 1);
3920     ram_state->migration_dirty_pages = 0;
3921     qemu_mutex_init(&ram_state->bitmap_mutex);
3922     memory_global_dirty_log_start();
3923 
3924     return 0;
3925 
3926 out_locked:
3927 
3928     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3929         if (block->colo_cache) {
3930             qemu_anon_ram_free(block->colo_cache, block->used_length);
3931             block->colo_cache = NULL;
3932         }
3933     }
3934 
3935     rcu_read_unlock();
3936     return -errno;
3937 }
3938 
3939 /* The global lock must be held to call this helper */
3940 void colo_release_ram_cache(void)
3941 {
3942     RAMBlock *block;
3943 
3944     memory_global_dirty_log_stop();
3945     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3946         g_free(block->bmap);
3947         block->bmap = NULL;
3948     }
3949 
3950     rcu_read_lock();
3951 
3952     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3953         if (block->colo_cache) {
3954             qemu_anon_ram_free(block->colo_cache, block->used_length);
3955             block->colo_cache = NULL;
3956         }
3957     }
3958 
3959     rcu_read_unlock();
3960     qemu_mutex_destroy(&ram_state->bitmap_mutex);
3961     g_free(ram_state);
3962     ram_state = NULL;
3963 }
3964 
3965 /**
3966  * ram_load_setup: Setup RAM for migration incoming side
3967  *
3968  * Returns zero to indicate success and negative for error
3969  *
3970  * @f: QEMUFile where to receive the data
3971  * @opaque: RAMState pointer
3972  */
3973 static int ram_load_setup(QEMUFile *f, void *opaque)
3974 {
3975     if (compress_threads_load_setup(f)) {
3976         return -1;
3977     }
3978 
3979     xbzrle_load_setup();
3980     ramblock_recv_map_init();
3981 
3982     return 0;
3983 }
3984 
3985 static int ram_load_cleanup(void *opaque)
3986 {
3987     RAMBlock *rb;
3988 
3989     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3990         if (ramblock_is_pmem(rb)) {
3991             pmem_persist(rb->host, rb->used_length);
3992         }
3993     }
3994 
3995     xbzrle_load_cleanup();
3996     compress_threads_load_cleanup();
3997 
3998     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3999         g_free(rb->receivedmap);
4000         rb->receivedmap = NULL;
4001     }
4002 
4003     return 0;
4004 }
4005 
4006 /**
4007  * ram_postcopy_incoming_init: allocate postcopy data structures
4008  *
4009  * Returns 0 for success and negative if there was one error
4010  *
4011  * @mis: current migration incoming state
4012  *
4013  * Allocate data structures etc needed by incoming migration with
4014  * postcopy-ram. postcopy-ram's similarly named
4015  * postcopy_ram_incoming_init does the work.
4016  */
4017 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4018 {
4019     return postcopy_ram_incoming_init(mis);
4020 }
4021 
4022 /**
4023  * ram_load_postcopy: load a page in postcopy case
4024  *
4025  * Returns 0 for success or -errno in case of error
4026  *
4027  * Called in postcopy mode by ram_load().
4028  * rcu_read_lock is taken prior to this being called.
4029  *
4030  * @f: QEMUFile to receive the data from
4031  */
4032 static int ram_load_postcopy(QEMUFile *f)
4033 {
4034     int flags = 0, ret = 0;
4035     bool place_needed = false;
4036     bool matches_target_page_size = false;
4037     MigrationIncomingState *mis = migration_incoming_get_current();
4038     /* Temporary page that is later 'placed' */
4039     void *postcopy_host_page = postcopy_get_tmp_page(mis);
4040     void *last_host = NULL;
4041     bool all_zero = false;
4042 
4043     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4044         ram_addr_t addr;
4045         void *host = NULL;
4046         void *page_buffer = NULL;
4047         void *place_source = NULL;
4048         RAMBlock *block = NULL;
4049         uint8_t ch;
4050 
4051         addr = qemu_get_be64(f);
4052 
4053         /*
4054          * If there is a QEMUFile error, we should stop here, since
4055          * "addr" may then be invalid
4056          */
4057         ret = qemu_file_get_error(f);
4058         if (ret) {
4059             break;
4060         }
4061 
4062         flags = addr & ~TARGET_PAGE_MASK;
4063         addr &= TARGET_PAGE_MASK;
4064 
4065         trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4066         place_needed = false;
4067         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
4068             block = ram_block_from_stream(f, flags);
4069 
4070             host = host_from_ram_block_offset(block, addr);
4071             if (!host) {
4072                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4073                 ret = -EINVAL;
4074                 break;
4075             }
4076             matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4077             /*
4078              * Postcopy requires that we place whole host pages atomically;
4079              * these may be huge pages for RAMBlocks that are backed by
4080              * hugetlbfs.
4081              * To make it atomic, the data is read into a temporary page
4082              * that's moved into place later.
4083              * The migration protocol uses possibly smaller target pages,
4084              * but the source ensures it always sends all the components
4085              * of a host page in order.
4086              */
4087             page_buffer = postcopy_host_page +
4088                           ((uintptr_t)host & (block->page_size - 1));
4089             /* If all target pages are zero then we can optimise the place */
4090             if (!((uintptr_t)host & (block->page_size - 1))) {
4091                 all_zero = true;
4092             } else {
4093                 /* not the 1st target page within the host page */
4094                 if (host != (last_host + TARGET_PAGE_SIZE)) {
4095                     error_report("Non-sequential target page %p/%p",
4096                                   host, last_host);
4097                     ret = -EINVAL;
4098                     break;
4099                 }
4100             }
4101 
4102 
4103             /*
4104              * If it's the last part of a host page then we place the host
4105              * page
4106              */
4107             place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
4108                                      (block->page_size - 1)) == 0;
4109             place_source = postcopy_host_page;
4110         }
4111         last_host = host;
4112 
4113         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4114         case RAM_SAVE_FLAG_ZERO:
4115             ch = qemu_get_byte(f);
4116             memset(page_buffer, ch, TARGET_PAGE_SIZE);
4117             if (ch) {
4118                 all_zero = false;
4119             }
4120             break;
4121 
4122         case RAM_SAVE_FLAG_PAGE:
4123             all_zero = false;
4124             if (!matches_target_page_size) {
4125                 /* For huge pages, we always use the temporary buffer */
4126                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4127             } else {
4128                 /*
4129                  * For small pages that matches target page size, we
4130                  * avoid the qemu_file copy.  Instead we directly use
4131                  * the buffer of QEMUFile to place the page.  Note: we
4132                  * cannot do any QEMUFile operation before using that
4133                  * buffer to make sure the buffer is valid when
4134                  * placing the page.
4135                  */
4136                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4137                                          TARGET_PAGE_SIZE);
4138             }
4139             break;
4140         case RAM_SAVE_FLAG_EOS:
4141             /* normal exit */
4142             multifd_recv_sync_main();
4143             break;
4144         default:
4145             error_report("Unknown combination of migration flags: %#x"
4146                          " (postcopy mode)", flags);
4147             ret = -EINVAL;
4148             break;
4149         }
4150 
4151         /* Detect any possible file errors */
4152         if (!ret && qemu_file_get_error(f)) {
4153             ret = qemu_file_get_error(f);
4154         }
4155 
4156         if (!ret && place_needed) {
4157             /* This gets called at the last target page in the host page */
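            /* host points at that last target page, so step back to the start
             * of the whole host page */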
4158             void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
4159 
4160             if (all_zero) {
4161                 ret = postcopy_place_page_zero(mis, place_dest,
4162                                                block);
4163             } else {
4164                 ret = postcopy_place_page(mis, place_dest,
4165                                           place_source, block);
4166             }
4167         }
4168     }
4169 
4170     return ret;
4171 }
4172 
4173 static bool postcopy_is_advised(void)
4174 {
4175     PostcopyState ps = postcopy_state_get();
4176     return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4177 }
4178 
4179 static bool postcopy_is_running(void)
4180 {
4181     PostcopyState ps = postcopy_state_get();
4182     return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4183 }
4184 
4185 /*
4186  * Flush the contents of the RAM cache into the SVM's memory.
4187  * Only flush the pages that have been dirtied by the PVM, the SVM, or both.
4188  */
4189 static void colo_flush_ram_cache(void)
4190 {
4191     RAMBlock *block = NULL;
4192     void *dst_host;
4193     void *src_host;
4194     unsigned long offset = 0;
4195 
4196     memory_global_dirty_log_sync();
4197     rcu_read_lock();
4198     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4199         migration_bitmap_sync_range(ram_state, block, 0, block->used_length);
4200     }
4201     rcu_read_unlock();
4202 
4203     trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4204     rcu_read_lock();
4205     block = QLIST_FIRST_RCU(&ram_list.blocks);
4206 
4207     while (block) {
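    /*
     * Walk every block; copy each page that is marked dirty from the COLO
     * cache into the guest's RAM, clearing the dirty bit as we go.
     */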
4208         offset = migration_bitmap_find_dirty(ram_state, block, offset);
4209 
4210         if (offset << TARGET_PAGE_BITS >= block->used_length) {
4211             offset = 0;
4212             block = QLIST_NEXT_RCU(block, next);
4213         } else {
4214             migration_bitmap_clear_dirty(ram_state, block, offset);
4215             dst_host = block->host + (offset << TARGET_PAGE_BITS);
4216             src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4217             memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4218         }
4219     }
4220 
4221     rcu_read_unlock();
4222     trace_colo_flush_ram_cache_end();
4223 }
4224 
4225 static int ram_load(QEMUFile *f, void *opaque, int version_id)
4226 {
4227     int flags = 0, ret = 0, invalid_flags = 0;
4228     static uint64_t seq_iter;
4229     int len = 0;
4230     /*
4231      * If the system is running in postcopy mode, page inserts into host memory must
4232      * be atomic
4233      */
4234     bool postcopy_running = postcopy_is_running();
4235     /* ADVISE comes earlier; it shows that the source has the postcopy capability enabled */
4236     bool postcopy_advised = postcopy_is_advised();
4237 
4238     seq_iter++;
4239 
4240     if (version_id != 4) {
4241         ret = -EINVAL;
4242     }
4243 
4244     if (!migrate_use_compression()) {
4245         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4246     }
4247     /* This RCU critical section can be very long-running.
4248      * When RCU reclaims in the code start to become numerous,
4249      * it will be necessary to reduce the granularity of this
4250      * critical section.
4251      */
4252     rcu_read_lock();
4253 
4254     if (postcopy_running) {
4255         ret = ram_load_postcopy(f);
4256     }
4257 
4258     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4259         ram_addr_t addr, total_ram_bytes;
4260         void *host = NULL;
4261         uint8_t ch;
4262 
4263         addr = qemu_get_be64(f);
4264         flags = addr & ~TARGET_PAGE_MASK;
4265         addr &= TARGET_PAGE_MASK;
4266 
4267         if (flags & invalid_flags) {
4268             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4269                 error_report("Received an unexpected compressed page");
4270             }
4271 
4272             ret = -EINVAL;
4273             break;
4274         }
4275 
4276         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4277                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4278             RAMBlock *block = ram_block_from_stream(f, flags);
4279 
4280             /*
4281              * After going into COLO, we should load the page into colo_cache.
4282              */
4283             if (migration_incoming_in_colo_state()) {
4284                 host = colo_cache_from_block_offset(block, addr);
4285             } else {
4286                 host = host_from_ram_block_offset(block, addr);
4287             }
4288             if (!host) {
4289                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4290                 ret = -EINVAL;
4291                 break;
4292             }
4293 
4294             if (!migration_incoming_in_colo_state()) {
4295                 ramblock_recv_bitmap_set(block, host);
4296             }
4297 
4298             trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4299         }
4300 
4301         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4302         case RAM_SAVE_FLAG_MEM_SIZE:
4303             /* Synchronize RAM block list */
4304             total_ram_bytes = addr;
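            /* addr carried the total size of migratable RAM; each loop
             * iteration consumes one block record from the stream */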
4305             while (!ret && total_ram_bytes) {
4306                 RAMBlock *block;
4307                 char id[256];
4308                 ram_addr_t length;
4309 
4310                 len = qemu_get_byte(f);
4311                 qemu_get_buffer(f, (uint8_t *)id, len);
4312                 id[len] = 0;
4313                 length = qemu_get_be64(f);
4314 
4315                 block = qemu_ram_block_by_name(id);
4316                 if (block && !qemu_ram_is_migratable(block)) {
4317                     error_report("block %s should not be migrated !", id);
4318                     ret = -EINVAL;
4319                 } else if (block) {
4320                     if (length != block->used_length) {
4321                         Error *local_err = NULL;
4322 
4323                         ret = qemu_ram_resize(block, length,
4324                                               &local_err);
4325                         if (local_err) {
4326                             error_report_err(local_err);
4327                         }
4328                     }
4329                     /* For postcopy we need to check that hugepage sizes match */
4330                     if (postcopy_advised &&
4331                         block->page_size != qemu_host_page_size) {
4332                         uint64_t remote_page_size = qemu_get_be64(f);
4333                         if (remote_page_size != block->page_size) {
4334                             error_report("Mismatched RAM page size %s "
4335                                          "(local) %zd != %" PRId64,
4336                                          id, block->page_size,
4337                                          remote_page_size);
4338                             ret = -EINVAL;
4339                         }
4340                     }
4341                     if (migrate_ignore_shared()) {
4342                         hwaddr addr = qemu_get_be64(f);
4343                         bool ignored = qemu_get_byte(f);
4344                         if (ignored != ramblock_is_ignored(block)) {
4345                             error_report("RAM block %s should %s be migrated",
4346                                          id, ignored ? "" : "not");
4347                             ret = -EINVAL;
4348                         }
4349                         if (ramblock_is_ignored(block) &&
4350                             block->mr->addr != addr) {
4351                             error_report("Mismatched GPAs for block %s "
4352                                          "%" PRId64 " != %" PRId64,
4353                                          id, (uint64_t)addr,
4354                                          (uint64_t)block->mr->addr);
4355                             ret = -EINVAL;
4356                         }
4357                     }
4358                     ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4359                                           block->idstr);
4360                 } else {
4361                     error_report("Unknown ramblock \"%s\", cannot "
4362                                  "accept migration", id);
4363                     ret = -EINVAL;
4364                 }
4365 
4366                 total_ram_bytes -= length;
4367             }
4368             break;
4369 
4370         case RAM_SAVE_FLAG_ZERO:
4371             ch = qemu_get_byte(f);
4372             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4373             break;
4374 
4375         case RAM_SAVE_FLAG_PAGE:
4376             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4377             break;
4378 
4379         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4380             len = qemu_get_be32(f);
4381             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4382                 error_report("Invalid compressed data length: %d", len);
4383                 ret = -EINVAL;
4384                 break;
4385             }
4386             decompress_data_with_multi_threads(f, host, len);
4387             break;
4388 
4389         case RAM_SAVE_FLAG_XBZRLE:
4390             if (load_xbzrle(f, addr, host) < 0) {
4391                 error_report("Failed to decompress XBZRLE page at "
4392                              RAM_ADDR_FMT, addr);
4393                 ret = -EINVAL;
4394                 break;
4395             }
4396             break;
4397         case RAM_SAVE_FLAG_EOS:
4398             /* normal exit */
4399             multifd_recv_sync_main();
4400             break;
4401         default:
4402             if (flags & RAM_SAVE_FLAG_HOOK) {
4403                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4404             } else {
4405                 error_report("Unknown combination of migration flags: %#x",
4406                              flags);
4407                 ret = -EINVAL;
4408             }
4409         }
4410         if (!ret) {
4411             ret = qemu_file_get_error(f);
4412         }
4413     }
4414 
4415     ret |= wait_for_decompress_done();
4416     rcu_read_unlock();
4417     trace_ram_load_complete(ret, seq_iter);
4418 
4419     if (!ret && migration_incoming_in_colo_state()) {
4420         colo_flush_ram_cache();
4421     }
4422     return ret;
4423 }
4424 
4425 static bool ram_has_postcopy(void *opaque)
4426 {
4427     RAMBlock *rb;
4428     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4429         if (ramblock_is_pmem(rb)) {
4430             info_report("Block: %s, host: %p is an nvdimm memory, postcopy "
4431                          "is not supported now!", rb->idstr, rb->host);
4432             return false;
4433         }
4434     }
4435 
4436     return migrate_postcopy_ram();
4437 }
4438 
4439 /* Sync all the dirty bitmaps with the destination VM.  */
4440 static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4441 {
4442     RAMBlock *block;
4443     QEMUFile *file = s->to_dst_file;
4444     int ramblock_count = 0;
4445 
4446     trace_ram_dirty_bitmap_sync_start();
4447 
4448     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4449         qemu_savevm_send_recv_bitmap(file, block->idstr);
4450         trace_ram_dirty_bitmap_request(block->idstr);
4451         ramblock_count++;
4452     }
4453 
4454     trace_ram_dirty_bitmap_sync_wait();
4455 
4456     /* Wait until all the ramblocks' dirty bitmaps are synced */
4457     while (ramblock_count--) {
4458         qemu_sem_wait(&s->rp_state.rp_sem);
4459     }
4460 
4461     trace_ram_dirty_bitmap_sync_complete();
4462 
4463     return 0;
4464 }
4465 
4466 static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4467 {
4468     qemu_sem_post(&s->rp_state.rp_sem);
4469 }
4470 
4471 /*
4472  * Read the received bitmap and invert it to form the initial dirty bitmap.
4473  * This is only used when the postcopy migration is paused but wants
4474  * to resume from a middle point.
4475  */
4476 int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4477 {
4478     int ret = -EINVAL;
4479     QEMUFile *file = s->rp_state.from_dst_file;
4480     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4481     uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4482     uint64_t size, end_mark;
4483 
4484     trace_ram_dirty_bitmap_reload_begin(block->idstr);
4485 
4486     if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4487         error_report("%s: incorrect state %s", __func__,
4488                      MigrationStatus_str(s->state));
4489         return -EINVAL;
4490     }
4491 
4492     /*
4493      * Note: see comments in ramblock_recv_bitmap_send() on why we
4494      * need the endianness conversion, and the padding.
4495      */
4496     local_size = ROUND_UP(local_size, 8);
4497 
4498     /* Add padding */
4499     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4500 
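    /* Stream layout: 8-byte bitmap size, the bitmap data, then an 8-byte end mark */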
4501     size = qemu_get_be64(file);
4502 
4503     /* The size of the bitmap should match our ramblock */
4504     if (size != local_size) {
4505         error_report("%s: ramblock '%s' bitmap size mismatch "
4506                      "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4507                      block->idstr, size, local_size);
4508         ret = -EINVAL;
4509         goto out;
4510     }
4511 
4512     size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4513     end_mark = qemu_get_be64(file);
4514 
4515     ret = qemu_file_get_error(file);
4516     if (ret || size != local_size) {
4517         error_report("%s: read bitmap failed for ramblock '%s': %d"
4518                      " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4519                      __func__, block->idstr, ret, local_size, size);
4520         ret = -EIO;
4521         goto out;
4522     }
4523 
4524     if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4525         error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIx64,
4526                      __func__, block->idstr, end_mark);
4527         ret = -EINVAL;
4528         goto out;
4529     }
4530 
4531     /*
4532      * Endianness conversion.  We are in postcopy (though paused).
4533      * The dirty bitmap won't change, so we can modify it directly.
4534      */
4535     bitmap_from_le(block->bmap, le_bitmap, nbits);
4536 
4537     /*
4538      * What we received is the "received bitmap".  Invert it to form the initial
4539      * dirty bitmap for this ramblock.
4540      */
4541     bitmap_complement(block->bmap, block->bmap, nbits);
4542 
4543     trace_ram_dirty_bitmap_reload_complete(block->idstr);
4544 
4545     /*
4546      * We succeeded in syncing the bitmap for the current ramblock.  If this is
4547      * the last one to sync, we need to notify the main send thread.
4548      */
4549     ram_dirty_bitmap_reload_notify(s);
4550 
4551     ret = 0;
4552 out:
4553     g_free(le_bitmap);
4554     return ret;
4555 }
4556 
4557 static int ram_resume_prepare(MigrationState *s, void *opaque)
4558 {
4559     RAMState *rs = *(RAMState **)opaque;
4560     int ret;
4561 
4562     ret = ram_dirty_bitmap_sync_all(s, rs);
4563     if (ret) {
4564         return ret;
4565     }
4566 
4567     ram_state_resume_prepare(rs, s->to_dst_file);
4568 
4569     return 0;
4570 }
4571 
4572 static SaveVMHandlers savevm_ram_handlers = {
4573     .save_setup = ram_save_setup,
4574     .save_live_iterate = ram_save_iterate,
4575     .save_live_complete_postcopy = ram_save_complete,
4576     .save_live_complete_precopy = ram_save_complete,
4577     .has_postcopy = ram_has_postcopy,
4578     .save_live_pending = ram_save_pending,
4579     .load_state = ram_load,
4580     .save_cleanup = ram_save_cleanup,
4581     .load_setup = ram_load_setup,
4582     .load_cleanup = ram_load_cleanup,
4583     .resume_prepare = ram_resume_prepare,
4584 };
4585 
4586 void ram_mig_init(void)
4587 {
4588     qemu_mutex_init(&XBZRLE.lock);
4589     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, &ram_state);
4590 }
4591