xref: /qemu/migration/ram.c (revision 71920809)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "cpu.h"
31 #include <zlib.h>
32 #include "qemu/cutils.h"
33 #include "qemu/bitops.h"
34 #include "qemu/bitmap.h"
35 #include "qemu/main-loop.h"
36 #include "xbzrle.h"
37 #include "ram.h"
38 #include "migration.h"
39 #include "socket.h"
40 #include "migration/register.h"
41 #include "migration/misc.h"
42 #include "qemu-file.h"
43 #include "postcopy-ram.h"
44 #include "page_cache.h"
45 #include "qemu/error-report.h"
46 #include "qapi/error.h"
47 #include "qapi/qapi-events-migration.h"
48 #include "qapi/qmp/qerror.h"
49 #include "trace.h"
50 #include "exec/ram_addr.h"
51 #include "exec/target_page.h"
52 #include "qemu/rcu_queue.h"
53 #include "migration/colo.h"
54 #include "block.h"
55 #include "sysemu/sysemu.h"
56 #include "qemu/uuid.h"
57 #include "savevm.h"
58 #include "qemu/iov.h"
59 
60 /***********************************************************/
61 /* ram save/restore */
62 
63 /* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
64  * worked for pages that where filled with the same char.  We switched
65  * it to only search for the zero value.  And to avoid confusion with
66  * RAM_SSAVE_FLAG_COMPRESS_PAGE just rename it.
67  */
68 
69 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
70 #define RAM_SAVE_FLAG_ZERO     0x02
71 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
72 #define RAM_SAVE_FLAG_PAGE     0x08
73 #define RAM_SAVE_FLAG_EOS      0x10
74 #define RAM_SAVE_FLAG_CONTINUE 0x20
75 #define RAM_SAVE_FLAG_XBZRLE   0x40
76 /* 0x80 is reserved in migration.h start with 0x100 next */
77 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
78 
79 static inline bool is_zero_range(uint8_t *p, uint64_t size)
80 {
81     return buffer_is_zero(p, size);
82 }
83 
84 XBZRLECacheStats xbzrle_counters;
85 
86 /* struct contains XBZRLE cache and a static page
87    used by the compression */
88 static struct {
89     /* buffer used for XBZRLE encoding */
90     uint8_t *encoded_buf;
91     /* buffer for storing page content */
92     uint8_t *current_buf;
93     /* Cache for XBZRLE, Protected by lock. */
94     PageCache *cache;
95     QemuMutex lock;
96     /* it will store a page full of zeros */
97     uint8_t *zero_target_page;
98     /* buffer used for XBZRLE decoding */
99     uint8_t *decoded_buf;
100 } XBZRLE;
101 
102 static void XBZRLE_cache_lock(void)
103 {
104     if (migrate_use_xbzrle())
105         qemu_mutex_lock(&XBZRLE.lock);
106 }
107 
108 static void XBZRLE_cache_unlock(void)
109 {
110     if (migrate_use_xbzrle())
111         qemu_mutex_unlock(&XBZRLE.lock);
112 }
113 
114 /**
115  * xbzrle_cache_resize: resize the xbzrle cache
116  *
117  * This function is called from qmp_migrate_set_cache_size in main
118  * thread, possibly while a migration is in progress.  A running
119  * migration may be using the cache and might finish during this call,
120  * hence changes to the cache are protected by XBZRLE.lock().
121  *
122  * Returns 0 for success or -1 for error
123  *
124  * @new_size: new cache size
125  * @errp: set *errp if the check failed, with reason
126  */
127 int xbzrle_cache_resize(int64_t new_size, Error **errp)
128 {
129     PageCache *new_cache;
130     int64_t ret = 0;
131 
132     /* Check for truncation */
133     if (new_size != (size_t)new_size) {
134         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
135                    "exceeding address space");
136         return -1;
137     }
138 
139     if (new_size == migrate_xbzrle_cache_size()) {
140         /* nothing to do */
141         return 0;
142     }
143 
144     XBZRLE_cache_lock();
145 
146     if (XBZRLE.cache != NULL) {
147         new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
148         if (!new_cache) {
149             ret = -1;
150             goto out;
151         }
152 
153         cache_fini(XBZRLE.cache);
154         XBZRLE.cache = new_cache;
155     }
156 out:
157     XBZRLE_cache_unlock();
158     return ret;
159 }
160 
161 static bool ramblock_is_ignored(RAMBlock *block)
162 {
163     return !qemu_ram_is_migratable(block) ||
164            (migrate_ignore_shared() && qemu_ram_is_shared(block));
165 }
166 
167 /* Should be holding either ram_list.mutex, or the RCU lock. */
168 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
169     INTERNAL_RAMBLOCK_FOREACH(block)                   \
170         if (ramblock_is_ignored(block)) {} else
171 
172 #define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
173     INTERNAL_RAMBLOCK_FOREACH(block)                   \
174         if (!qemu_ram_is_migratable(block)) {} else
175 
176 #undef RAMBLOCK_FOREACH
177 
178 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
179 {
180     RAMBlock *block;
181     int ret = 0;
182 
183     RCU_READ_LOCK_GUARD();
184 
185     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
186         ret = func(block, opaque);
187         if (ret) {
188             break;
189         }
190     }
191     return ret;
192 }
193 
194 static void ramblock_recv_map_init(void)
195 {
196     RAMBlock *rb;
197 
198     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
199         assert(!rb->receivedmap);
200         rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
201     }
202 }
203 
204 int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
205 {
206     return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
207                     rb->receivedmap);
208 }
209 
210 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
211 {
212     return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
213 }
214 
215 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
216 {
217     set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
218 }
219 
220 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
221                                     size_t nr)
222 {
223     bitmap_set_atomic(rb->receivedmap,
224                       ramblock_recv_bitmap_offset(host_addr, rb),
225                       nr);
226 }
227 
228 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
229 
230 /*
231  * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
232  *
233  * Returns >0 if success with sent bytes, or <0 if error.
234  */
235 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
236                                   const char *block_name)
237 {
238     RAMBlock *block = qemu_ram_block_by_name(block_name);
239     unsigned long *le_bitmap, nbits;
240     uint64_t size;
241 
242     if (!block) {
243         error_report("%s: invalid block name: %s", __func__, block_name);
244         return -1;
245     }
246 
247     nbits = block->used_length >> TARGET_PAGE_BITS;
248 
249     /*
250      * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
251      * machines we may need 4 more bytes for padding (see below
252      * comment). So extend it a bit before hand.
253      */
254     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
255 
256     /*
257      * Always use little endian when sending the bitmap. This is
258      * required that when source and destination VMs are not using the
259      * same endianess. (Note: big endian won't work.)
260      */
261     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
262 
263     /* Size of the bitmap, in bytes */
264     size = DIV_ROUND_UP(nbits, 8);
265 
266     /*
267      * size is always aligned to 8 bytes for 64bit machines, but it
268      * may not be true for 32bit machines. We need this padding to
269      * make sure the migration can survive even between 32bit and
270      * 64bit machines.
271      */
272     size = ROUND_UP(size, 8);
273 
274     qemu_put_be64(file, size);
275     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
276     /*
277      * Mark as an end, in case the middle part is screwed up due to
278      * some "misterious" reason.
279      */
280     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
281     qemu_fflush(file);
282 
283     g_free(le_bitmap);
284 
285     if (qemu_file_get_error(file)) {
286         return qemu_file_get_error(file);
287     }
288 
289     return size + sizeof(size);
290 }
291 
292 /*
293  * An outstanding page request, on the source, having been received
294  * and queued
295  */
296 struct RAMSrcPageRequest {
297     RAMBlock *rb;
298     hwaddr    offset;
299     hwaddr    len;
300 
301     QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
302 };
303 
304 /* State of RAM for migration */
305 struct RAMState {
306     /* QEMUFile used for this migration */
307     QEMUFile *f;
308     /* Last block that we have visited searching for dirty pages */
309     RAMBlock *last_seen_block;
310     /* Last block from where we have sent data */
311     RAMBlock *last_sent_block;
312     /* Last dirty target page we have sent */
313     ram_addr_t last_page;
314     /* last ram version we have seen */
315     uint32_t last_version;
316     /* We are in the first round */
317     bool ram_bulk_stage;
318     /* The free page optimization is enabled */
319     bool fpo_enabled;
320     /* How many times we have dirty too many pages */
321     int dirty_rate_high_cnt;
322     /* these variables are used for bitmap sync */
323     /* last time we did a full bitmap_sync */
324     int64_t time_last_bitmap_sync;
325     /* bytes transferred at start_time */
326     uint64_t bytes_xfer_prev;
327     /* number of dirty pages since start_time */
328     uint64_t num_dirty_pages_period;
329     /* xbzrle misses since the beginning of the period */
330     uint64_t xbzrle_cache_miss_prev;
331 
332     /* compression statistics since the beginning of the period */
333     /* amount of count that no free thread to compress data */
334     uint64_t compress_thread_busy_prev;
335     /* amount bytes after compression */
336     uint64_t compressed_size_prev;
337     /* amount of compressed pages */
338     uint64_t compress_pages_prev;
339 
340     /* total handled target pages at the beginning of period */
341     uint64_t target_page_count_prev;
342     /* total handled target pages since start */
343     uint64_t target_page_count;
344     /* number of dirty bits in the bitmap */
345     uint64_t migration_dirty_pages;
346     /* Protects modification of the bitmap and migration dirty pages */
347     QemuMutex bitmap_mutex;
348     /* The RAMBlock used in the last src_page_requests */
349     RAMBlock *last_req_rb;
350     /* Queue of outstanding page requests from the destination */
351     QemuMutex src_page_req_mutex;
352     QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
353 };
354 typedef struct RAMState RAMState;
355 
356 static RAMState *ram_state;
357 
358 static NotifierWithReturnList precopy_notifier_list;
359 
360 void precopy_infrastructure_init(void)
361 {
362     notifier_with_return_list_init(&precopy_notifier_list);
363 }
364 
365 void precopy_add_notifier(NotifierWithReturn *n)
366 {
367     notifier_with_return_list_add(&precopy_notifier_list, n);
368 }
369 
370 void precopy_remove_notifier(NotifierWithReturn *n)
371 {
372     notifier_with_return_remove(n);
373 }
374 
375 int precopy_notify(PrecopyNotifyReason reason, Error **errp)
376 {
377     PrecopyNotifyData pnd;
378     pnd.reason = reason;
379     pnd.errp = errp;
380 
381     return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
382 }
383 
384 void precopy_enable_free_page_optimization(void)
385 {
386     if (!ram_state) {
387         return;
388     }
389 
390     ram_state->fpo_enabled = true;
391 }
392 
393 uint64_t ram_bytes_remaining(void)
394 {
395     return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
396                        0;
397 }
398 
399 MigrationStats ram_counters;
400 
401 /* used by the search for pages to send */
402 struct PageSearchStatus {
403     /* Current block being searched */
404     RAMBlock    *block;
405     /* Current page to search from */
406     unsigned long page;
407     /* Set once we wrap around */
408     bool         complete_round;
409 };
410 typedef struct PageSearchStatus PageSearchStatus;
411 
412 CompressionStats compression_counters;
413 
414 struct CompressParam {
415     bool done;
416     bool quit;
417     bool zero_page;
418     QEMUFile *file;
419     QemuMutex mutex;
420     QemuCond cond;
421     RAMBlock *block;
422     ram_addr_t offset;
423 
424     /* internally used fields */
425     z_stream stream;
426     uint8_t *originbuf;
427 };
428 typedef struct CompressParam CompressParam;
429 
430 struct DecompressParam {
431     bool done;
432     bool quit;
433     QemuMutex mutex;
434     QemuCond cond;
435     void *des;
436     uint8_t *compbuf;
437     int len;
438     z_stream stream;
439 };
440 typedef struct DecompressParam DecompressParam;
441 
442 static CompressParam *comp_param;
443 static QemuThread *compress_threads;
444 /* comp_done_cond is used to wake up the migration thread when
445  * one of the compression threads has finished the compression.
446  * comp_done_lock is used to co-work with comp_done_cond.
447  */
448 static QemuMutex comp_done_lock;
449 static QemuCond comp_done_cond;
450 /* The empty QEMUFileOps will be used by file in CompressParam */
451 static const QEMUFileOps empty_ops = { };
452 
453 static QEMUFile *decomp_file;
454 static DecompressParam *decomp_param;
455 static QemuThread *decompress_threads;
456 static QemuMutex decomp_done_lock;
457 static QemuCond decomp_done_cond;
458 
459 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
460                                  ram_addr_t offset, uint8_t *source_buf);
461 
462 static void *do_data_compress(void *opaque)
463 {
464     CompressParam *param = opaque;
465     RAMBlock *block;
466     ram_addr_t offset;
467     bool zero_page;
468 
469     qemu_mutex_lock(&param->mutex);
470     while (!param->quit) {
471         if (param->block) {
472             block = param->block;
473             offset = param->offset;
474             param->block = NULL;
475             qemu_mutex_unlock(&param->mutex);
476 
477             zero_page = do_compress_ram_page(param->file, &param->stream,
478                                              block, offset, param->originbuf);
479 
480             qemu_mutex_lock(&comp_done_lock);
481             param->done = true;
482             param->zero_page = zero_page;
483             qemu_cond_signal(&comp_done_cond);
484             qemu_mutex_unlock(&comp_done_lock);
485 
486             qemu_mutex_lock(&param->mutex);
487         } else {
488             qemu_cond_wait(&param->cond, &param->mutex);
489         }
490     }
491     qemu_mutex_unlock(&param->mutex);
492 
493     return NULL;
494 }
495 
496 static void compress_threads_save_cleanup(void)
497 {
498     int i, thread_count;
499 
500     if (!migrate_use_compression() || !comp_param) {
501         return;
502     }
503 
504     thread_count = migrate_compress_threads();
505     for (i = 0; i < thread_count; i++) {
506         /*
507          * we use it as a indicator which shows if the thread is
508          * properly init'd or not
509          */
510         if (!comp_param[i].file) {
511             break;
512         }
513 
514         qemu_mutex_lock(&comp_param[i].mutex);
515         comp_param[i].quit = true;
516         qemu_cond_signal(&comp_param[i].cond);
517         qemu_mutex_unlock(&comp_param[i].mutex);
518 
519         qemu_thread_join(compress_threads + i);
520         qemu_mutex_destroy(&comp_param[i].mutex);
521         qemu_cond_destroy(&comp_param[i].cond);
522         deflateEnd(&comp_param[i].stream);
523         g_free(comp_param[i].originbuf);
524         qemu_fclose(comp_param[i].file);
525         comp_param[i].file = NULL;
526     }
527     qemu_mutex_destroy(&comp_done_lock);
528     qemu_cond_destroy(&comp_done_cond);
529     g_free(compress_threads);
530     g_free(comp_param);
531     compress_threads = NULL;
532     comp_param = NULL;
533 }
534 
535 static int compress_threads_save_setup(void)
536 {
537     int i, thread_count;
538 
539     if (!migrate_use_compression()) {
540         return 0;
541     }
542     thread_count = migrate_compress_threads();
543     compress_threads = g_new0(QemuThread, thread_count);
544     comp_param = g_new0(CompressParam, thread_count);
545     qemu_cond_init(&comp_done_cond);
546     qemu_mutex_init(&comp_done_lock);
547     for (i = 0; i < thread_count; i++) {
548         comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
549         if (!comp_param[i].originbuf) {
550             goto exit;
551         }
552 
553         if (deflateInit(&comp_param[i].stream,
554                         migrate_compress_level()) != Z_OK) {
555             g_free(comp_param[i].originbuf);
556             goto exit;
557         }
558 
559         /* comp_param[i].file is just used as a dummy buffer to save data,
560          * set its ops to empty.
561          */
562         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
563         comp_param[i].done = true;
564         comp_param[i].quit = false;
565         qemu_mutex_init(&comp_param[i].mutex);
566         qemu_cond_init(&comp_param[i].cond);
567         qemu_thread_create(compress_threads + i, "compress",
568                            do_data_compress, comp_param + i,
569                            QEMU_THREAD_JOINABLE);
570     }
571     return 0;
572 
573 exit:
574     compress_threads_save_cleanup();
575     return -1;
576 }
577 
578 /* Multiple fd's */
579 
580 #define MULTIFD_MAGIC 0x11223344U
581 #define MULTIFD_VERSION 1
582 
583 #define MULTIFD_FLAG_SYNC (1 << 0)
584 
585 /* This value needs to be a multiple of qemu_target_page_size() */
586 #define MULTIFD_PACKET_SIZE (512 * 1024)
587 
588 typedef struct {
589     uint32_t magic;
590     uint32_t version;
591     unsigned char uuid[16]; /* QemuUUID */
592     uint8_t id;
593     uint8_t unused1[7];     /* Reserved for future use */
594     uint64_t unused2[4];    /* Reserved for future use */
595 } __attribute__((packed)) MultiFDInit_t;
596 
597 typedef struct {
598     uint32_t magic;
599     uint32_t version;
600     uint32_t flags;
601     /* maximum number of allocated pages */
602     uint32_t pages_alloc;
603     uint32_t pages_used;
604     /* size of the next packet that contains pages */
605     uint32_t next_packet_size;
606     uint64_t packet_num;
607     uint64_t unused[4];    /* Reserved for future use */
608     char ramblock[256];
609     uint64_t offset[];
610 } __attribute__((packed)) MultiFDPacket_t;
611 
612 typedef struct {
613     /* number of used pages */
614     uint32_t used;
615     /* number of allocated pages */
616     uint32_t allocated;
617     /* global number of generated multifd packets */
618     uint64_t packet_num;
619     /* offset of each page */
620     ram_addr_t *offset;
621     /* pointer to each page */
622     struct iovec *iov;
623     RAMBlock *block;
624 } MultiFDPages_t;
625 
626 typedef struct {
627     /* this fields are not changed once the thread is created */
628     /* channel number */
629     uint8_t id;
630     /* channel thread name */
631     char *name;
632     /* channel thread id */
633     QemuThread thread;
634     /* communication channel */
635     QIOChannel *c;
636     /* sem where to wait for more work */
637     QemuSemaphore sem;
638     /* this mutex protects the following parameters */
639     QemuMutex mutex;
640     /* is this channel thread running */
641     bool running;
642     /* should this thread finish */
643     bool quit;
644     /* thread has work to do */
645     int pending_job;
646     /* array of pages to sent */
647     MultiFDPages_t *pages;
648     /* packet allocated len */
649     uint32_t packet_len;
650     /* pointer to the packet */
651     MultiFDPacket_t *packet;
652     /* multifd flags for each packet */
653     uint32_t flags;
654     /* size of the next packet that contains pages */
655     uint32_t next_packet_size;
656     /* global number of generated multifd packets */
657     uint64_t packet_num;
658     /* thread local variables */
659     /* packets sent through this channel */
660     uint64_t num_packets;
661     /* pages sent through this channel */
662     uint64_t num_pages;
663     /* syncs main thread and channels */
664     QemuSemaphore sem_sync;
665 }  MultiFDSendParams;
666 
667 typedef struct {
668     /* this fields are not changed once the thread is created */
669     /* channel number */
670     uint8_t id;
671     /* channel thread name */
672     char *name;
673     /* channel thread id */
674     QemuThread thread;
675     /* communication channel */
676     QIOChannel *c;
677     /* this mutex protects the following parameters */
678     QemuMutex mutex;
679     /* is this channel thread running */
680     bool running;
681     /* should this thread finish */
682     bool quit;
683     /* array of pages to receive */
684     MultiFDPages_t *pages;
685     /* packet allocated len */
686     uint32_t packet_len;
687     /* pointer to the packet */
688     MultiFDPacket_t *packet;
689     /* multifd flags for each packet */
690     uint32_t flags;
691     /* global number of generated multifd packets */
692     uint64_t packet_num;
693     /* thread local variables */
694     /* size of the next packet that contains pages */
695     uint32_t next_packet_size;
696     /* packets sent through this channel */
697     uint64_t num_packets;
698     /* pages sent through this channel */
699     uint64_t num_pages;
700     /* syncs main thread and channels */
701     QemuSemaphore sem_sync;
702 } MultiFDRecvParams;
703 
704 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
705 {
706     MultiFDInit_t msg = {};
707     int ret;
708 
709     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
710     msg.version = cpu_to_be32(MULTIFD_VERSION);
711     msg.id = p->id;
712     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
713 
714     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
715     if (ret != 0) {
716         return -1;
717     }
718     return 0;
719 }
720 
721 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
722 {
723     MultiFDInit_t msg;
724     int ret;
725 
726     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
727     if (ret != 0) {
728         return -1;
729     }
730 
731     msg.magic = be32_to_cpu(msg.magic);
732     msg.version = be32_to_cpu(msg.version);
733 
734     if (msg.magic != MULTIFD_MAGIC) {
735         error_setg(errp, "multifd: received packet magic %x "
736                    "expected %x", msg.magic, MULTIFD_MAGIC);
737         return -1;
738     }
739 
740     if (msg.version != MULTIFD_VERSION) {
741         error_setg(errp, "multifd: received packet version %d "
742                    "expected %d", msg.version, MULTIFD_VERSION);
743         return -1;
744     }
745 
746     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
747         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
748         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
749 
750         error_setg(errp, "multifd: received uuid '%s' and expected "
751                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
752         g_free(uuid);
753         g_free(msg_uuid);
754         return -1;
755     }
756 
757     if (msg.id > migrate_multifd_channels()) {
758         error_setg(errp, "multifd: received channel version %d "
759                    "expected %d", msg.version, MULTIFD_VERSION);
760         return -1;
761     }
762 
763     return msg.id;
764 }
765 
766 static MultiFDPages_t *multifd_pages_init(size_t size)
767 {
768     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
769 
770     pages->allocated = size;
771     pages->iov = g_new0(struct iovec, size);
772     pages->offset = g_new0(ram_addr_t, size);
773 
774     return pages;
775 }
776 
777 static void multifd_pages_clear(MultiFDPages_t *pages)
778 {
779     pages->used = 0;
780     pages->allocated = 0;
781     pages->packet_num = 0;
782     pages->block = NULL;
783     g_free(pages->iov);
784     pages->iov = NULL;
785     g_free(pages->offset);
786     pages->offset = NULL;
787     g_free(pages);
788 }
789 
790 static void multifd_send_fill_packet(MultiFDSendParams *p)
791 {
792     MultiFDPacket_t *packet = p->packet;
793     int i;
794 
795     packet->flags = cpu_to_be32(p->flags);
796     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
797     packet->pages_used = cpu_to_be32(p->pages->used);
798     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
799     packet->packet_num = cpu_to_be64(p->packet_num);
800 
801     if (p->pages->block) {
802         strncpy(packet->ramblock, p->pages->block->idstr, 256);
803     }
804 
805     for (i = 0; i < p->pages->used; i++) {
806         /* there are architectures where ram_addr_t is 32 bit */
807         uint64_t temp = p->pages->offset[i];
808 
809         packet->offset[i] = cpu_to_be64(temp);
810     }
811 }
812 
813 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
814 {
815     MultiFDPacket_t *packet = p->packet;
816     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
817     RAMBlock *block;
818     int i;
819 
820     packet->magic = be32_to_cpu(packet->magic);
821     if (packet->magic != MULTIFD_MAGIC) {
822         error_setg(errp, "multifd: received packet "
823                    "magic %x and expected magic %x",
824                    packet->magic, MULTIFD_MAGIC);
825         return -1;
826     }
827 
828     packet->version = be32_to_cpu(packet->version);
829     if (packet->version != MULTIFD_VERSION) {
830         error_setg(errp, "multifd: received packet "
831                    "version %d and expected version %d",
832                    packet->version, MULTIFD_VERSION);
833         return -1;
834     }
835 
836     p->flags = be32_to_cpu(packet->flags);
837 
838     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
839     /*
840      * If we received a packet that is 100 times bigger than expected
841      * just stop migration.  It is a magic number.
842      */
843     if (packet->pages_alloc > pages_max * 100) {
844         error_setg(errp, "multifd: received packet "
845                    "with size %d and expected a maximum size of %d",
846                    packet->pages_alloc, pages_max * 100) ;
847         return -1;
848     }
849     /*
850      * We received a packet that is bigger than expected but inside
851      * reasonable limits (see previous comment).  Just reallocate.
852      */
853     if (packet->pages_alloc > p->pages->allocated) {
854         multifd_pages_clear(p->pages);
855         p->pages = multifd_pages_init(packet->pages_alloc);
856     }
857 
858     p->pages->used = be32_to_cpu(packet->pages_used);
859     if (p->pages->used > packet->pages_alloc) {
860         error_setg(errp, "multifd: received packet "
861                    "with %d pages and expected maximum pages are %d",
862                    p->pages->used, packet->pages_alloc) ;
863         return -1;
864     }
865 
866     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
867     p->packet_num = be64_to_cpu(packet->packet_num);
868 
869     if (p->pages->used == 0) {
870         return 0;
871     }
872 
873     /* make sure that ramblock is 0 terminated */
874     packet->ramblock[255] = 0;
875     block = qemu_ram_block_by_name(packet->ramblock);
876     if (!block) {
877         error_setg(errp, "multifd: unknown ram block %s",
878                    packet->ramblock);
879         return -1;
880     }
881 
882     for (i = 0; i < p->pages->used; i++) {
883         uint64_t offset = be64_to_cpu(packet->offset[i]);
884 
885         if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
886             error_setg(errp, "multifd: offset too long %" PRIu64
887                        " (max " RAM_ADDR_FMT ")",
888                        offset, block->max_length);
889             return -1;
890         }
891         p->pages->iov[i].iov_base = block->host + offset;
892         p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
893     }
894 
895     return 0;
896 }
897 
898 struct {
899     MultiFDSendParams *params;
900     /* array of pages to sent */
901     MultiFDPages_t *pages;
902     /* global number of generated multifd packets */
903     uint64_t packet_num;
904     /* send channels ready */
905     QemuSemaphore channels_ready;
906     /*
907      * Have we already run terminate threads.  There is a race when it
908      * happens that we got one error while we are exiting.
909      * We will use atomic operations.  Only valid values are 0 and 1.
910      */
911     int exiting;
912 } *multifd_send_state;
913 
914 /*
915  * How we use multifd_send_state->pages and channel->pages?
916  *
917  * We create a pages for each channel, and a main one.  Each time that
918  * we need to send a batch of pages we interchange the ones between
919  * multifd_send_state and the channel that is sending it.  There are
920  * two reasons for that:
921  *    - to not have to do so many mallocs during migration
922  *    - to make easier to know what to free at the end of migration
923  *
924  * This way we always know who is the owner of each "pages" struct,
925  * and we don't need any locking.  It belongs to the migration thread
926  * or to the channel thread.  Switching is safe because the migration
927  * thread is using the channel mutex when changing it, and the channel
928  * have to had finish with its own, otherwise pending_job can't be
929  * false.
930  */
931 
932 static int multifd_send_pages(RAMState *rs)
933 {
934     int i;
935     static int next_channel;
936     MultiFDSendParams *p = NULL; /* make happy gcc */
937     MultiFDPages_t *pages = multifd_send_state->pages;
938     uint64_t transferred;
939 
940     if (atomic_read(&multifd_send_state->exiting)) {
941         return -1;
942     }
943 
944     qemu_sem_wait(&multifd_send_state->channels_ready);
945     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
946         p = &multifd_send_state->params[i];
947 
948         qemu_mutex_lock(&p->mutex);
949         if (p->quit) {
950             error_report("%s: channel %d has already quit!", __func__, i);
951             qemu_mutex_unlock(&p->mutex);
952             return -1;
953         }
954         if (!p->pending_job) {
955             p->pending_job++;
956             next_channel = (i + 1) % migrate_multifd_channels();
957             break;
958         }
959         qemu_mutex_unlock(&p->mutex);
960     }
961     assert(!p->pages->used);
962     assert(!p->pages->block);
963 
964     p->packet_num = multifd_send_state->packet_num++;
965     multifd_send_state->pages = p->pages;
966     p->pages = pages;
967     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
968     qemu_file_update_transfer(rs->f, transferred);
969     ram_counters.multifd_bytes += transferred;
970     ram_counters.transferred += transferred;;
971     qemu_mutex_unlock(&p->mutex);
972     qemu_sem_post(&p->sem);
973 
974     return 1;
975 }
976 
977 static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
978 {
979     MultiFDPages_t *pages = multifd_send_state->pages;
980 
981     if (!pages->block) {
982         pages->block = block;
983     }
984 
985     if (pages->block == block) {
986         pages->offset[pages->used] = offset;
987         pages->iov[pages->used].iov_base = block->host + offset;
988         pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
989         pages->used++;
990 
991         if (pages->used < pages->allocated) {
992             return 1;
993         }
994     }
995 
996     if (multifd_send_pages(rs) < 0) {
997         return -1;
998     }
999 
1000     if (pages->block != block) {
1001         return  multifd_queue_page(rs, block, offset);
1002     }
1003 
1004     return 1;
1005 }
1006 
1007 static void multifd_send_terminate_threads(Error *err)
1008 {
1009     int i;
1010 
1011     trace_multifd_send_terminate_threads(err != NULL);
1012 
1013     if (err) {
1014         MigrationState *s = migrate_get_current();
1015         migrate_set_error(s, err);
1016         if (s->state == MIGRATION_STATUS_SETUP ||
1017             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
1018             s->state == MIGRATION_STATUS_DEVICE ||
1019             s->state == MIGRATION_STATUS_ACTIVE) {
1020             migrate_set_state(&s->state, s->state,
1021                               MIGRATION_STATUS_FAILED);
1022         }
1023     }
1024 
1025     /*
1026      * We don't want to exit each threads twice.  Depending on where
1027      * we get the error, or if there are two independent errors in two
1028      * threads at the same time, we can end calling this function
1029      * twice.
1030      */
1031     if (atomic_xchg(&multifd_send_state->exiting, 1)) {
1032         return;
1033     }
1034 
1035     for (i = 0; i < migrate_multifd_channels(); i++) {
1036         MultiFDSendParams *p = &multifd_send_state->params[i];
1037 
1038         qemu_mutex_lock(&p->mutex);
1039         p->quit = true;
1040         qemu_sem_post(&p->sem);
1041         qemu_mutex_unlock(&p->mutex);
1042     }
1043 }
1044 
1045 void multifd_save_cleanup(void)
1046 {
1047     int i;
1048 
1049     if (!migrate_use_multifd()) {
1050         return;
1051     }
1052     multifd_send_terminate_threads(NULL);
1053     for (i = 0; i < migrate_multifd_channels(); i++) {
1054         MultiFDSendParams *p = &multifd_send_state->params[i];
1055 
1056         if (p->running) {
1057             qemu_thread_join(&p->thread);
1058         }
1059     }
1060     for (i = 0; i < migrate_multifd_channels(); i++) {
1061         MultiFDSendParams *p = &multifd_send_state->params[i];
1062 
1063         socket_send_channel_destroy(p->c);
1064         p->c = NULL;
1065         qemu_mutex_destroy(&p->mutex);
1066         qemu_sem_destroy(&p->sem);
1067         qemu_sem_destroy(&p->sem_sync);
1068         g_free(p->name);
1069         p->name = NULL;
1070         multifd_pages_clear(p->pages);
1071         p->pages = NULL;
1072         p->packet_len = 0;
1073         g_free(p->packet);
1074         p->packet = NULL;
1075     }
1076     qemu_sem_destroy(&multifd_send_state->channels_ready);
1077     g_free(multifd_send_state->params);
1078     multifd_send_state->params = NULL;
1079     multifd_pages_clear(multifd_send_state->pages);
1080     multifd_send_state->pages = NULL;
1081     g_free(multifd_send_state);
1082     multifd_send_state = NULL;
1083 }
1084 
1085 static void multifd_send_sync_main(RAMState *rs)
1086 {
1087     int i;
1088 
1089     if (!migrate_use_multifd()) {
1090         return;
1091     }
1092     if (multifd_send_state->pages->used) {
1093         if (multifd_send_pages(rs) < 0) {
1094             error_report("%s: multifd_send_pages fail", __func__);
1095             return;
1096         }
1097     }
1098     for (i = 0; i < migrate_multifd_channels(); i++) {
1099         MultiFDSendParams *p = &multifd_send_state->params[i];
1100 
1101         trace_multifd_send_sync_main_signal(p->id);
1102 
1103         qemu_mutex_lock(&p->mutex);
1104 
1105         if (p->quit) {
1106             error_report("%s: channel %d has already quit", __func__, i);
1107             qemu_mutex_unlock(&p->mutex);
1108             return;
1109         }
1110 
1111         p->packet_num = multifd_send_state->packet_num++;
1112         p->flags |= MULTIFD_FLAG_SYNC;
1113         p->pending_job++;
1114         qemu_file_update_transfer(rs->f, p->packet_len);
1115         ram_counters.multifd_bytes += p->packet_len;
1116         ram_counters.transferred += p->packet_len;
1117         qemu_mutex_unlock(&p->mutex);
1118         qemu_sem_post(&p->sem);
1119     }
1120     for (i = 0; i < migrate_multifd_channels(); i++) {
1121         MultiFDSendParams *p = &multifd_send_state->params[i];
1122 
1123         trace_multifd_send_sync_main_wait(p->id);
1124         qemu_sem_wait(&p->sem_sync);
1125     }
1126     trace_multifd_send_sync_main(multifd_send_state->packet_num);
1127 }
1128 
1129 static void *multifd_send_thread(void *opaque)
1130 {
1131     MultiFDSendParams *p = opaque;
1132     Error *local_err = NULL;
1133     int ret = 0;
1134     uint32_t flags = 0;
1135 
1136     trace_multifd_send_thread_start(p->id);
1137     rcu_register_thread();
1138 
1139     if (multifd_send_initial_packet(p, &local_err) < 0) {
1140         ret = -1;
1141         goto out;
1142     }
1143     /* initial packet */
1144     p->num_packets = 1;
1145 
1146     while (true) {
1147         qemu_sem_wait(&p->sem);
1148 
1149         if (atomic_read(&multifd_send_state->exiting)) {
1150             break;
1151         }
1152         qemu_mutex_lock(&p->mutex);
1153 
1154         if (p->pending_job) {
1155             uint32_t used = p->pages->used;
1156             uint64_t packet_num = p->packet_num;
1157             flags = p->flags;
1158 
1159             p->next_packet_size = used * qemu_target_page_size();
1160             multifd_send_fill_packet(p);
1161             p->flags = 0;
1162             p->num_packets++;
1163             p->num_pages += used;
1164             p->pages->used = 0;
1165             p->pages->block = NULL;
1166             qemu_mutex_unlock(&p->mutex);
1167 
1168             trace_multifd_send(p->id, packet_num, used, flags,
1169                                p->next_packet_size);
1170 
1171             ret = qio_channel_write_all(p->c, (void *)p->packet,
1172                                         p->packet_len, &local_err);
1173             if (ret != 0) {
1174                 break;
1175             }
1176 
1177             if (used) {
1178                 ret = qio_channel_writev_all(p->c, p->pages->iov,
1179                                              used, &local_err);
1180                 if (ret != 0) {
1181                     break;
1182                 }
1183             }
1184 
1185             qemu_mutex_lock(&p->mutex);
1186             p->pending_job--;
1187             qemu_mutex_unlock(&p->mutex);
1188 
1189             if (flags & MULTIFD_FLAG_SYNC) {
1190                 qemu_sem_post(&p->sem_sync);
1191             }
1192             qemu_sem_post(&multifd_send_state->channels_ready);
1193         } else if (p->quit) {
1194             qemu_mutex_unlock(&p->mutex);
1195             break;
1196         } else {
1197             qemu_mutex_unlock(&p->mutex);
1198             /* sometimes there are spurious wakeups */
1199         }
1200     }
1201 
1202 out:
1203     if (local_err) {
1204         trace_multifd_send_error(p->id);
1205         multifd_send_terminate_threads(local_err);
1206     }
1207 
1208     /*
1209      * Error happen, I will exit, but I can't just leave, tell
1210      * who pay attention to me.
1211      */
1212     if (ret != 0) {
1213         qemu_sem_post(&p->sem_sync);
1214         qemu_sem_post(&multifd_send_state->channels_ready);
1215     }
1216 
1217     qemu_mutex_lock(&p->mutex);
1218     p->running = false;
1219     qemu_mutex_unlock(&p->mutex);
1220 
1221     rcu_unregister_thread();
1222     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1223 
1224     return NULL;
1225 }
1226 
1227 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1228 {
1229     MultiFDSendParams *p = opaque;
1230     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1231     Error *local_err = NULL;
1232 
1233     trace_multifd_new_send_channel_async(p->id);
1234     if (qio_task_propagate_error(task, &local_err)) {
1235         migrate_set_error(migrate_get_current(), local_err);
1236         multifd_save_cleanup();
1237     } else {
1238         p->c = QIO_CHANNEL(sioc);
1239         qio_channel_set_delay(p->c, false);
1240         p->running = true;
1241         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1242                            QEMU_THREAD_JOINABLE);
1243     }
1244 }
1245 
1246 int multifd_save_setup(void)
1247 {
1248     int thread_count;
1249     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1250     uint8_t i;
1251 
1252     if (!migrate_use_multifd()) {
1253         return 0;
1254     }
1255     thread_count = migrate_multifd_channels();
1256     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1257     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1258     multifd_send_state->pages = multifd_pages_init(page_count);
1259     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1260     atomic_set(&multifd_send_state->exiting, 0);
1261 
1262     for (i = 0; i < thread_count; i++) {
1263         MultiFDSendParams *p = &multifd_send_state->params[i];
1264 
1265         qemu_mutex_init(&p->mutex);
1266         qemu_sem_init(&p->sem, 0);
1267         qemu_sem_init(&p->sem_sync, 0);
1268         p->quit = false;
1269         p->pending_job = 0;
1270         p->id = i;
1271         p->pages = multifd_pages_init(page_count);
1272         p->packet_len = sizeof(MultiFDPacket_t)
1273                       + sizeof(uint64_t) * page_count;
1274         p->packet = g_malloc0(p->packet_len);
1275         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1276         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1277         p->name = g_strdup_printf("multifdsend_%d", i);
1278         socket_send_channel_create(multifd_new_send_channel_async, p);
1279     }
1280     return 0;
1281 }
1282 
1283 struct {
1284     MultiFDRecvParams *params;
1285     /* number of created threads */
1286     int count;
1287     /* syncs main thread and channels */
1288     QemuSemaphore sem_sync;
1289     /* global number of generated multifd packets */
1290     uint64_t packet_num;
1291 } *multifd_recv_state;
1292 
1293 static void multifd_recv_terminate_threads(Error *err)
1294 {
1295     int i;
1296 
1297     trace_multifd_recv_terminate_threads(err != NULL);
1298 
1299     if (err) {
1300         MigrationState *s = migrate_get_current();
1301         migrate_set_error(s, err);
1302         if (s->state == MIGRATION_STATUS_SETUP ||
1303             s->state == MIGRATION_STATUS_ACTIVE) {
1304             migrate_set_state(&s->state, s->state,
1305                               MIGRATION_STATUS_FAILED);
1306         }
1307     }
1308 
1309     for (i = 0; i < migrate_multifd_channels(); i++) {
1310         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1311 
1312         qemu_mutex_lock(&p->mutex);
1313         p->quit = true;
1314         /* We could arrive here for two reasons:
1315            - normal quit, i.e. everything went fine, just finished
1316            - error quit: We close the channels so the channel threads
1317              finish the qio_channel_read_all_eof() */
1318         if (p->c) {
1319             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1320         }
1321         qemu_mutex_unlock(&p->mutex);
1322     }
1323 }
1324 
1325 int multifd_load_cleanup(Error **errp)
1326 {
1327     int i;
1328     int ret = 0;
1329 
1330     if (!migrate_use_multifd()) {
1331         return 0;
1332     }
1333     multifd_recv_terminate_threads(NULL);
1334     for (i = 0; i < migrate_multifd_channels(); i++) {
1335         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1336 
1337         if (p->running) {
1338             p->quit = true;
1339             /*
1340              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1341              * however try to wakeup it without harm in cleanup phase.
1342              */
1343             qemu_sem_post(&p->sem_sync);
1344             qemu_thread_join(&p->thread);
1345         }
1346     }
1347     for (i = 0; i < migrate_multifd_channels(); i++) {
1348         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1349 
1350         object_unref(OBJECT(p->c));
1351         p->c = NULL;
1352         qemu_mutex_destroy(&p->mutex);
1353         qemu_sem_destroy(&p->sem_sync);
1354         g_free(p->name);
1355         p->name = NULL;
1356         multifd_pages_clear(p->pages);
1357         p->pages = NULL;
1358         p->packet_len = 0;
1359         g_free(p->packet);
1360         p->packet = NULL;
1361     }
1362     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1363     g_free(multifd_recv_state->params);
1364     multifd_recv_state->params = NULL;
1365     g_free(multifd_recv_state);
1366     multifd_recv_state = NULL;
1367 
1368     return ret;
1369 }
1370 
1371 static void multifd_recv_sync_main(void)
1372 {
1373     int i;
1374 
1375     if (!migrate_use_multifd()) {
1376         return;
1377     }
1378     for (i = 0; i < migrate_multifd_channels(); i++) {
1379         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1380 
1381         trace_multifd_recv_sync_main_wait(p->id);
1382         qemu_sem_wait(&multifd_recv_state->sem_sync);
1383     }
1384     for (i = 0; i < migrate_multifd_channels(); i++) {
1385         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1386 
1387         qemu_mutex_lock(&p->mutex);
1388         if (multifd_recv_state->packet_num < p->packet_num) {
1389             multifd_recv_state->packet_num = p->packet_num;
1390         }
1391         qemu_mutex_unlock(&p->mutex);
1392         trace_multifd_recv_sync_main_signal(p->id);
1393         qemu_sem_post(&p->sem_sync);
1394     }
1395     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1396 }
1397 
1398 static void *multifd_recv_thread(void *opaque)
1399 {
1400     MultiFDRecvParams *p = opaque;
1401     Error *local_err = NULL;
1402     int ret;
1403 
1404     trace_multifd_recv_thread_start(p->id);
1405     rcu_register_thread();
1406 
1407     while (true) {
1408         uint32_t used;
1409         uint32_t flags;
1410 
1411         if (p->quit) {
1412             break;
1413         }
1414 
1415         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1416                                        p->packet_len, &local_err);
1417         if (ret == 0) {   /* EOF */
1418             break;
1419         }
1420         if (ret == -1) {   /* Error */
1421             break;
1422         }
1423 
1424         qemu_mutex_lock(&p->mutex);
1425         ret = multifd_recv_unfill_packet(p, &local_err);
1426         if (ret) {
1427             qemu_mutex_unlock(&p->mutex);
1428             break;
1429         }
1430 
1431         used = p->pages->used;
1432         flags = p->flags;
1433         trace_multifd_recv(p->id, p->packet_num, used, flags,
1434                            p->next_packet_size);
1435         p->num_packets++;
1436         p->num_pages += used;
1437         qemu_mutex_unlock(&p->mutex);
1438 
1439         if (used) {
1440             ret = qio_channel_readv_all(p->c, p->pages->iov,
1441                                         used, &local_err);
1442             if (ret != 0) {
1443                 break;
1444             }
1445         }
1446 
1447         if (flags & MULTIFD_FLAG_SYNC) {
1448             qemu_sem_post(&multifd_recv_state->sem_sync);
1449             qemu_sem_wait(&p->sem_sync);
1450         }
1451     }
1452 
1453     if (local_err) {
1454         multifd_recv_terminate_threads(local_err);
1455     }
1456     qemu_mutex_lock(&p->mutex);
1457     p->running = false;
1458     qemu_mutex_unlock(&p->mutex);
1459 
1460     rcu_unregister_thread();
1461     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1462 
1463     return NULL;
1464 }
1465 
1466 int multifd_load_setup(void)
1467 {
1468     int thread_count;
1469     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1470     uint8_t i;
1471 
1472     if (!migrate_use_multifd()) {
1473         return 0;
1474     }
1475     thread_count = migrate_multifd_channels();
1476     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1477     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1478     atomic_set(&multifd_recv_state->count, 0);
1479     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1480 
1481     for (i = 0; i < thread_count; i++) {
1482         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1483 
1484         qemu_mutex_init(&p->mutex);
1485         qemu_sem_init(&p->sem_sync, 0);
1486         p->quit = false;
1487         p->id = i;
1488         p->pages = multifd_pages_init(page_count);
1489         p->packet_len = sizeof(MultiFDPacket_t)
1490                       + sizeof(uint64_t) * page_count;
1491         p->packet = g_malloc0(p->packet_len);
1492         p->name = g_strdup_printf("multifdrecv_%d", i);
1493     }
1494     return 0;
1495 }
1496 
1497 bool multifd_recv_all_channels_created(void)
1498 {
1499     int thread_count = migrate_multifd_channels();
1500 
1501     if (!migrate_use_multifd()) {
1502         return true;
1503     }
1504 
1505     return thread_count == atomic_read(&multifd_recv_state->count);
1506 }
1507 
1508 /*
1509  * Try to receive all multifd channels to get ready for the migration.
1510  * - Return true and do not set @errp when correctly receving all channels;
1511  * - Return false and do not set @errp when correctly receiving the current one;
1512  * - Return false and set @errp when failing to receive the current channel.
1513  */
1514 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1515 {
1516     MultiFDRecvParams *p;
1517     Error *local_err = NULL;
1518     int id;
1519 
1520     id = multifd_recv_initial_packet(ioc, &local_err);
1521     if (id < 0) {
1522         multifd_recv_terminate_threads(local_err);
1523         error_propagate_prepend(errp, local_err,
1524                                 "failed to receive packet"
1525                                 " via multifd channel %d: ",
1526                                 atomic_read(&multifd_recv_state->count));
1527         return false;
1528     }
1529     trace_multifd_recv_new_channel(id);
1530 
1531     p = &multifd_recv_state->params[id];
1532     if (p->c != NULL) {
1533         error_setg(&local_err, "multifd: received id '%d' already setup'",
1534                    id);
1535         multifd_recv_terminate_threads(local_err);
1536         error_propagate(errp, local_err);
1537         return false;
1538     }
1539     p->c = ioc;
1540     object_ref(OBJECT(ioc));
1541     /* initial packet */
1542     p->num_packets = 1;
1543 
1544     p->running = true;
1545     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1546                        QEMU_THREAD_JOINABLE);
1547     atomic_inc(&multifd_recv_state->count);
1548     return atomic_read(&multifd_recv_state->count) ==
1549            migrate_multifd_channels();
1550 }
1551 
1552 /**
1553  * save_page_header: write page header to wire
1554  *
1555  * If this is the 1st block, it also writes the block identification
1556  *
1557  * Returns the number of bytes written
1558  *
1559  * @f: QEMUFile where to send the data
1560  * @block: block that contains the page we want to send
1561  * @offset: offset inside the block for the page
1562  *          in the lower bits, it contains flags
1563  */
1564 static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1565                                ram_addr_t offset)
1566 {
1567     size_t size, len;
1568 
1569     if (block == rs->last_sent_block) {
1570         offset |= RAM_SAVE_FLAG_CONTINUE;
1571     }
1572     qemu_put_be64(f, offset);
1573     size = 8;
1574 
1575     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1576         len = strlen(block->idstr);
1577         qemu_put_byte(f, len);
1578         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1579         size += 1 + len;
1580         rs->last_sent_block = block;
1581     }
1582     return size;
1583 }
1584 
1585 /**
1586  * mig_throttle_guest_down: throotle down the guest
1587  *
1588  * Reduce amount of guest cpu execution to hopefully slow down memory
1589  * writes. If guest dirty memory rate is reduced below the rate at
1590  * which we can transfer pages to the destination then we should be
1591  * able to complete migration. Some workloads dirty memory way too
1592  * fast and will not effectively converge, even with auto-converge.
1593  */
1594 static void mig_throttle_guest_down(void)
1595 {
1596     MigrationState *s = migrate_get_current();
1597     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1598     uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
1599     int pct_max = s->parameters.max_cpu_throttle;
1600 
1601     /* We have not started throttling yet. Let's start it. */
1602     if (!cpu_throttle_active()) {
1603         cpu_throttle_set(pct_initial);
1604     } else {
1605         /* Throttling already on, just increase the rate */
1606         cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
1607                          pct_max));
1608     }
1609 }
1610 
1611 /**
1612  * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1613  *
1614  * @rs: current RAM state
1615  * @current_addr: address for the zero page
1616  *
1617  * Update the xbzrle cache to reflect a page that's been sent as all 0.
1618  * The important thing is that a stale (not-yet-0'd) page be replaced
1619  * by the new data.
1620  * As a bonus, if the page wasn't in the cache it gets added so that
1621  * when a small write is made into the 0'd page it gets XBZRLE sent.
1622  */
1623 static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1624 {
1625     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1626         return;
1627     }
1628 
1629     /* We don't care if this fails to allocate a new cache page
1630      * as long as it updated an old one */
1631     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1632                  ram_counters.dirty_sync_count);
1633 }
1634 
1635 #define ENCODING_FLAG_XBZRLE 0x1
1636 
1637 /**
1638  * save_xbzrle_page: compress and send current page
1639  *
1640  * Returns: 1 means that we wrote the page
1641  *          0 means that page is identical to the one already sent
1642  *          -1 means that xbzrle would be longer than normal
1643  *
1644  * @rs: current RAM state
1645  * @current_data: pointer to the address of the page contents
1646  * @current_addr: addr of the page
1647  * @block: block that contains the page we want to send
1648  * @offset: offset inside the block for the page
1649  * @last_stage: if we are at the completion stage
1650  */
1651 static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1652                             ram_addr_t current_addr, RAMBlock *block,
1653                             ram_addr_t offset, bool last_stage)
1654 {
1655     int encoded_len = 0, bytes_xbzrle;
1656     uint8_t *prev_cached_page;
1657 
1658     if (!cache_is_cached(XBZRLE.cache, current_addr,
1659                          ram_counters.dirty_sync_count)) {
1660         xbzrle_counters.cache_miss++;
1661         if (!last_stage) {
1662             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1663                              ram_counters.dirty_sync_count) == -1) {
1664                 return -1;
1665             } else {
1666                 /* update *current_data when the page has been
1667                    inserted into cache */
1668                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
1669             }
1670         }
1671         return -1;
1672     }
1673 
1674     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1675 
1676     /* save current buffer into memory */
1677     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1678 
1679     /* XBZRLE encoding (if there is no overflow) */
1680     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1681                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1682                                        TARGET_PAGE_SIZE);
1683 
1684     /*
1685      * Update the cache contents, so that it corresponds to the data
1686      * sent, in all cases except where we skip the page.
1687      */
1688     if (!last_stage && encoded_len != 0) {
1689         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1690         /*
1691          * In the case where we couldn't compress, ensure that the caller
1692          * sends the data from the cache, since the guest might have
1693          * changed the RAM since we copied it.
1694          */
1695         *current_data = prev_cached_page;
1696     }
1697 
1698     if (encoded_len == 0) {
1699         trace_save_xbzrle_page_skipping();
1700         return 0;
1701     } else if (encoded_len == -1) {
1702         trace_save_xbzrle_page_overflow();
1703         xbzrle_counters.overflow++;
1704         return -1;
1705     }
1706 
1707     /* Send XBZRLE based compressed page */
1708     bytes_xbzrle = save_page_header(rs, rs->f, block,
1709                                     offset | RAM_SAVE_FLAG_XBZRLE);
1710     qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1711     qemu_put_be16(rs->f, encoded_len);
1712     qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1713     bytes_xbzrle += encoded_len + 1 + 2;
1714     xbzrle_counters.pages++;
1715     xbzrle_counters.bytes += bytes_xbzrle;
1716     ram_counters.transferred += bytes_xbzrle;
1717 
1718     return 1;
1719 }
1720 
1721 /**
1722  * migration_bitmap_find_dirty: find the next dirty page from start
1723  *
1724  * Returns the page offset within memory region of the start of a dirty page
1725  *
1726  * @rs: current RAM state
1727  * @rb: RAMBlock where to search for dirty pages
1728  * @start: page where we start the search
1729  */
1730 static inline
1731 unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1732                                           unsigned long start)
1733 {
1734     unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1735     unsigned long *bitmap = rb->bmap;
1736     unsigned long next;
1737 
1738     if (ramblock_is_ignored(rb)) {
1739         return size;
1740     }
1741 
1742     /*
1743      * When the free page optimization is enabled, we need to check the bitmap
1744      * to send the non-free pages rather than all the pages in the bulk stage.
1745      */
1746     if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1747         next = start + 1;
1748     } else {
1749         next = find_next_bit(bitmap, size, start);
1750     }
1751 
1752     return next;
1753 }
1754 
1755 static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1756                                                 RAMBlock *rb,
1757                                                 unsigned long page)
1758 {
1759     bool ret;
1760 
1761     qemu_mutex_lock(&rs->bitmap_mutex);
1762 
1763     /*
1764      * Clear dirty bitmap if needed.  This _must_ be called before we
1765      * send any of the page in the chunk because we need to make sure
1766      * we can capture further page content changes when we sync dirty
1767      * log the next time.  So as long as we are going to send any of
1768      * the page in the chunk we clear the remote dirty bitmap for all.
1769      * Clearing it earlier won't be a problem, but too late will.
1770      */
1771     if (rb->clear_bmap && clear_bmap_test_and_clear(rb, page)) {
1772         uint8_t shift = rb->clear_bmap_shift;
1773         hwaddr size = 1ULL << (TARGET_PAGE_BITS + shift);
1774         hwaddr start = (((ram_addr_t)page) << TARGET_PAGE_BITS) & (-size);
1775 
1776         /*
1777          * CLEAR_BITMAP_SHIFT_MIN should always guarantee this... this
1778          * can make things easier sometimes since then start address
1779          * of the small chunk will always be 64 pages aligned so the
1780          * bitmap will always be aligned to unsigned long.  We should
1781          * even be able to remove this restriction but I'm simply
1782          * keeping it.
1783          */
1784         assert(shift >= 6);
1785         trace_migration_bitmap_clear_dirty(rb->idstr, start, size, page);
1786         memory_region_clear_dirty_bitmap(rb->mr, start, size);
1787     }
1788 
1789     ret = test_and_clear_bit(page, rb->bmap);
1790 
1791     if (ret) {
1792         rs->migration_dirty_pages--;
1793     }
1794     qemu_mutex_unlock(&rs->bitmap_mutex);
1795 
1796     return ret;
1797 }
1798 
1799 /* Called with RCU critical section */
1800 static void ramblock_sync_dirty_bitmap(RAMState *rs, RAMBlock *rb)
1801 {
1802     rs->migration_dirty_pages +=
1803         cpu_physical_memory_sync_dirty_bitmap(rb, 0, rb->used_length,
1804                                               &rs->num_dirty_pages_period);
1805 }
1806 
1807 /**
1808  * ram_pagesize_summary: calculate all the pagesizes of a VM
1809  *
1810  * Returns a summary bitmap of the page sizes of all RAMBlocks
1811  *
1812  * For VMs with just normal pages this is equivalent to the host page
1813  * size. If it's got some huge pages then it's the OR of all the
1814  * different page sizes.
1815  */
1816 uint64_t ram_pagesize_summary(void)
1817 {
1818     RAMBlock *block;
1819     uint64_t summary = 0;
1820 
1821     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1822         summary |= block->page_size;
1823     }
1824 
1825     return summary;
1826 }
1827 
1828 uint64_t ram_get_total_transferred_pages(void)
1829 {
1830     return  ram_counters.normal + ram_counters.duplicate +
1831                 compression_counters.pages + xbzrle_counters.pages;
1832 }
1833 
1834 static void migration_update_rates(RAMState *rs, int64_t end_time)
1835 {
1836     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1837     double compressed_size;
1838 
1839     /* calculate period counters */
1840     ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1841                 / (end_time - rs->time_last_bitmap_sync);
1842 
1843     if (!page_count) {
1844         return;
1845     }
1846 
1847     if (migrate_use_xbzrle()) {
1848         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1849             rs->xbzrle_cache_miss_prev) / page_count;
1850         rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1851     }
1852 
1853     if (migrate_use_compression()) {
1854         compression_counters.busy_rate = (double)(compression_counters.busy -
1855             rs->compress_thread_busy_prev) / page_count;
1856         rs->compress_thread_busy_prev = compression_counters.busy;
1857 
1858         compressed_size = compression_counters.compressed_size -
1859                           rs->compressed_size_prev;
1860         if (compressed_size) {
1861             double uncompressed_size = (compression_counters.pages -
1862                                     rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1863 
1864             /* Compression-Ratio = Uncompressed-size / Compressed-size */
1865             compression_counters.compression_rate =
1866                                         uncompressed_size / compressed_size;
1867 
1868             rs->compress_pages_prev = compression_counters.pages;
1869             rs->compressed_size_prev = compression_counters.compressed_size;
1870         }
1871     }
1872 }
1873 
1874 static void migration_bitmap_sync(RAMState *rs)
1875 {
1876     RAMBlock *block;
1877     int64_t end_time;
1878     uint64_t bytes_xfer_now;
1879 
1880     ram_counters.dirty_sync_count++;
1881 
1882     if (!rs->time_last_bitmap_sync) {
1883         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1884     }
1885 
1886     trace_migration_bitmap_sync_start();
1887     memory_global_dirty_log_sync();
1888 
1889     qemu_mutex_lock(&rs->bitmap_mutex);
1890     WITH_RCU_READ_LOCK_GUARD() {
1891         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1892             ramblock_sync_dirty_bitmap(rs, block);
1893         }
1894         ram_counters.remaining = ram_bytes_remaining();
1895     }
1896     qemu_mutex_unlock(&rs->bitmap_mutex);
1897 
1898     memory_global_after_dirty_log_sync();
1899     trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1900 
1901     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1902 
1903     /* more than 1 second = 1000 millisecons */
1904     if (end_time > rs->time_last_bitmap_sync + 1000) {
1905         bytes_xfer_now = ram_counters.transferred;
1906 
1907         /* During block migration the auto-converge logic incorrectly detects
1908          * that ram migration makes no progress. Avoid this by disabling the
1909          * throttling logic during the bulk phase of block migration. */
1910         if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1911             /* The following detection logic can be refined later. For now:
1912                Check to see if the dirtied bytes is 50% more than the approx.
1913                amount of bytes that just got transferred since the last time we
1914                were in this routine. If that happens twice, start or increase
1915                throttling */
1916 
1917             if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1918                    (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1919                 (++rs->dirty_rate_high_cnt >= 2)) {
1920                     trace_migration_throttle();
1921                     rs->dirty_rate_high_cnt = 0;
1922                     mig_throttle_guest_down();
1923             }
1924         }
1925 
1926         migration_update_rates(rs, end_time);
1927 
1928         rs->target_page_count_prev = rs->target_page_count;
1929 
1930         /* reset period counters */
1931         rs->time_last_bitmap_sync = end_time;
1932         rs->num_dirty_pages_period = 0;
1933         rs->bytes_xfer_prev = bytes_xfer_now;
1934     }
1935     if (migrate_use_events()) {
1936         qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1937     }
1938 }
1939 
1940 static void migration_bitmap_sync_precopy(RAMState *rs)
1941 {
1942     Error *local_err = NULL;
1943 
1944     /*
1945      * The current notifier usage is just an optimization to migration, so we
1946      * don't stop the normal migration process in the error case.
1947      */
1948     if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1949         error_report_err(local_err);
1950     }
1951 
1952     migration_bitmap_sync(rs);
1953 
1954     if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1955         error_report_err(local_err);
1956     }
1957 }
1958 
1959 /**
1960  * save_zero_page_to_file: send the zero page to the file
1961  *
1962  * Returns the size of data written to the file, 0 means the page is not
1963  * a zero page
1964  *
1965  * @rs: current RAM state
1966  * @file: the file where the data is saved
1967  * @block: block that contains the page we want to send
1968  * @offset: offset inside the block for the page
1969  */
1970 static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1971                                   RAMBlock *block, ram_addr_t offset)
1972 {
1973     uint8_t *p = block->host + offset;
1974     int len = 0;
1975 
1976     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1977         len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1978         qemu_put_byte(file, 0);
1979         len += 1;
1980     }
1981     return len;
1982 }
1983 
1984 /**
1985  * save_zero_page: send the zero page to the stream
1986  *
1987  * Returns the number of pages written.
1988  *
1989  * @rs: current RAM state
1990  * @block: block that contains the page we want to send
1991  * @offset: offset inside the block for the page
1992  */
1993 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1994 {
1995     int len = save_zero_page_to_file(rs, rs->f, block, offset);
1996 
1997     if (len) {
1998         ram_counters.duplicate++;
1999         ram_counters.transferred += len;
2000         return 1;
2001     }
2002     return -1;
2003 }
2004 
2005 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
2006 {
2007     if (!migrate_release_ram() || !migration_in_postcopy()) {
2008         return;
2009     }
2010 
2011     ram_discard_range(rbname, offset, ((ram_addr_t)pages) << TARGET_PAGE_BITS);
2012 }
2013 
2014 /*
2015  * @pages: the number of pages written by the control path,
2016  *        < 0 - error
2017  *        > 0 - number of pages written
2018  *
2019  * Return true if the pages has been saved, otherwise false is returned.
2020  */
2021 static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2022                               int *pages)
2023 {
2024     uint64_t bytes_xmit = 0;
2025     int ret;
2026 
2027     *pages = -1;
2028     ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
2029                                 &bytes_xmit);
2030     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
2031         return false;
2032     }
2033 
2034     if (bytes_xmit) {
2035         ram_counters.transferred += bytes_xmit;
2036         *pages = 1;
2037     }
2038 
2039     if (ret == RAM_SAVE_CONTROL_DELAYED) {
2040         return true;
2041     }
2042 
2043     if (bytes_xmit > 0) {
2044         ram_counters.normal++;
2045     } else if (bytes_xmit == 0) {
2046         ram_counters.duplicate++;
2047     }
2048 
2049     return true;
2050 }
2051 
2052 /*
2053  * directly send the page to the stream
2054  *
2055  * Returns the number of pages written.
2056  *
2057  * @rs: current RAM state
2058  * @block: block that contains the page we want to send
2059  * @offset: offset inside the block for the page
2060  * @buf: the page to be sent
2061  * @async: send to page asyncly
2062  */
2063 static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2064                             uint8_t *buf, bool async)
2065 {
2066     ram_counters.transferred += save_page_header(rs, rs->f, block,
2067                                                  offset | RAM_SAVE_FLAG_PAGE);
2068     if (async) {
2069         qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
2070                               migrate_release_ram() &
2071                               migration_in_postcopy());
2072     } else {
2073         qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
2074     }
2075     ram_counters.transferred += TARGET_PAGE_SIZE;
2076     ram_counters.normal++;
2077     return 1;
2078 }
2079 
2080 /**
2081  * ram_save_page: send the given page to the stream
2082  *
2083  * Returns the number of pages written.
2084  *          < 0 - error
2085  *          >=0 - Number of pages written - this might legally be 0
2086  *                if xbzrle noticed the page was the same.
2087  *
2088  * @rs: current RAM state
2089  * @block: block that contains the page we want to send
2090  * @offset: offset inside the block for the page
2091  * @last_stage: if we are at the completion stage
2092  */
2093 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
2094 {
2095     int pages = -1;
2096     uint8_t *p;
2097     bool send_async = true;
2098     RAMBlock *block = pss->block;
2099     ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
2100     ram_addr_t current_addr = block->offset + offset;
2101 
2102     p = block->host + offset;
2103     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
2104 
2105     XBZRLE_cache_lock();
2106     if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
2107         migrate_use_xbzrle()) {
2108         pages = save_xbzrle_page(rs, &p, current_addr, block,
2109                                  offset, last_stage);
2110         if (!last_stage) {
2111             /* Can't send this cached data async, since the cache page
2112              * might get updated before it gets to the wire
2113              */
2114             send_async = false;
2115         }
2116     }
2117 
2118     /* XBZRLE overflow or normal page */
2119     if (pages == -1) {
2120         pages = save_normal_page(rs, block, offset, p, send_async);
2121     }
2122 
2123     XBZRLE_cache_unlock();
2124 
2125     return pages;
2126 }
2127 
2128 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2129                                  ram_addr_t offset)
2130 {
2131     if (multifd_queue_page(rs, block, offset) < 0) {
2132         return -1;
2133     }
2134     ram_counters.normal++;
2135 
2136     return 1;
2137 }
2138 
2139 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2140                                  ram_addr_t offset, uint8_t *source_buf)
2141 {
2142     RAMState *rs = ram_state;
2143     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2144     bool zero_page = false;
2145     int ret;
2146 
2147     if (save_zero_page_to_file(rs, f, block, offset)) {
2148         zero_page = true;
2149         goto exit;
2150     }
2151 
2152     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2153 
2154     /*
2155      * copy it to a internal buffer to avoid it being modified by VM
2156      * so that we can catch up the error during compression and
2157      * decompression
2158      */
2159     memcpy(source_buf, p, TARGET_PAGE_SIZE);
2160     ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2161     if (ret < 0) {
2162         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2163         error_report("compressed data failed!");
2164         return false;
2165     }
2166 
2167 exit:
2168     ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2169     return zero_page;
2170 }
2171 
2172 static void
2173 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2174 {
2175     ram_counters.transferred += bytes_xmit;
2176 
2177     if (param->zero_page) {
2178         ram_counters.duplicate++;
2179         return;
2180     }
2181 
2182     /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
2183     compression_counters.compressed_size += bytes_xmit - 8;
2184     compression_counters.pages++;
2185 }
2186 
2187 static bool save_page_use_compression(RAMState *rs);
2188 
2189 static void flush_compressed_data(RAMState *rs)
2190 {
2191     int idx, len, thread_count;
2192 
2193     if (!save_page_use_compression(rs)) {
2194         return;
2195     }
2196     thread_count = migrate_compress_threads();
2197 
2198     qemu_mutex_lock(&comp_done_lock);
2199     for (idx = 0; idx < thread_count; idx++) {
2200         while (!comp_param[idx].done) {
2201             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2202         }
2203     }
2204     qemu_mutex_unlock(&comp_done_lock);
2205 
2206     for (idx = 0; idx < thread_count; idx++) {
2207         qemu_mutex_lock(&comp_param[idx].mutex);
2208         if (!comp_param[idx].quit) {
2209             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2210             /*
2211              * it's safe to fetch zero_page without holding comp_done_lock
2212              * as there is no further request submitted to the thread,
2213              * i.e, the thread should be waiting for a request at this point.
2214              */
2215             update_compress_thread_counts(&comp_param[idx], len);
2216         }
2217         qemu_mutex_unlock(&comp_param[idx].mutex);
2218     }
2219 }
2220 
2221 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2222                                        ram_addr_t offset)
2223 {
2224     param->block = block;
2225     param->offset = offset;
2226 }
2227 
2228 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2229                                            ram_addr_t offset)
2230 {
2231     int idx, thread_count, bytes_xmit = -1, pages = -1;
2232     bool wait = migrate_compress_wait_thread();
2233 
2234     thread_count = migrate_compress_threads();
2235     qemu_mutex_lock(&comp_done_lock);
2236 retry:
2237     for (idx = 0; idx < thread_count; idx++) {
2238         if (comp_param[idx].done) {
2239             comp_param[idx].done = false;
2240             bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2241             qemu_mutex_lock(&comp_param[idx].mutex);
2242             set_compress_params(&comp_param[idx], block, offset);
2243             qemu_cond_signal(&comp_param[idx].cond);
2244             qemu_mutex_unlock(&comp_param[idx].mutex);
2245             pages = 1;
2246             update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2247             break;
2248         }
2249     }
2250 
2251     /*
2252      * wait for the free thread if the user specifies 'compress-wait-thread',
2253      * otherwise we will post the page out in the main thread as normal page.
2254      */
2255     if (pages < 0 && wait) {
2256         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2257         goto retry;
2258     }
2259     qemu_mutex_unlock(&comp_done_lock);
2260 
2261     return pages;
2262 }
2263 
2264 /**
2265  * find_dirty_block: find the next dirty page and update any state
2266  * associated with the search process.
2267  *
2268  * Returns true if a page is found
2269  *
2270  * @rs: current RAM state
2271  * @pss: data about the state of the current dirty page scan
2272  * @again: set to false if the search has scanned the whole of RAM
2273  */
2274 static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2275 {
2276     pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2277     if (pss->complete_round && pss->block == rs->last_seen_block &&
2278         pss->page >= rs->last_page) {
2279         /*
2280          * We've been once around the RAM and haven't found anything.
2281          * Give up.
2282          */
2283         *again = false;
2284         return false;
2285     }
2286     if ((((ram_addr_t)pss->page) << TARGET_PAGE_BITS)
2287         >= pss->block->used_length) {
2288         /* Didn't find anything in this RAM Block */
2289         pss->page = 0;
2290         pss->block = QLIST_NEXT_RCU(pss->block, next);
2291         if (!pss->block) {
2292             /*
2293              * If memory migration starts over, we will meet a dirtied page
2294              * which may still exists in compression threads's ring, so we
2295              * should flush the compressed data to make sure the new page
2296              * is not overwritten by the old one in the destination.
2297              *
2298              * Also If xbzrle is on, stop using the data compression at this
2299              * point. In theory, xbzrle can do better than compression.
2300              */
2301             flush_compressed_data(rs);
2302 
2303             /* Hit the end of the list */
2304             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2305             /* Flag that we've looped */
2306             pss->complete_round = true;
2307             rs->ram_bulk_stage = false;
2308         }
2309         /* Didn't find anything this time, but try again on the new block */
2310         *again = true;
2311         return false;
2312     } else {
2313         /* Can go around again, but... */
2314         *again = true;
2315         /* We've found something so probably don't need to */
2316         return true;
2317     }
2318 }
2319 
2320 /**
2321  * unqueue_page: gets a page of the queue
2322  *
2323  * Helper for 'get_queued_page' - gets a page off the queue
2324  *
2325  * Returns the block of the page (or NULL if none available)
2326  *
2327  * @rs: current RAM state
2328  * @offset: used to return the offset within the RAMBlock
2329  */
2330 static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2331 {
2332     RAMBlock *block = NULL;
2333 
2334     if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2335         return NULL;
2336     }
2337 
2338     qemu_mutex_lock(&rs->src_page_req_mutex);
2339     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2340         struct RAMSrcPageRequest *entry =
2341                                 QSIMPLEQ_FIRST(&rs->src_page_requests);
2342         block = entry->rb;
2343         *offset = entry->offset;
2344 
2345         if (entry->len > TARGET_PAGE_SIZE) {
2346             entry->len -= TARGET_PAGE_SIZE;
2347             entry->offset += TARGET_PAGE_SIZE;
2348         } else {
2349             memory_region_unref(block->mr);
2350             QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2351             g_free(entry);
2352             migration_consume_urgent_request();
2353         }
2354     }
2355     qemu_mutex_unlock(&rs->src_page_req_mutex);
2356 
2357     return block;
2358 }
2359 
2360 /**
2361  * get_queued_page: unqueue a page from the postcopy requests
2362  *
2363  * Skips pages that are already sent (!dirty)
2364  *
2365  * Returns true if a queued page is found
2366  *
2367  * @rs: current RAM state
2368  * @pss: data about the state of the current dirty page scan
2369  */
2370 static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2371 {
2372     RAMBlock  *block;
2373     ram_addr_t offset;
2374     bool dirty;
2375 
2376     do {
2377         block = unqueue_page(rs, &offset);
2378         /*
2379          * We're sending this page, and since it's postcopy nothing else
2380          * will dirty it, and we must make sure it doesn't get sent again
2381          * even if this queue request was received after the background
2382          * search already sent it.
2383          */
2384         if (block) {
2385             unsigned long page;
2386 
2387             page = offset >> TARGET_PAGE_BITS;
2388             dirty = test_bit(page, block->bmap);
2389             if (!dirty) {
2390                 trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2391                                                 page);
2392             } else {
2393                 trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2394             }
2395         }
2396 
2397     } while (block && !dirty);
2398 
2399     if (block) {
2400         /*
2401          * As soon as we start servicing pages out of order, then we have
2402          * to kill the bulk stage, since the bulk stage assumes
2403          * in (migration_bitmap_find_and_reset_dirty) that every page is
2404          * dirty, that's no longer true.
2405          */
2406         rs->ram_bulk_stage = false;
2407 
2408         /*
2409          * We want the background search to continue from the queued page
2410          * since the guest is likely to want other pages near to the page
2411          * it just requested.
2412          */
2413         pss->block = block;
2414         pss->page = offset >> TARGET_PAGE_BITS;
2415 
2416         /*
2417          * This unqueued page would break the "one round" check, even is
2418          * really rare.
2419          */
2420         pss->complete_round = false;
2421     }
2422 
2423     return !!block;
2424 }
2425 
2426 /**
2427  * migration_page_queue_free: drop any remaining pages in the ram
2428  * request queue
2429  *
2430  * It should be empty at the end anyway, but in error cases there may
2431  * be some left.  in case that there is any page left, we drop it.
2432  *
2433  */
2434 static void migration_page_queue_free(RAMState *rs)
2435 {
2436     struct RAMSrcPageRequest *mspr, *next_mspr;
2437     /* This queue generally should be empty - but in the case of a failed
2438      * migration might have some droppings in.
2439      */
2440     RCU_READ_LOCK_GUARD();
2441     QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2442         memory_region_unref(mspr->rb->mr);
2443         QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2444         g_free(mspr);
2445     }
2446 }
2447 
2448 /**
2449  * ram_save_queue_pages: queue the page for transmission
2450  *
2451  * A request from postcopy destination for example.
2452  *
2453  * Returns zero on success or negative on error
2454  *
2455  * @rbname: Name of the RAMBLock of the request. NULL means the
2456  *          same that last one.
2457  * @start: starting address from the start of the RAMBlock
2458  * @len: length (in bytes) to send
2459  */
2460 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2461 {
2462     RAMBlock *ramblock;
2463     RAMState *rs = ram_state;
2464 
2465     ram_counters.postcopy_requests++;
2466     RCU_READ_LOCK_GUARD();
2467 
2468     if (!rbname) {
2469         /* Reuse last RAMBlock */
2470         ramblock = rs->last_req_rb;
2471 
2472         if (!ramblock) {
2473             /*
2474              * Shouldn't happen, we can't reuse the last RAMBlock if
2475              * it's the 1st request.
2476              */
2477             error_report("ram_save_queue_pages no previous block");
2478             return -1;
2479         }
2480     } else {
2481         ramblock = qemu_ram_block_by_name(rbname);
2482 
2483         if (!ramblock) {
2484             /* We shouldn't be asked for a non-existent RAMBlock */
2485             error_report("ram_save_queue_pages no block '%s'", rbname);
2486             return -1;
2487         }
2488         rs->last_req_rb = ramblock;
2489     }
2490     trace_ram_save_queue_pages(ramblock->idstr, start, len);
2491     if (start+len > ramblock->used_length) {
2492         error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2493                      RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2494                      __func__, start, len, ramblock->used_length);
2495         return -1;
2496     }
2497 
2498     struct RAMSrcPageRequest *new_entry =
2499         g_malloc0(sizeof(struct RAMSrcPageRequest));
2500     new_entry->rb = ramblock;
2501     new_entry->offset = start;
2502     new_entry->len = len;
2503 
2504     memory_region_ref(ramblock->mr);
2505     qemu_mutex_lock(&rs->src_page_req_mutex);
2506     QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2507     migration_make_urgent_request();
2508     qemu_mutex_unlock(&rs->src_page_req_mutex);
2509 
2510     return 0;
2511 }
2512 
2513 static bool save_page_use_compression(RAMState *rs)
2514 {
2515     if (!migrate_use_compression()) {
2516         return false;
2517     }
2518 
2519     /*
2520      * If xbzrle is on, stop using the data compression after first
2521      * round of migration even if compression is enabled. In theory,
2522      * xbzrle can do better than compression.
2523      */
2524     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2525         return true;
2526     }
2527 
2528     return false;
2529 }
2530 
2531 /*
2532  * try to compress the page before posting it out, return true if the page
2533  * has been properly handled by compression, otherwise needs other
2534  * paths to handle it
2535  */
2536 static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2537 {
2538     if (!save_page_use_compression(rs)) {
2539         return false;
2540     }
2541 
2542     /*
2543      * When starting the process of a new block, the first page of
2544      * the block should be sent out before other pages in the same
2545      * block, and all the pages in last block should have been sent
2546      * out, keeping this order is important, because the 'cont' flag
2547      * is used to avoid resending the block name.
2548      *
2549      * We post the fist page as normal page as compression will take
2550      * much CPU resource.
2551      */
2552     if (block != rs->last_sent_block) {
2553         flush_compressed_data(rs);
2554         return false;
2555     }
2556 
2557     if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2558         return true;
2559     }
2560 
2561     compression_counters.busy++;
2562     return false;
2563 }
2564 
2565 /**
2566  * ram_save_target_page: save one target page
2567  *
2568  * Returns the number of pages written
2569  *
2570  * @rs: current RAM state
2571  * @pss: data about the page we want to send
2572  * @last_stage: if we are at the completion stage
2573  */
2574 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2575                                 bool last_stage)
2576 {
2577     RAMBlock *block = pss->block;
2578     ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
2579     int res;
2580 
2581     if (control_save_page(rs, block, offset, &res)) {
2582         return res;
2583     }
2584 
2585     if (save_compress_page(rs, block, offset)) {
2586         return 1;
2587     }
2588 
2589     res = save_zero_page(rs, block, offset);
2590     if (res > 0) {
2591         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2592          * page would be stale
2593          */
2594         if (!save_page_use_compression(rs)) {
2595             XBZRLE_cache_lock();
2596             xbzrle_cache_zero_page(rs, block->offset + offset);
2597             XBZRLE_cache_unlock();
2598         }
2599         ram_release_pages(block->idstr, offset, res);
2600         return res;
2601     }
2602 
2603     /*
2604      * Do not use multifd for:
2605      * 1. Compression as the first page in the new block should be posted out
2606      *    before sending the compressed page
2607      * 2. In postcopy as one whole host page should be placed
2608      */
2609     if (!save_page_use_compression(rs) && migrate_use_multifd()
2610         && !migration_in_postcopy()) {
2611         return ram_save_multifd_page(rs, block, offset);
2612     }
2613 
2614     return ram_save_page(rs, pss, last_stage);
2615 }
2616 
2617 /**
2618  * ram_save_host_page: save a whole host page
2619  *
2620  * Starting at *offset send pages up to the end of the current host
2621  * page. It's valid for the initial offset to point into the middle of
2622  * a host page in which case the remainder of the hostpage is sent.
2623  * Only dirty target pages are sent. Note that the host page size may
2624  * be a huge page for this block.
2625  * The saving stops at the boundary of the used_length of the block
2626  * if the RAMBlock isn't a multiple of the host page size.
2627  *
2628  * Returns the number of pages written or negative on error
2629  *
2630  * @rs: current RAM state
2631  * @ms: current migration state
2632  * @pss: data about the page we want to send
2633  * @last_stage: if we are at the completion stage
2634  */
2635 static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2636                               bool last_stage)
2637 {
2638     int tmppages, pages = 0;
2639     size_t pagesize_bits =
2640         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2641 
2642     if (ramblock_is_ignored(pss->block)) {
2643         error_report("block %s should not be migrated !", pss->block->idstr);
2644         return 0;
2645     }
2646 
2647     do {
2648         /* Check the pages is dirty and if it is send it */
2649         if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2650             pss->page++;
2651             continue;
2652         }
2653 
2654         tmppages = ram_save_target_page(rs, pss, last_stage);
2655         if (tmppages < 0) {
2656             return tmppages;
2657         }
2658 
2659         pages += tmppages;
2660         pss->page++;
2661         /* Allow rate limiting to happen in the middle of huge pages */
2662         migration_rate_limit();
2663     } while ((pss->page & (pagesize_bits - 1)) &&
2664              offset_in_ramblock(pss->block,
2665                                 ((ram_addr_t)pss->page) << TARGET_PAGE_BITS));
2666 
2667     /* The offset we leave with is the last one we looked at */
2668     pss->page--;
2669     return pages;
2670 }
2671 
2672 /**
2673  * ram_find_and_save_block: finds a dirty page and sends it to f
2674  *
2675  * Called within an RCU critical section.
2676  *
2677  * Returns the number of pages written where zero means no dirty pages,
2678  * or negative on error
2679  *
2680  * @rs: current RAM state
2681  * @last_stage: if we are at the completion stage
2682  *
2683  * On systems where host-page-size > target-page-size it will send all the
2684  * pages in a host page that are dirty.
2685  */
2686 
2687 static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2688 {
2689     PageSearchStatus pss;
2690     int pages = 0;
2691     bool again, found;
2692 
2693     /* No dirty page as there is zero RAM */
2694     if (!ram_bytes_total()) {
2695         return pages;
2696     }
2697 
2698     pss.block = rs->last_seen_block;
2699     pss.page = rs->last_page;
2700     pss.complete_round = false;
2701 
2702     if (!pss.block) {
2703         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2704     }
2705 
2706     do {
2707         again = true;
2708         found = get_queued_page(rs, &pss);
2709 
2710         if (!found) {
2711             /* priority queue empty, so just search for something dirty */
2712             found = find_dirty_block(rs, &pss, &again);
2713         }
2714 
2715         if (found) {
2716             pages = ram_save_host_page(rs, &pss, last_stage);
2717         }
2718     } while (!pages && again);
2719 
2720     rs->last_seen_block = pss.block;
2721     rs->last_page = pss.page;
2722 
2723     return pages;
2724 }
2725 
2726 void acct_update_position(QEMUFile *f, size_t size, bool zero)
2727 {
2728     uint64_t pages = size / TARGET_PAGE_SIZE;
2729 
2730     if (zero) {
2731         ram_counters.duplicate += pages;
2732     } else {
2733         ram_counters.normal += pages;
2734         ram_counters.transferred += size;
2735         qemu_update_position(f, size);
2736     }
2737 }
2738 
2739 static uint64_t ram_bytes_total_common(bool count_ignored)
2740 {
2741     RAMBlock *block;
2742     uint64_t total = 0;
2743 
2744     RCU_READ_LOCK_GUARD();
2745 
2746     if (count_ignored) {
2747         RAMBLOCK_FOREACH_MIGRATABLE(block) {
2748             total += block->used_length;
2749         }
2750     } else {
2751         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2752             total += block->used_length;
2753         }
2754     }
2755     return total;
2756 }
2757 
2758 uint64_t ram_bytes_total(void)
2759 {
2760     return ram_bytes_total_common(false);
2761 }
2762 
2763 static void xbzrle_load_setup(void)
2764 {
2765     XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2766 }
2767 
2768 static void xbzrle_load_cleanup(void)
2769 {
2770     g_free(XBZRLE.decoded_buf);
2771     XBZRLE.decoded_buf = NULL;
2772 }
2773 
2774 static void ram_state_cleanup(RAMState **rsp)
2775 {
2776     if (*rsp) {
2777         migration_page_queue_free(*rsp);
2778         qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2779         qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2780         g_free(*rsp);
2781         *rsp = NULL;
2782     }
2783 }
2784 
2785 static void xbzrle_cleanup(void)
2786 {
2787     XBZRLE_cache_lock();
2788     if (XBZRLE.cache) {
2789         cache_fini(XBZRLE.cache);
2790         g_free(XBZRLE.encoded_buf);
2791         g_free(XBZRLE.current_buf);
2792         g_free(XBZRLE.zero_target_page);
2793         XBZRLE.cache = NULL;
2794         XBZRLE.encoded_buf = NULL;
2795         XBZRLE.current_buf = NULL;
2796         XBZRLE.zero_target_page = NULL;
2797     }
2798     XBZRLE_cache_unlock();
2799 }
2800 
2801 static void ram_save_cleanup(void *opaque)
2802 {
2803     RAMState **rsp = opaque;
2804     RAMBlock *block;
2805 
2806     /* caller have hold iothread lock or is in a bh, so there is
2807      * no writing race against the migration bitmap
2808      */
2809     memory_global_dirty_log_stop();
2810 
2811     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2812         g_free(block->clear_bmap);
2813         block->clear_bmap = NULL;
2814         g_free(block->bmap);
2815         block->bmap = NULL;
2816     }
2817 
2818     xbzrle_cleanup();
2819     compress_threads_save_cleanup();
2820     ram_state_cleanup(rsp);
2821 }
2822 
2823 static void ram_state_reset(RAMState *rs)
2824 {
2825     rs->last_seen_block = NULL;
2826     rs->last_sent_block = NULL;
2827     rs->last_page = 0;
2828     rs->last_version = ram_list.version;
2829     rs->ram_bulk_stage = true;
2830     rs->fpo_enabled = false;
2831 }
2832 
2833 #define MAX_WAIT 50 /* ms, half buffered_file limit */
2834 
2835 /*
2836  * 'expected' is the value you expect the bitmap mostly to be full
2837  * of; it won't bother printing lines that are all this value.
2838  * If 'todump' is null the migration bitmap is dumped.
2839  */
2840 void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2841                            unsigned long pages)
2842 {
2843     int64_t cur;
2844     int64_t linelen = 128;
2845     char linebuf[129];
2846 
2847     for (cur = 0; cur < pages; cur += linelen) {
2848         int64_t curb;
2849         bool found = false;
2850         /*
2851          * Last line; catch the case where the line length
2852          * is longer than remaining ram
2853          */
2854         if (cur + linelen > pages) {
2855             linelen = pages - cur;
2856         }
2857         for (curb = 0; curb < linelen; curb++) {
2858             bool thisbit = test_bit(cur + curb, todump);
2859             linebuf[curb] = thisbit ? '1' : '.';
2860             found = found || (thisbit != expected);
2861         }
2862         if (found) {
2863             linebuf[curb] = '\0';
2864             fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2865         }
2866     }
2867 }
2868 
2869 /* **** functions for postcopy ***** */
2870 
2871 void ram_postcopy_migrated_memory_release(MigrationState *ms)
2872 {
2873     struct RAMBlock *block;
2874 
2875     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2876         unsigned long *bitmap = block->bmap;
2877         unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2878         unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2879 
2880         while (run_start < range) {
2881             unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2882             ram_discard_range(block->idstr,
2883                               ((ram_addr_t)run_start) << TARGET_PAGE_BITS,
2884                               ((ram_addr_t)(run_end - run_start))
2885                                 << TARGET_PAGE_BITS);
2886             run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2887         }
2888     }
2889 }
2890 
2891 /**
2892  * postcopy_send_discard_bm_ram: discard a RAMBlock
2893  *
2894  * Returns zero on success
2895  *
2896  * Callback from postcopy_each_ram_send_discard for each RAMBlock
2897  *
2898  * @ms: current migration state
2899  * @block: RAMBlock to discard
2900  */
2901 static int postcopy_send_discard_bm_ram(MigrationState *ms, RAMBlock *block)
2902 {
2903     unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2904     unsigned long current;
2905     unsigned long *bitmap = block->bmap;
2906 
2907     for (current = 0; current < end; ) {
2908         unsigned long one = find_next_bit(bitmap, end, current);
2909         unsigned long zero, discard_length;
2910 
2911         if (one >= end) {
2912             break;
2913         }
2914 
2915         zero = find_next_zero_bit(bitmap, end, one + 1);
2916 
2917         if (zero >= end) {
2918             discard_length = end - one;
2919         } else {
2920             discard_length = zero - one;
2921         }
2922         postcopy_discard_send_range(ms, one, discard_length);
2923         current = one + discard_length;
2924     }
2925 
2926     return 0;
2927 }
2928 
2929 /**
2930  * postcopy_each_ram_send_discard: discard all RAMBlocks
2931  *
2932  * Returns 0 for success or negative for error
2933  *
2934  * Utility for the outgoing postcopy code.
2935  *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2936  *   passing it bitmap indexes and name.
2937  * (qemu_ram_foreach_block ends up passing unscaled lengths
2938  *  which would mean postcopy code would have to deal with target page)
2939  *
2940  * @ms: current migration state
2941  */
2942 static int postcopy_each_ram_send_discard(MigrationState *ms)
2943 {
2944     struct RAMBlock *block;
2945     int ret;
2946 
2947     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2948         postcopy_discard_send_init(ms, block->idstr);
2949 
2950         /*
2951          * Postcopy sends chunks of bitmap over the wire, but it
2952          * just needs indexes at this point, avoids it having
2953          * target page specific code.
2954          */
2955         ret = postcopy_send_discard_bm_ram(ms, block);
2956         postcopy_discard_send_finish(ms);
2957         if (ret) {
2958             return ret;
2959         }
2960     }
2961 
2962     return 0;
2963 }
2964 
2965 /**
2966  * postcopy_chunk_hostpages_pass: canonicalize bitmap in hostpages
2967  *
2968  * Helper for postcopy_chunk_hostpages; it's called twice to
2969  * canonicalize the two bitmaps, that are similar, but one is
2970  * inverted.
2971  *
2972  * Postcopy requires that all target pages in a hostpage are dirty or
2973  * clean, not a mix.  This function canonicalizes the bitmaps.
2974  *
2975  * @ms: current migration state
2976  * @block: block that contains the page we want to canonicalize
2977  */
2978 static void postcopy_chunk_hostpages_pass(MigrationState *ms, RAMBlock *block)
2979 {
2980     RAMState *rs = ram_state;
2981     unsigned long *bitmap = block->bmap;
2982     unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2983     unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2984     unsigned long run_start;
2985 
2986     if (block->page_size == TARGET_PAGE_SIZE) {
2987         /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2988         return;
2989     }
2990 
2991     /* Find a dirty page */
2992     run_start = find_next_bit(bitmap, pages, 0);
2993 
2994     while (run_start < pages) {
2995 
2996         /*
2997          * If the start of this run of pages is in the middle of a host
2998          * page, then we need to fixup this host page.
2999          */
3000         if (QEMU_IS_ALIGNED(run_start, host_ratio)) {
3001             /* Find the end of this run */
3002             run_start = find_next_zero_bit(bitmap, pages, run_start + 1);
3003             /*
3004              * If the end isn't at the start of a host page, then the
3005              * run doesn't finish at the end of a host page
3006              * and we need to discard.
3007              */
3008         }
3009 
3010         if (!QEMU_IS_ALIGNED(run_start, host_ratio)) {
3011             unsigned long page;
3012             unsigned long fixup_start_addr = QEMU_ALIGN_DOWN(run_start,
3013                                                              host_ratio);
3014             run_start = QEMU_ALIGN_UP(run_start, host_ratio);
3015 
3016             /* Clean up the bitmap */
3017             for (page = fixup_start_addr;
3018                  page < fixup_start_addr + host_ratio; page++) {
3019                 /*
3020                  * Remark them as dirty, updating the count for any pages
3021                  * that weren't previously dirty.
3022                  */
3023                 rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
3024             }
3025         }
3026 
3027         /* Find the next dirty page for the next iteration */
3028         run_start = find_next_bit(bitmap, pages, run_start);
3029     }
3030 }
3031 
3032 /**
3033  * postcopy_chunk_hostpages: discard any partially sent host page
3034  *
3035  * Utility for the outgoing postcopy code.
3036  *
3037  * Discard any partially sent host-page size chunks, mark any partially
3038  * dirty host-page size chunks as all dirty.  In this case the host-page
3039  * is the host-page for the particular RAMBlock, i.e. it might be a huge page
3040  *
3041  * Returns zero on success
3042  *
3043  * @ms: current migration state
3044  * @block: block we want to work with
3045  */
3046 static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
3047 {
3048     postcopy_discard_send_init(ms, block->idstr);
3049 
3050     /*
3051      * Ensure that all partially dirty host pages are made fully dirty.
3052      */
3053     postcopy_chunk_hostpages_pass(ms, block);
3054 
3055     postcopy_discard_send_finish(ms);
3056     return 0;
3057 }
3058 
3059 /**
3060  * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3061  *
3062  * Returns zero on success
3063  *
3064  * Transmit the set of pages to be discarded after precopy to the target
3065  * these are pages that:
3066  *     a) Have been previously transmitted but are now dirty again
3067  *     b) Pages that have never been transmitted, this ensures that
3068  *        any pages on the destination that have been mapped by background
3069  *        tasks get discarded (transparent huge pages is the specific concern)
3070  * Hopefully this is pretty sparse
3071  *
3072  * @ms: current migration state
3073  */
3074 int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3075 {
3076     RAMState *rs = ram_state;
3077     RAMBlock *block;
3078     int ret;
3079 
3080     RCU_READ_LOCK_GUARD();
3081 
3082     /* This should be our last sync, the src is now paused */
3083     migration_bitmap_sync(rs);
3084 
3085     /* Easiest way to make sure we don't resume in the middle of a host-page */
3086     rs->last_seen_block = NULL;
3087     rs->last_sent_block = NULL;
3088     rs->last_page = 0;
3089 
3090     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3091         /* Deal with TPS != HPS and huge pages */
3092         ret = postcopy_chunk_hostpages(ms, block);
3093         if (ret) {
3094             return ret;
3095         }
3096 
3097 #ifdef DEBUG_POSTCOPY
3098         ram_debug_dump_bitmap(block->bmap, true,
3099                               block->used_length >> TARGET_PAGE_BITS);
3100 #endif
3101     }
3102     trace_ram_postcopy_send_discard_bitmap();
3103 
3104     ret = postcopy_each_ram_send_discard(ms);
3105 
3106     return ret;
3107 }
3108 
3109 /**
3110  * ram_discard_range: discard dirtied pages at the beginning of postcopy
3111  *
3112  * Returns zero on success
3113  *
3114  * @rbname: name of the RAMBlock of the request. NULL means the
3115  *          same that last one.
3116  * @start: RAMBlock starting page
3117  * @length: RAMBlock size
3118  */
3119 int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3120 {
3121     trace_ram_discard_range(rbname, start, length);
3122 
3123     RCU_READ_LOCK_GUARD();
3124     RAMBlock *rb = qemu_ram_block_by_name(rbname);
3125 
3126     if (!rb) {
3127         error_report("ram_discard_range: Failed to find block '%s'", rbname);
3128         return -1;
3129     }
3130 
3131     /*
3132      * On source VM, we don't need to update the received bitmap since
3133      * we don't even have one.
3134      */
3135     if (rb->receivedmap) {
3136         bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3137                      length >> qemu_target_page_bits());
3138     }
3139 
3140     return ram_block_discard_range(rb, start, length);
3141 }
3142 
3143 /*
3144  * For every allocation, we will try not to crash the VM if the
3145  * allocation failed.
3146  */
3147 static int xbzrle_init(void)
3148 {
3149     Error *local_err = NULL;
3150 
3151     if (!migrate_use_xbzrle()) {
3152         return 0;
3153     }
3154 
3155     XBZRLE_cache_lock();
3156 
3157     XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3158     if (!XBZRLE.zero_target_page) {
3159         error_report("%s: Error allocating zero page", __func__);
3160         goto err_out;
3161     }
3162 
3163     XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3164                               TARGET_PAGE_SIZE, &local_err);
3165     if (!XBZRLE.cache) {
3166         error_report_err(local_err);
3167         goto free_zero_page;
3168     }
3169 
3170     XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3171     if (!XBZRLE.encoded_buf) {
3172         error_report("%s: Error allocating encoded_buf", __func__);
3173         goto free_cache;
3174     }
3175 
3176     XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3177     if (!XBZRLE.current_buf) {
3178         error_report("%s: Error allocating current_buf", __func__);
3179         goto free_encoded_buf;
3180     }
3181 
3182     /* We are all good */
3183     XBZRLE_cache_unlock();
3184     return 0;
3185 
3186 free_encoded_buf:
3187     g_free(XBZRLE.encoded_buf);
3188     XBZRLE.encoded_buf = NULL;
3189 free_cache:
3190     cache_fini(XBZRLE.cache);
3191     XBZRLE.cache = NULL;
3192 free_zero_page:
3193     g_free(XBZRLE.zero_target_page);
3194     XBZRLE.zero_target_page = NULL;
3195 err_out:
3196     XBZRLE_cache_unlock();
3197     return -ENOMEM;
3198 }
3199 
3200 static int ram_state_init(RAMState **rsp)
3201 {
3202     *rsp = g_try_new0(RAMState, 1);
3203 
3204     if (!*rsp) {
3205         error_report("%s: Init ramstate fail", __func__);
3206         return -1;
3207     }
3208 
3209     qemu_mutex_init(&(*rsp)->bitmap_mutex);
3210     qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3211     QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3212 
3213     /*
3214      * Count the total number of pages used by ram blocks not including any
3215      * gaps due to alignment or unplugs.
3216      * This must match with the initial values of dirty bitmap.
3217      */
3218     (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3219     ram_state_reset(*rsp);
3220 
3221     return 0;
3222 }
3223 
3224 static void ram_list_init_bitmaps(void)
3225 {
3226     MigrationState *ms = migrate_get_current();
3227     RAMBlock *block;
3228     unsigned long pages;
3229     uint8_t shift;
3230 
3231     /* Skip setting bitmap if there is no RAM */
3232     if (ram_bytes_total()) {
3233         shift = ms->clear_bitmap_shift;
3234         if (shift > CLEAR_BITMAP_SHIFT_MAX) {
3235             error_report("clear_bitmap_shift (%u) too big, using "
3236                          "max value (%u)", shift, CLEAR_BITMAP_SHIFT_MAX);
3237             shift = CLEAR_BITMAP_SHIFT_MAX;
3238         } else if (shift < CLEAR_BITMAP_SHIFT_MIN) {
3239             error_report("clear_bitmap_shift (%u) too small, using "
3240                          "min value (%u)", shift, CLEAR_BITMAP_SHIFT_MIN);
3241             shift = CLEAR_BITMAP_SHIFT_MIN;
3242         }
3243 
3244         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3245             pages = block->max_length >> TARGET_PAGE_BITS;
3246             /*
3247              * The initial dirty bitmap for migration must be set with all
3248              * ones to make sure we'll migrate every guest RAM page to
3249              * destination.
3250              * Here we set RAMBlock.bmap all to 1 because when rebegin a
3251              * new migration after a failed migration, ram_list.
3252              * dirty_memory[DIRTY_MEMORY_MIGRATION] don't include the whole
3253              * guest memory.
3254              */
3255             block->bmap = bitmap_new(pages);
3256             bitmap_set(block->bmap, 0, pages);
3257             block->clear_bmap_shift = shift;
3258             block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift));
3259         }
3260     }
3261 }
3262 
3263 static void ram_init_bitmaps(RAMState *rs)
3264 {
3265     /* For memory_global_dirty_log_start below.  */
3266     qemu_mutex_lock_iothread();
3267     qemu_mutex_lock_ramlist();
3268 
3269     WITH_RCU_READ_LOCK_GUARD() {
3270         ram_list_init_bitmaps();
3271         memory_global_dirty_log_start();
3272         migration_bitmap_sync_precopy(rs);
3273     }
3274     qemu_mutex_unlock_ramlist();
3275     qemu_mutex_unlock_iothread();
3276 }
3277 
3278 static int ram_init_all(RAMState **rsp)
3279 {
3280     if (ram_state_init(rsp)) {
3281         return -1;
3282     }
3283 
3284     if (xbzrle_init()) {
3285         ram_state_cleanup(rsp);
3286         return -1;
3287     }
3288 
3289     ram_init_bitmaps(*rsp);
3290 
3291     return 0;
3292 }
3293 
3294 static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3295 {
3296     RAMBlock *block;
3297     uint64_t pages = 0;
3298 
3299     /*
3300      * Postcopy is not using xbzrle/compression, so no need for that.
3301      * Also, since source are already halted, we don't need to care
3302      * about dirty page logging as well.
3303      */
3304 
3305     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3306         pages += bitmap_count_one(block->bmap,
3307                                   block->used_length >> TARGET_PAGE_BITS);
3308     }
3309 
3310     /* This may not be aligned with current bitmaps. Recalculate. */
3311     rs->migration_dirty_pages = pages;
3312 
3313     rs->last_seen_block = NULL;
3314     rs->last_sent_block = NULL;
3315     rs->last_page = 0;
3316     rs->last_version = ram_list.version;
3317     /*
3318      * Disable the bulk stage, otherwise we'll resend the whole RAM no
3319      * matter what we have sent.
3320      */
3321     rs->ram_bulk_stage = false;
3322 
3323     /* Update RAMState cache of output QEMUFile */
3324     rs->f = out;
3325 
3326     trace_ram_state_resume_prepare(pages);
3327 }
3328 
3329 /*
3330  * This function clears bits of the free pages reported by the caller from the
3331  * migration dirty bitmap. @addr is the host address corresponding to the
3332  * start of the continuous guest free pages, and @len is the total bytes of
3333  * those pages.
3334  */
3335 void qemu_guest_free_page_hint(void *addr, size_t len)
3336 {
3337     RAMBlock *block;
3338     ram_addr_t offset;
3339     size_t used_len, start, npages;
3340     MigrationState *s = migrate_get_current();
3341 
3342     /* This function is currently expected to be used during live migration */
3343     if (!migration_is_setup_or_active(s->state)) {
3344         return;
3345     }
3346 
3347     for (; len > 0; len -= used_len, addr += used_len) {
3348         block = qemu_ram_block_from_host(addr, false, &offset);
3349         if (unlikely(!block || offset >= block->used_length)) {
3350             /*
3351              * The implementation might not support RAMBlock resize during
3352              * live migration, but it could happen in theory with future
3353              * updates. So we add a check here to capture that case.
3354              */
3355             error_report_once("%s unexpected error", __func__);
3356             return;
3357         }
3358 
3359         if (len <= block->used_length - offset) {
3360             used_len = len;
3361         } else {
3362             used_len = block->used_length - offset;
3363         }
3364 
3365         start = offset >> TARGET_PAGE_BITS;
3366         npages = used_len >> TARGET_PAGE_BITS;
3367 
3368         qemu_mutex_lock(&ram_state->bitmap_mutex);
3369         ram_state->migration_dirty_pages -=
3370                       bitmap_count_one_with_offset(block->bmap, start, npages);
3371         bitmap_clear(block->bmap, start, npages);
3372         qemu_mutex_unlock(&ram_state->bitmap_mutex);
3373     }
3374 }
3375 
3376 /*
3377  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3378  * long-running RCU critical section.  When rcu-reclaims in the code
3379  * start to become numerous it will be necessary to reduce the
3380  * granularity of these critical sections.
3381  */
3382 
3383 /**
3384  * ram_save_setup: Setup RAM for migration
3385  *
3386  * Returns zero to indicate success and negative for error
3387  *
3388  * @f: QEMUFile where to send the data
3389  * @opaque: RAMState pointer
3390  */
3391 static int ram_save_setup(QEMUFile *f, void *opaque)
3392 {
3393     RAMState **rsp = opaque;
3394     RAMBlock *block;
3395 
3396     if (compress_threads_save_setup()) {
3397         return -1;
3398     }
3399 
3400     /* migration has already setup the bitmap, reuse it. */
3401     if (!migration_in_colo_state()) {
3402         if (ram_init_all(rsp) != 0) {
3403             compress_threads_save_cleanup();
3404             return -1;
3405         }
3406     }
3407     (*rsp)->f = f;
3408 
3409     WITH_RCU_READ_LOCK_GUARD() {
3410         qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3411 
3412         RAMBLOCK_FOREACH_MIGRATABLE(block) {
3413             qemu_put_byte(f, strlen(block->idstr));
3414             qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3415             qemu_put_be64(f, block->used_length);
3416             if (migrate_postcopy_ram() && block->page_size !=
3417                                           qemu_host_page_size) {
3418                 qemu_put_be64(f, block->page_size);
3419             }
3420             if (migrate_ignore_shared()) {
3421                 qemu_put_be64(f, block->mr->addr);
3422             }
3423         }
3424     }
3425 
3426     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3427     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3428 
3429     multifd_send_sync_main(*rsp);
3430     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3431     qemu_fflush(f);
3432 
3433     return 0;
3434 }
3435 
3436 /**
3437  * ram_save_iterate: iterative stage for migration
3438  *
3439  * Returns zero to indicate success and negative for error
3440  *
3441  * @f: QEMUFile where to send the data
3442  * @opaque: RAMState pointer
3443  */
3444 static int ram_save_iterate(QEMUFile *f, void *opaque)
3445 {
3446     RAMState **temp = opaque;
3447     RAMState *rs = *temp;
3448     int ret;
3449     int i;
3450     int64_t t0;
3451     int done = 0;
3452 
3453     if (blk_mig_bulk_active()) {
3454         /* Avoid transferring ram during bulk phase of block migration as
3455          * the bulk phase will usually take a long time and transferring
3456          * ram updates during that time is pointless. */
3457         goto out;
3458     }
3459 
3460     WITH_RCU_READ_LOCK_GUARD() {
3461         if (ram_list.version != rs->last_version) {
3462             ram_state_reset(rs);
3463         }
3464 
3465         /* Read version before ram_list.blocks */
3466         smp_rmb();
3467 
3468         ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3469 
3470         t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3471         i = 0;
3472         while ((ret = qemu_file_rate_limit(f)) == 0 ||
3473                 !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3474             int pages;
3475 
3476             if (qemu_file_get_error(f)) {
3477                 break;
3478             }
3479 
3480             pages = ram_find_and_save_block(rs, false);
3481             /* no more pages to sent */
3482             if (pages == 0) {
3483                 done = 1;
3484                 break;
3485             }
3486 
3487             if (pages < 0) {
3488                 qemu_file_set_error(f, pages);
3489                 break;
3490             }
3491 
3492             rs->target_page_count += pages;
3493 
3494             /*
3495              * During postcopy, it is necessary to make sure one whole host
3496              * page is sent in one chunk.
3497              */
3498             if (migrate_postcopy_ram()) {
3499                 flush_compressed_data(rs);
3500             }
3501 
3502             /*
3503              * we want to check in the 1st loop, just in case it was the 1st
3504              * time and we had to sync the dirty bitmap.
3505              * qemu_clock_get_ns() is a bit expensive, so we only check each
3506              * some iterations
3507              */
3508             if ((i & 63) == 0) {
3509                 uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) /
3510                               1000000;
3511                 if (t1 > MAX_WAIT) {
3512                     trace_ram_save_iterate_big_wait(t1, i);
3513                     break;
3514                 }
3515             }
3516             i++;
3517         }
3518     }
3519 
3520     /*
3521      * Must occur before EOS (or any QEMUFile operation)
3522      * because of RDMA protocol.
3523      */
3524     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3525 
3526 out:
3527     multifd_send_sync_main(rs);
3528     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3529     qemu_fflush(f);
3530     ram_counters.transferred += 8;
3531 
3532     ret = qemu_file_get_error(f);
3533     if (ret < 0) {
3534         return ret;
3535     }
3536 
3537     return done;
3538 }
3539 
3540 /**
3541  * ram_save_complete: function called to send the remaining amount of ram
3542  *
3543  * Returns zero to indicate success or negative on error
3544  *
3545  * Called with iothread lock
3546  *
3547  * @f: QEMUFile where to send the data
3548  * @opaque: RAMState pointer
3549  */
3550 static int ram_save_complete(QEMUFile *f, void *opaque)
3551 {
3552     RAMState **temp = opaque;
3553     RAMState *rs = *temp;
3554     int ret = 0;
3555 
3556     WITH_RCU_READ_LOCK_GUARD() {
3557         if (!migration_in_postcopy()) {
3558             migration_bitmap_sync_precopy(rs);
3559         }
3560 
3561         ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3562 
3563         /* try transferring iterative blocks of memory */
3564 
3565         /* flush all remaining blocks regardless of rate limiting */
3566         while (true) {
3567             int pages;
3568 
3569             pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3570             /* no more blocks to sent */
3571             if (pages == 0) {
3572                 break;
3573             }
3574             if (pages < 0) {
3575                 ret = pages;
3576                 break;
3577             }
3578         }
3579 
3580         flush_compressed_data(rs);
3581         ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3582     }
3583 
3584     multifd_send_sync_main(rs);
3585     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3586     qemu_fflush(f);
3587 
3588     return ret;
3589 }
3590 
3591 static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3592                              uint64_t *res_precopy_only,
3593                              uint64_t *res_compatible,
3594                              uint64_t *res_postcopy_only)
3595 {
3596     RAMState **temp = opaque;
3597     RAMState *rs = *temp;
3598     uint64_t remaining_size;
3599 
3600     remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3601 
3602     if (!migration_in_postcopy() &&
3603         remaining_size < max_size) {
3604         qemu_mutex_lock_iothread();
3605         WITH_RCU_READ_LOCK_GUARD() {
3606             migration_bitmap_sync_precopy(rs);
3607         }
3608         qemu_mutex_unlock_iothread();
3609         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3610     }
3611 
3612     if (migrate_postcopy_ram()) {
3613         /* We can do postcopy, and all the data is postcopiable */
3614         *res_compatible += remaining_size;
3615     } else {
3616         *res_precopy_only += remaining_size;
3617     }
3618 }
3619 
3620 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3621 {
3622     unsigned int xh_len;
3623     int xh_flags;
3624     uint8_t *loaded_data;
3625 
3626     /* extract RLE header */
3627     xh_flags = qemu_get_byte(f);
3628     xh_len = qemu_get_be16(f);
3629 
3630     if (xh_flags != ENCODING_FLAG_XBZRLE) {
3631         error_report("Failed to load XBZRLE page - wrong compression!");
3632         return -1;
3633     }
3634 
3635     if (xh_len > TARGET_PAGE_SIZE) {
3636         error_report("Failed to load XBZRLE page - len overflow!");
3637         return -1;
3638     }
3639     loaded_data = XBZRLE.decoded_buf;
3640     /* load data and decode */
3641     /* it can change loaded_data to point to an internal buffer */
3642     qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3643 
3644     /* decode RLE */
3645     if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3646                              TARGET_PAGE_SIZE) == -1) {
3647         error_report("Failed to load XBZRLE page - decode error!");
3648         return -1;
3649     }
3650 
3651     return 0;
3652 }
3653 
3654 /**
3655  * ram_block_from_stream: read a RAMBlock id from the migration stream
3656  *
3657  * Must be called from within a rcu critical section.
3658  *
3659  * Returns a pointer from within the RCU-protected ram_list.
3660  *
3661  * @f: QEMUFile where to read the data from
3662  * @flags: Page flags (mostly to see if it's a continuation of previous block)
3663  */
3664 static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3665 {
3666     static RAMBlock *block = NULL;
3667     char id[256];
3668     uint8_t len;
3669 
3670     if (flags & RAM_SAVE_FLAG_CONTINUE) {
3671         if (!block) {
3672             error_report("Ack, bad migration stream!");
3673             return NULL;
3674         }
3675         return block;
3676     }
3677 
3678     len = qemu_get_byte(f);
3679     qemu_get_buffer(f, (uint8_t *)id, len);
3680     id[len] = 0;
3681 
3682     block = qemu_ram_block_by_name(id);
3683     if (!block) {
3684         error_report("Can't find block %s", id);
3685         return NULL;
3686     }
3687 
3688     if (ramblock_is_ignored(block)) {
3689         error_report("block %s should not be migrated !", id);
3690         return NULL;
3691     }
3692 
3693     return block;
3694 }
3695 
3696 static inline void *host_from_ram_block_offset(RAMBlock *block,
3697                                                ram_addr_t offset)
3698 {
3699     if (!offset_in_ramblock(block, offset)) {
3700         return NULL;
3701     }
3702 
3703     return block->host + offset;
3704 }
3705 
3706 static inline void *colo_cache_from_block_offset(RAMBlock *block,
3707                                                  ram_addr_t offset)
3708 {
3709     if (!offset_in_ramblock(block, offset)) {
3710         return NULL;
3711     }
3712     if (!block->colo_cache) {
3713         error_report("%s: colo_cache is NULL in block :%s",
3714                      __func__, block->idstr);
3715         return NULL;
3716     }
3717 
3718     /*
3719     * During colo checkpoint, we need bitmap of these migrated pages.
3720     * It help us to decide which pages in ram cache should be flushed
3721     * into VM's RAM later.
3722     */
3723     if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3724         ram_state->migration_dirty_pages++;
3725     }
3726     return block->colo_cache + offset;
3727 }
3728 
3729 /**
3730  * ram_handle_compressed: handle the zero page case
3731  *
3732  * If a page (or a whole RDMA chunk) has been
3733  * determined to be zero, then zap it.
3734  *
3735  * @host: host address for the zero page
3736  * @ch: what the page is filled from.  We only support zero
3737  * @size: size of the zero page
3738  */
3739 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3740 {
3741     if (ch != 0 || !is_zero_range(host, size)) {
3742         memset(host, ch, size);
3743     }
3744 }
3745 
3746 /* return the size after decompression, or negative value on error */
3747 static int
3748 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3749                      const uint8_t *source, size_t source_len)
3750 {
3751     int err;
3752 
3753     err = inflateReset(stream);
3754     if (err != Z_OK) {
3755         return -1;
3756     }
3757 
3758     stream->avail_in = source_len;
3759     stream->next_in = (uint8_t *)source;
3760     stream->avail_out = dest_len;
3761     stream->next_out = dest;
3762 
3763     err = inflate(stream, Z_NO_FLUSH);
3764     if (err != Z_STREAM_END) {
3765         return -1;
3766     }
3767 
3768     return stream->total_out;
3769 }
3770 
3771 static void *do_data_decompress(void *opaque)
3772 {
3773     DecompressParam *param = opaque;
3774     unsigned long pagesize;
3775     uint8_t *des;
3776     int len, ret;
3777 
3778     qemu_mutex_lock(&param->mutex);
3779     while (!param->quit) {
3780         if (param->des) {
3781             des = param->des;
3782             len = param->len;
3783             param->des = 0;
3784             qemu_mutex_unlock(&param->mutex);
3785 
3786             pagesize = TARGET_PAGE_SIZE;
3787 
3788             ret = qemu_uncompress_data(&param->stream, des, pagesize,
3789                                        param->compbuf, len);
3790             if (ret < 0 && migrate_get_current()->decompress_error_check) {
3791                 error_report("decompress data failed");
3792                 qemu_file_set_error(decomp_file, ret);
3793             }
3794 
3795             qemu_mutex_lock(&decomp_done_lock);
3796             param->done = true;
3797             qemu_cond_signal(&decomp_done_cond);
3798             qemu_mutex_unlock(&decomp_done_lock);
3799 
3800             qemu_mutex_lock(&param->mutex);
3801         } else {
3802             qemu_cond_wait(&param->cond, &param->mutex);
3803         }
3804     }
3805     qemu_mutex_unlock(&param->mutex);
3806 
3807     return NULL;
3808 }
3809 
3810 static int wait_for_decompress_done(void)
3811 {
3812     int idx, thread_count;
3813 
3814     if (!migrate_use_compression()) {
3815         return 0;
3816     }
3817 
3818     thread_count = migrate_decompress_threads();
3819     qemu_mutex_lock(&decomp_done_lock);
3820     for (idx = 0; idx < thread_count; idx++) {
3821         while (!decomp_param[idx].done) {
3822             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3823         }
3824     }
3825     qemu_mutex_unlock(&decomp_done_lock);
3826     return qemu_file_get_error(decomp_file);
3827 }
3828 
3829 static void compress_threads_load_cleanup(void)
3830 {
3831     int i, thread_count;
3832 
3833     if (!migrate_use_compression()) {
3834         return;
3835     }
3836     thread_count = migrate_decompress_threads();
3837     for (i = 0; i < thread_count; i++) {
3838         /*
3839          * we use it as a indicator which shows if the thread is
3840          * properly init'd or not
3841          */
3842         if (!decomp_param[i].compbuf) {
3843             break;
3844         }
3845 
3846         qemu_mutex_lock(&decomp_param[i].mutex);
3847         decomp_param[i].quit = true;
3848         qemu_cond_signal(&decomp_param[i].cond);
3849         qemu_mutex_unlock(&decomp_param[i].mutex);
3850     }
3851     for (i = 0; i < thread_count; i++) {
3852         if (!decomp_param[i].compbuf) {
3853             break;
3854         }
3855 
3856         qemu_thread_join(decompress_threads + i);
3857         qemu_mutex_destroy(&decomp_param[i].mutex);
3858         qemu_cond_destroy(&decomp_param[i].cond);
3859         inflateEnd(&decomp_param[i].stream);
3860         g_free(decomp_param[i].compbuf);
3861         decomp_param[i].compbuf = NULL;
3862     }
3863     g_free(decompress_threads);
3864     g_free(decomp_param);
3865     decompress_threads = NULL;
3866     decomp_param = NULL;
3867     decomp_file = NULL;
3868 }
3869 
3870 static int compress_threads_load_setup(QEMUFile *f)
3871 {
3872     int i, thread_count;
3873 
3874     if (!migrate_use_compression()) {
3875         return 0;
3876     }
3877 
3878     thread_count = migrate_decompress_threads();
3879     decompress_threads = g_new0(QemuThread, thread_count);
3880     decomp_param = g_new0(DecompressParam, thread_count);
3881     qemu_mutex_init(&decomp_done_lock);
3882     qemu_cond_init(&decomp_done_cond);
3883     decomp_file = f;
3884     for (i = 0; i < thread_count; i++) {
3885         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3886             goto exit;
3887         }
3888 
3889         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3890         qemu_mutex_init(&decomp_param[i].mutex);
3891         qemu_cond_init(&decomp_param[i].cond);
3892         decomp_param[i].done = true;
3893         decomp_param[i].quit = false;
3894         qemu_thread_create(decompress_threads + i, "decompress",
3895                            do_data_decompress, decomp_param + i,
3896                            QEMU_THREAD_JOINABLE);
3897     }
3898     return 0;
3899 exit:
3900     compress_threads_load_cleanup();
3901     return -1;
3902 }
3903 
3904 static void decompress_data_with_multi_threads(QEMUFile *f,
3905                                                void *host, int len)
3906 {
3907     int idx, thread_count;
3908 
3909     thread_count = migrate_decompress_threads();
3910     qemu_mutex_lock(&decomp_done_lock);
3911     while (true) {
3912         for (idx = 0; idx < thread_count; idx++) {
3913             if (decomp_param[idx].done) {
3914                 decomp_param[idx].done = false;
3915                 qemu_mutex_lock(&decomp_param[idx].mutex);
3916                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3917                 decomp_param[idx].des = host;
3918                 decomp_param[idx].len = len;
3919                 qemu_cond_signal(&decomp_param[idx].cond);
3920                 qemu_mutex_unlock(&decomp_param[idx].mutex);
3921                 break;
3922             }
3923         }
3924         if (idx < thread_count) {
3925             break;
3926         } else {
3927             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3928         }
3929     }
3930     qemu_mutex_unlock(&decomp_done_lock);
3931 }
3932 
3933 /*
3934  * colo cache: this is for secondary VM, we cache the whole
3935  * memory of the secondary VM, it is need to hold the global lock
3936  * to call this helper.
3937  */
3938 int colo_init_ram_cache(void)
3939 {
3940     RAMBlock *block;
3941 
3942     WITH_RCU_READ_LOCK_GUARD() {
3943         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3944             block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3945                                                     NULL,
3946                                                     false);
3947             if (!block->colo_cache) {
3948                 error_report("%s: Can't alloc memory for COLO cache of block %s,"
3949                              "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3950                              block->used_length);
3951                 RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3952                     if (block->colo_cache) {
3953                         qemu_anon_ram_free(block->colo_cache, block->used_length);
3954                         block->colo_cache = NULL;
3955                     }
3956                 }
3957                 return -errno;
3958             }
3959             memcpy(block->colo_cache, block->host, block->used_length);
3960         }
3961     }
3962 
3963     /*
3964     * Record the dirty pages that sent by PVM, we use this dirty bitmap together
3965     * with to decide which page in cache should be flushed into SVM's RAM. Here
3966     * we use the same name 'ram_bitmap' as for migration.
3967     */
3968     if (ram_bytes_total()) {
3969         RAMBlock *block;
3970 
3971         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3972             unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3973 
3974             block->bmap = bitmap_new(pages);
3975             bitmap_set(block->bmap, 0, pages);
3976         }
3977     }
3978     ram_state = g_new0(RAMState, 1);
3979     ram_state->migration_dirty_pages = 0;
3980     qemu_mutex_init(&ram_state->bitmap_mutex);
3981     memory_global_dirty_log_start();
3982 
3983     return 0;
3984 }
3985 
3986 /* It is need to hold the global lock to call this helper */
3987 void colo_release_ram_cache(void)
3988 {
3989     RAMBlock *block;
3990 
3991     memory_global_dirty_log_stop();
3992     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3993         g_free(block->bmap);
3994         block->bmap = NULL;
3995     }
3996 
3997     WITH_RCU_READ_LOCK_GUARD() {
3998         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3999             if (block->colo_cache) {
4000                 qemu_anon_ram_free(block->colo_cache, block->used_length);
4001                 block->colo_cache = NULL;
4002             }
4003         }
4004     }
4005     qemu_mutex_destroy(&ram_state->bitmap_mutex);
4006     g_free(ram_state);
4007     ram_state = NULL;
4008 }
4009 
4010 /**
4011  * ram_load_setup: Setup RAM for migration incoming side
4012  *
4013  * Returns zero to indicate success and negative for error
4014  *
4015  * @f: QEMUFile where to receive the data
4016  * @opaque: RAMState pointer
4017  */
4018 static int ram_load_setup(QEMUFile *f, void *opaque)
4019 {
4020     if (compress_threads_load_setup(f)) {
4021         return -1;
4022     }
4023 
4024     xbzrle_load_setup();
4025     ramblock_recv_map_init();
4026 
4027     return 0;
4028 }
4029 
4030 static int ram_load_cleanup(void *opaque)
4031 {
4032     RAMBlock *rb;
4033 
4034     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4035         qemu_ram_block_writeback(rb);
4036     }
4037 
4038     xbzrle_load_cleanup();
4039     compress_threads_load_cleanup();
4040 
4041     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4042         g_free(rb->receivedmap);
4043         rb->receivedmap = NULL;
4044     }
4045 
4046     return 0;
4047 }
4048 
4049 /**
4050  * ram_postcopy_incoming_init: allocate postcopy data structures
4051  *
4052  * Returns 0 for success and negative if there was one error
4053  *
4054  * @mis: current migration incoming state
4055  *
4056  * Allocate data structures etc needed by incoming migration with
4057  * postcopy-ram. postcopy-ram's similarly names
4058  * postcopy_ram_incoming_init does the work.
4059  */
4060 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4061 {
4062     return postcopy_ram_incoming_init(mis);
4063 }
4064 
4065 /**
4066  * ram_load_postcopy: load a page in postcopy case
4067  *
4068  * Returns 0 for success or -errno in case of error
4069  *
4070  * Called in postcopy mode by ram_load().
4071  * rcu_read_lock is taken prior to this being called.
4072  *
4073  * @f: QEMUFile where to send the data
4074  */
4075 static int ram_load_postcopy(QEMUFile *f)
4076 {
4077     int flags = 0, ret = 0;
4078     bool place_needed = false;
4079     bool matches_target_page_size = false;
4080     MigrationIncomingState *mis = migration_incoming_get_current();
4081     /* Temporary page that is later 'placed' */
4082     void *postcopy_host_page = mis->postcopy_tmp_page;
4083     void *this_host = NULL;
4084     bool all_zero = false;
4085     int target_pages = 0;
4086 
4087     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4088         ram_addr_t addr;
4089         void *host = NULL;
4090         void *page_buffer = NULL;
4091         void *place_source = NULL;
4092         RAMBlock *block = NULL;
4093         uint8_t ch;
4094         int len;
4095 
4096         addr = qemu_get_be64(f);
4097 
4098         /*
4099          * If qemu file error, we should stop here, and then "addr"
4100          * may be invalid
4101          */
4102         ret = qemu_file_get_error(f);
4103         if (ret) {
4104             break;
4105         }
4106 
4107         flags = addr & ~TARGET_PAGE_MASK;
4108         addr &= TARGET_PAGE_MASK;
4109 
4110         trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4111         place_needed = false;
4112         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4113                      RAM_SAVE_FLAG_COMPRESS_PAGE)) {
4114             block = ram_block_from_stream(f, flags);
4115 
4116             host = host_from_ram_block_offset(block, addr);
4117             if (!host) {
4118                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4119                 ret = -EINVAL;
4120                 break;
4121             }
4122             target_pages++;
4123             matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4124             /*
4125              * Postcopy requires that we place whole host pages atomically;
4126              * these may be huge pages for RAMBlocks that are backed by
4127              * hugetlbfs.
4128              * To make it atomic, the data is read into a temporary page
4129              * that's moved into place later.
4130              * The migration protocol uses,  possibly smaller, target-pages
4131              * however the source ensures it always sends all the components
4132              * of a host page in one chunk.
4133              */
4134             page_buffer = postcopy_host_page +
4135                           ((uintptr_t)host & (block->page_size - 1));
4136             /* If all TP are zero then we can optimise the place */
4137             if (target_pages == 1) {
4138                 all_zero = true;
4139                 this_host = (void *)QEMU_ALIGN_DOWN((uintptr_t)host,
4140                                                     block->page_size);
4141             } else {
4142                 /* not the 1st TP within the HP */
4143                 if (QEMU_ALIGN_DOWN((uintptr_t)host, block->page_size) !=
4144                     (uintptr_t)this_host) {
4145                     error_report("Non-same host page %p/%p",
4146                                   host, this_host);
4147                     ret = -EINVAL;
4148                     break;
4149                 }
4150             }
4151 
4152             /*
4153              * If it's the last part of a host page then we place the host
4154              * page
4155              */
4156             if (target_pages == (block->page_size / TARGET_PAGE_SIZE)) {
4157                 place_needed = true;
4158                 target_pages = 0;
4159             }
4160             place_source = postcopy_host_page;
4161         }
4162 
4163         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4164         case RAM_SAVE_FLAG_ZERO:
4165             ch = qemu_get_byte(f);
4166             /*
4167              * Can skip to set page_buffer when
4168              * this is a zero page and (block->page_size == TARGET_PAGE_SIZE).
4169              */
4170             if (ch || !matches_target_page_size) {
4171                 memset(page_buffer, ch, TARGET_PAGE_SIZE);
4172             }
4173             if (ch) {
4174                 all_zero = false;
4175             }
4176             break;
4177 
4178         case RAM_SAVE_FLAG_PAGE:
4179             all_zero = false;
4180             if (!matches_target_page_size) {
4181                 /* For huge pages, we always use temporary buffer */
4182                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4183             } else {
4184                 /*
4185                  * For small pages that matches target page size, we
4186                  * avoid the qemu_file copy.  Instead we directly use
4187                  * the buffer of QEMUFile to place the page.  Note: we
4188                  * cannot do any QEMUFile operation before using that
4189                  * buffer to make sure the buffer is valid when
4190                  * placing the page.
4191                  */
4192                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4193                                          TARGET_PAGE_SIZE);
4194             }
4195             break;
4196         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4197             all_zero = false;
4198             len = qemu_get_be32(f);
4199             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4200                 error_report("Invalid compressed data length: %d", len);
4201                 ret = -EINVAL;
4202                 break;
4203             }
4204             decompress_data_with_multi_threads(f, page_buffer, len);
4205             break;
4206 
4207         case RAM_SAVE_FLAG_EOS:
4208             /* normal exit */
4209             multifd_recv_sync_main();
4210             break;
4211         default:
4212             error_report("Unknown combination of migration flags: %#x"
4213                          " (postcopy mode)", flags);
4214             ret = -EINVAL;
4215             break;
4216         }
4217 
4218         /* Got the whole host page, wait for decompress before placing. */
4219         if (place_needed) {
4220             ret |= wait_for_decompress_done();
4221         }
4222 
4223         /* Detect for any possible file errors */
4224         if (!ret && qemu_file_get_error(f)) {
4225             ret = qemu_file_get_error(f);
4226         }
4227 
4228         if (!ret && place_needed) {
4229             /* This gets called at the last target page in the host page */
4230             void *place_dest = (void *)QEMU_ALIGN_DOWN((uintptr_t)host,
4231                                                        block->page_size);
4232 
4233             if (all_zero) {
4234                 ret = postcopy_place_page_zero(mis, place_dest,
4235                                                block);
4236             } else {
4237                 ret = postcopy_place_page(mis, place_dest,
4238                                           place_source, block);
4239             }
4240         }
4241     }
4242 
4243     return ret;
4244 }
4245 
4246 static bool postcopy_is_advised(void)
4247 {
4248     PostcopyState ps = postcopy_state_get();
4249     return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4250 }
4251 
4252 static bool postcopy_is_running(void)
4253 {
4254     PostcopyState ps = postcopy_state_get();
4255     return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4256 }
4257 
4258 /*
4259  * Flush content of RAM cache into SVM's memory.
4260  * Only flush the pages that be dirtied by PVM or SVM or both.
4261  */
4262 static void colo_flush_ram_cache(void)
4263 {
4264     RAMBlock *block = NULL;
4265     void *dst_host;
4266     void *src_host;
4267     unsigned long offset = 0;
4268 
4269     memory_global_dirty_log_sync();
4270     WITH_RCU_READ_LOCK_GUARD() {
4271         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4272             ramblock_sync_dirty_bitmap(ram_state, block);
4273         }
4274     }
4275 
4276     trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4277     WITH_RCU_READ_LOCK_GUARD() {
4278         block = QLIST_FIRST_RCU(&ram_list.blocks);
4279 
4280         while (block) {
4281             offset = migration_bitmap_find_dirty(ram_state, block, offset);
4282 
4283             if (((ram_addr_t)offset) << TARGET_PAGE_BITS
4284                 >= block->used_length) {
4285                 offset = 0;
4286                 block = QLIST_NEXT_RCU(block, next);
4287             } else {
4288                 migration_bitmap_clear_dirty(ram_state, block, offset);
4289                 dst_host = block->host
4290                          + (((ram_addr_t)offset) << TARGET_PAGE_BITS);
4291                 src_host = block->colo_cache
4292                          + (((ram_addr_t)offset) << TARGET_PAGE_BITS);
4293                 memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4294             }
4295         }
4296     }
4297     trace_colo_flush_ram_cache_end();
4298 }
4299 
4300 /**
4301  * ram_load_precopy: load pages in precopy case
4302  *
4303  * Returns 0 for success or -errno in case of error
4304  *
4305  * Called in precopy mode by ram_load().
4306  * rcu_read_lock is taken prior to this being called.
4307  *
4308  * @f: QEMUFile where to send the data
4309  */
4310 static int ram_load_precopy(QEMUFile *f)
4311 {
4312     int flags = 0, ret = 0, invalid_flags = 0, len = 0, i = 0;
4313     /* ADVISE is earlier, it shows the source has the postcopy capability on */
4314     bool postcopy_advised = postcopy_is_advised();
4315     if (!migrate_use_compression()) {
4316         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4317     }
4318 
4319     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4320         ram_addr_t addr, total_ram_bytes;
4321         void *host = NULL;
4322         uint8_t ch;
4323 
4324         /*
4325          * Yield periodically to let main loop run, but an iteration of
4326          * the main loop is expensive, so do it each some iterations
4327          */
4328         if ((i & 32767) == 0 && qemu_in_coroutine()) {
4329             aio_co_schedule(qemu_get_current_aio_context(),
4330                             qemu_coroutine_self());
4331             qemu_coroutine_yield();
4332         }
4333         i++;
4334 
4335         addr = qemu_get_be64(f);
4336         flags = addr & ~TARGET_PAGE_MASK;
4337         addr &= TARGET_PAGE_MASK;
4338 
4339         if (flags & invalid_flags) {
4340             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4341                 error_report("Received an unexpected compressed page");
4342             }
4343 
4344             ret = -EINVAL;
4345             break;
4346         }
4347 
4348         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4349                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4350             RAMBlock *block = ram_block_from_stream(f, flags);
4351 
4352             /*
4353              * After going into COLO, we should load the Page into colo_cache.
4354              */
4355             if (migration_incoming_in_colo_state()) {
4356                 host = colo_cache_from_block_offset(block, addr);
4357             } else {
4358                 host = host_from_ram_block_offset(block, addr);
4359             }
4360             if (!host) {
4361                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4362                 ret = -EINVAL;
4363                 break;
4364             }
4365 
4366             if (!migration_incoming_in_colo_state()) {
4367                 ramblock_recv_bitmap_set(block, host);
4368             }
4369 
4370             trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4371         }
4372 
4373         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4374         case RAM_SAVE_FLAG_MEM_SIZE:
4375             /* Synchronize RAM block list */
4376             total_ram_bytes = addr;
4377             while (!ret && total_ram_bytes) {
4378                 RAMBlock *block;
4379                 char id[256];
4380                 ram_addr_t length;
4381 
4382                 len = qemu_get_byte(f);
4383                 qemu_get_buffer(f, (uint8_t *)id, len);
4384                 id[len] = 0;
4385                 length = qemu_get_be64(f);
4386 
4387                 block = qemu_ram_block_by_name(id);
4388                 if (block && !qemu_ram_is_migratable(block)) {
4389                     error_report("block %s should not be migrated !", id);
4390                     ret = -EINVAL;
4391                 } else if (block) {
4392                     if (length != block->used_length) {
4393                         Error *local_err = NULL;
4394 
4395                         ret = qemu_ram_resize(block, length,
4396                                               &local_err);
4397                         if (local_err) {
4398                             error_report_err(local_err);
4399                         }
4400                     }
4401                     /* For postcopy we need to check hugepage sizes match */
4402                     if (postcopy_advised &&
4403                         block->page_size != qemu_host_page_size) {
4404                         uint64_t remote_page_size = qemu_get_be64(f);
4405                         if (remote_page_size != block->page_size) {
4406                             error_report("Mismatched RAM page size %s "
4407                                          "(local) %zd != %" PRId64,
4408                                          id, block->page_size,
4409                                          remote_page_size);
4410                             ret = -EINVAL;
4411                         }
4412                     }
4413                     if (migrate_ignore_shared()) {
4414                         hwaddr addr = qemu_get_be64(f);
4415                         if (ramblock_is_ignored(block) &&
4416                             block->mr->addr != addr) {
4417                             error_report("Mismatched GPAs for block %s "
4418                                          "%" PRId64 "!= %" PRId64,
4419                                          id, (uint64_t)addr,
4420                                          (uint64_t)block->mr->addr);
4421                             ret = -EINVAL;
4422                         }
4423                     }
4424                     ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4425                                           block->idstr);
4426                 } else {
4427                     error_report("Unknown ramblock \"%s\", cannot "
4428                                  "accept migration", id);
4429                     ret = -EINVAL;
4430                 }
4431 
4432                 total_ram_bytes -= length;
4433             }
4434             break;
4435 
4436         case RAM_SAVE_FLAG_ZERO:
4437             ch = qemu_get_byte(f);
4438             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4439             break;
4440 
4441         case RAM_SAVE_FLAG_PAGE:
4442             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4443             break;
4444 
4445         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4446             len = qemu_get_be32(f);
4447             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4448                 error_report("Invalid compressed data length: %d", len);
4449                 ret = -EINVAL;
4450                 break;
4451             }
4452             decompress_data_with_multi_threads(f, host, len);
4453             break;
4454 
4455         case RAM_SAVE_FLAG_XBZRLE:
4456             if (load_xbzrle(f, addr, host) < 0) {
4457                 error_report("Failed to decompress XBZRLE page at "
4458                              RAM_ADDR_FMT, addr);
4459                 ret = -EINVAL;
4460                 break;
4461             }
4462             break;
4463         case RAM_SAVE_FLAG_EOS:
4464             /* normal exit */
4465             multifd_recv_sync_main();
4466             break;
4467         default:
4468             if (flags & RAM_SAVE_FLAG_HOOK) {
4469                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4470             } else {
4471                 error_report("Unknown combination of migration flags: %#x",
4472                              flags);
4473                 ret = -EINVAL;
4474             }
4475         }
4476         if (!ret) {
4477             ret = qemu_file_get_error(f);
4478         }
4479     }
4480 
4481     ret |= wait_for_decompress_done();
4482     return ret;
4483 }
4484 
4485 static int ram_load(QEMUFile *f, void *opaque, int version_id)
4486 {
4487     int ret = 0;
4488     static uint64_t seq_iter;
4489     /*
4490      * If system is running in postcopy mode, page inserts to host memory must
4491      * be atomic
4492      */
4493     bool postcopy_running = postcopy_is_running();
4494 
4495     seq_iter++;
4496 
4497     if (version_id != 4) {
4498         return -EINVAL;
4499     }
4500 
4501     /*
4502      * This RCU critical section can be very long running.
4503      * When RCU reclaims in the code start to become numerous,
4504      * it will be necessary to reduce the granularity of this
4505      * critical section.
4506      */
4507     WITH_RCU_READ_LOCK_GUARD() {
4508         if (postcopy_running) {
4509             ret = ram_load_postcopy(f);
4510         } else {
4511             ret = ram_load_precopy(f);
4512         }
4513     }
4514     trace_ram_load_complete(ret, seq_iter);
4515 
4516     if (!ret  && migration_incoming_in_colo_state()) {
4517         colo_flush_ram_cache();
4518     }
4519     return ret;
4520 }
4521 
4522 static bool ram_has_postcopy(void *opaque)
4523 {
4524     RAMBlock *rb;
4525     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4526         if (ramblock_is_pmem(rb)) {
4527             info_report("Block: %s, host: %p is a nvdimm memory, postcopy"
4528                          "is not supported now!", rb->idstr, rb->host);
4529             return false;
4530         }
4531     }
4532 
4533     return migrate_postcopy_ram();
4534 }
4535 
4536 /* Sync all the dirty bitmap with destination VM.  */
4537 static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4538 {
4539     RAMBlock *block;
4540     QEMUFile *file = s->to_dst_file;
4541     int ramblock_count = 0;
4542 
4543     trace_ram_dirty_bitmap_sync_start();
4544 
4545     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4546         qemu_savevm_send_recv_bitmap(file, block->idstr);
4547         trace_ram_dirty_bitmap_request(block->idstr);
4548         ramblock_count++;
4549     }
4550 
4551     trace_ram_dirty_bitmap_sync_wait();
4552 
4553     /* Wait until all the ramblocks' dirty bitmap synced */
4554     while (ramblock_count--) {
4555         qemu_sem_wait(&s->rp_state.rp_sem);
4556     }
4557 
4558     trace_ram_dirty_bitmap_sync_complete();
4559 
4560     return 0;
4561 }
4562 
4563 static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4564 {
4565     qemu_sem_post(&s->rp_state.rp_sem);
4566 }
4567 
4568 /*
4569  * Read the received bitmap, revert it as the initial dirty bitmap.
4570  * This is only used when the postcopy migration is paused but wants
4571  * to resume from a middle point.
4572  */
4573 int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4574 {
4575     int ret = -EINVAL;
4576     QEMUFile *file = s->rp_state.from_dst_file;
4577     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4578     uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4579     uint64_t size, end_mark;
4580 
4581     trace_ram_dirty_bitmap_reload_begin(block->idstr);
4582 
4583     if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4584         error_report("%s: incorrect state %s", __func__,
4585                      MigrationStatus_str(s->state));
4586         return -EINVAL;
4587     }
4588 
4589     /*
4590      * Note: see comments in ramblock_recv_bitmap_send() on why we
4591      * need the endianess convertion, and the paddings.
4592      */
4593     local_size = ROUND_UP(local_size, 8);
4594 
4595     /* Add paddings */
4596     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4597 
4598     size = qemu_get_be64(file);
4599 
4600     /* The size of the bitmap should match with our ramblock */
4601     if (size != local_size) {
4602         error_report("%s: ramblock '%s' bitmap size mismatch "
4603                      "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4604                      block->idstr, size, local_size);
4605         ret = -EINVAL;
4606         goto out;
4607     }
4608 
4609     size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4610     end_mark = qemu_get_be64(file);
4611 
4612     ret = qemu_file_get_error(file);
4613     if (ret || size != local_size) {
4614         error_report("%s: read bitmap failed for ramblock '%s': %d"
4615                      " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4616                      __func__, block->idstr, ret, local_size, size);
4617         ret = -EIO;
4618         goto out;
4619     }
4620 
4621     if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4622         error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
4623                      __func__, block->idstr, end_mark);
4624         ret = -EINVAL;
4625         goto out;
4626     }
4627 
4628     /*
4629      * Endianess convertion. We are during postcopy (though paused).
4630      * The dirty bitmap won't change. We can directly modify it.
4631      */
4632     bitmap_from_le(block->bmap, le_bitmap, nbits);
4633 
4634     /*
4635      * What we received is "received bitmap". Revert it as the initial
4636      * dirty bitmap for this ramblock.
4637      */
4638     bitmap_complement(block->bmap, block->bmap, nbits);
4639 
4640     trace_ram_dirty_bitmap_reload_complete(block->idstr);
4641 
4642     /*
4643      * We succeeded to sync bitmap for current ramblock. If this is
4644      * the last one to sync, we need to notify the main send thread.
4645      */
4646     ram_dirty_bitmap_reload_notify(s);
4647 
4648     ret = 0;
4649 out:
4650     g_free(le_bitmap);
4651     return ret;
4652 }
4653 
4654 static int ram_resume_prepare(MigrationState *s, void *opaque)
4655 {
4656     RAMState *rs = *(RAMState **)opaque;
4657     int ret;
4658 
4659     ret = ram_dirty_bitmap_sync_all(s, rs);
4660     if (ret) {
4661         return ret;
4662     }
4663 
4664     ram_state_resume_prepare(rs, s->to_dst_file);
4665 
4666     return 0;
4667 }
4668 
4669 static SaveVMHandlers savevm_ram_handlers = {
4670     .save_setup = ram_save_setup,
4671     .save_live_iterate = ram_save_iterate,
4672     .save_live_complete_postcopy = ram_save_complete,
4673     .save_live_complete_precopy = ram_save_complete,
4674     .has_postcopy = ram_has_postcopy,
4675     .save_live_pending = ram_save_pending,
4676     .load_state = ram_load,
4677     .save_cleanup = ram_save_cleanup,
4678     .load_setup = ram_load_setup,
4679     .load_cleanup = ram_load_cleanup,
4680     .resume_prepare = ram_resume_prepare,
4681 };
4682 
4683 void ram_mig_init(void)
4684 {
4685     qemu_mutex_init(&XBZRLE.lock);
4686     register_savevm_live("ram", 0, 4, &savevm_ram_handlers, &ram_state);
4687 }
4688