xref: /qemu/migration/multifd.c (revision 19f9c044)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "migration-stats.h"
23 #include "socket.h"
24 #include "tls.h"
25 #include "qemu-file.h"
26 #include "trace.h"
27 #include "multifd.h"
28 #include "threadinfo.h"
29 #include "options.h"
30 #include "qemu/yank.h"
31 #include "io/channel-socket.h"
32 #include "yank_functions.h"
33 
34 /* Multiple fd's */
35 
/* Magic number that opens the initial message and every data packet */
#define MULTIFD_MAGIC 0x11223344U
/* Protocol version announced next to the magic number */
#define MULTIFD_VERSION 1

/*
 * Initial message exchanged once per channel.
 *
 * The layout is part of the wire format, hence the packed attribute:
 * the structure is written and read as a raw block of bytes.  @magic
 * and @version are stored big-endian on the wire.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC */
    uint32_t version;       /* MULTIFD_VERSION */
    unsigned char uuid[16]; /* QemuUUID of the sending VM */
    uint8_t id;             /* Channel number */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
47 
48 struct {
49     MultiFDSendParams *params;
50     /* array of pages to sent */
51     MultiFDPages_t *pages;
52     /*
53      * Global number of generated multifd packets.
54      *
55      * Note that we used 'uintptr_t' because it'll naturally support atomic
56      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
57      * multifd will overflow the packet_num easier, but that should be
58      * fine.
59      *
60      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
61      * hosts, however so far it does not support atomic fetch_add() yet.
62      * Make it easy for now.
63      */
64     uintptr_t packet_num;
65     /*
66      * Synchronization point past which no more channels will be
67      * created.
68      */
69     QemuSemaphore channels_created;
70     /* send channels ready */
71     QemuSemaphore channels_ready;
72     /*
73      * Have we already run terminate threads.  There is a race when it
74      * happens that we got one error while we are exiting.
75      * We will use atomic operations.  Only valid values are 0 and 1.
76      */
77     int exiting;
78     /* multifd ops */
79     MultiFDMethods *ops;
80 } *multifd_send_state;
81 
82 struct {
83     MultiFDRecvParams *params;
84     /* number of created threads */
85     int count;
86     /* syncs main thread and channels */
87     QemuSemaphore sem_sync;
88     /* global number of generated multifd packets */
89     uint64_t packet_num;
90     int exiting;
91     /* multifd ops */
92     MultiFDMethods *ops;
93 } *multifd_recv_state;
94 
95 /* Multifd without compression */
96 
97 /**
98  * nocomp_send_setup: setup send side
99  *
100  * @p: Params for the channel that we are using
101  * @errp: pointer to an error
102  */
103 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
104 {
105     if (migrate_zero_copy_send()) {
106         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
107     }
108 
109     return 0;
110 }
111 
112 /**
113  * nocomp_send_cleanup: cleanup send side
114  *
115  * For no compression this function does nothing.
116  *
117  * @p: Params for the channel that we are using
118  * @errp: pointer to an error
119  */
120 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
121 {
122     return;
123 }
124 
125 /**
126  * nocomp_send_prepare: prepare date to be able to send
127  *
128  * For no compression we just have to calculate the size of the
129  * packet.
130  *
131  * Returns 0 for success or -1 for error
132  *
133  * @p: Params for the channel that we are using
134  * @errp: pointer to an error
135  */
136 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
137 {
138     bool use_zero_copy_send = migrate_zero_copy_send();
139     MultiFDPages_t *pages = p->pages;
140     int ret;
141 
142     if (!use_zero_copy_send) {
143         /*
144          * Only !zerocopy needs the header in IOV; zerocopy will
145          * send it separately.
146          */
147         multifd_send_prepare_header(p);
148     }
149 
150     for (int i = 0; i < pages->num; i++) {
151         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
152         p->iov[p->iovs_num].iov_len = p->page_size;
153         p->iovs_num++;
154     }
155 
156     p->next_packet_size = pages->num * p->page_size;
157     p->flags |= MULTIFD_FLAG_NOCOMP;
158 
159     multifd_send_fill_packet(p);
160 
161     if (use_zero_copy_send) {
162         /* Send header first, without zerocopy */
163         ret = qio_channel_write_all(p->c, (void *)p->packet,
164                                     p->packet_len, errp);
165         if (ret != 0) {
166             return -1;
167         }
168     }
169 
170     return 0;
171 }
172 
173 /**
174  * nocomp_recv_setup: setup receive side
175  *
176  * For no compression this function does nothing.
177  *
178  * Returns 0 for success or -1 for error
179  *
180  * @p: Params for the channel that we are using
181  * @errp: pointer to an error
182  */
183 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
184 {
185     return 0;
186 }
187 
188 /**
189  * nocomp_recv_cleanup: setup receive side
190  *
191  * For no compression this function does nothing.
192  *
193  * @p: Params for the channel that we are using
194  */
195 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
196 {
197 }
198 
199 /**
200  * nocomp_recv_pages: read the data from the channel into actual pages
201  *
202  * For no compression we just need to read things into the correct place.
203  *
204  * Returns 0 for success or -1 for error
205  *
206  * @p: Params for the channel that we are using
207  * @errp: pointer to an error
208  */
209 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
210 {
211     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
212 
213     if (flags != MULTIFD_FLAG_NOCOMP) {
214         error_setg(errp, "multifd %u: flags received %x flags expected %x",
215                    p->id, flags, MULTIFD_FLAG_NOCOMP);
216         return -1;
217     }
218     for (int i = 0; i < p->normal_num; i++) {
219         p->iov[i].iov_base = p->host + p->normal[i];
220         p->iov[i].iov_len = p->page_size;
221     }
222     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
223 }
224 
225 static MultiFDMethods multifd_nocomp_ops = {
226     .send_setup = nocomp_send_setup,
227     .send_cleanup = nocomp_send_cleanup,
228     .send_prepare = nocomp_send_prepare,
229     .recv_setup = nocomp_recv_setup,
230     .recv_cleanup = nocomp_recv_cleanup,
231     .recv_pages = nocomp_recv_pages
232 };
233 
234 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
235     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
236 };
237 
238 void multifd_register_ops(int method, MultiFDMethods *ops)
239 {
240     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
241     multifd_ops[method] = ops;
242 }
243 
244 /* Reset a MultiFDPages_t* object for the next use */
245 static void multifd_pages_reset(MultiFDPages_t *pages)
246 {
247     /*
248      * We don't need to touch offset[] array, because it will be
249      * overwritten later when reused.
250      */
251     pages->num = 0;
252     pages->block = NULL;
253 }
254 
255 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
256 {
257     MultiFDInit_t msg = {};
258     size_t size = sizeof(msg);
259     int ret;
260 
261     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
262     msg.version = cpu_to_be32(MULTIFD_VERSION);
263     msg.id = p->id;
264     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
265 
266     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
267     if (ret != 0) {
268         return -1;
269     }
270     stat64_add(&mig_stats.multifd_bytes, size);
271     return 0;
272 }
273 
274 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
275 {
276     MultiFDInit_t msg;
277     int ret;
278 
279     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
280     if (ret != 0) {
281         return -1;
282     }
283 
284     msg.magic = be32_to_cpu(msg.magic);
285     msg.version = be32_to_cpu(msg.version);
286 
287     if (msg.magic != MULTIFD_MAGIC) {
288         error_setg(errp, "multifd: received packet magic %x "
289                    "expected %x", msg.magic, MULTIFD_MAGIC);
290         return -1;
291     }
292 
293     if (msg.version != MULTIFD_VERSION) {
294         error_setg(errp, "multifd: received packet version %u "
295                    "expected %u", msg.version, MULTIFD_VERSION);
296         return -1;
297     }
298 
299     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
300         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
301         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
302 
303         error_setg(errp, "multifd: received uuid '%s' and expected "
304                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
305         g_free(uuid);
306         g_free(msg_uuid);
307         return -1;
308     }
309 
310     if (msg.id > migrate_multifd_channels()) {
311         error_setg(errp, "multifd: received channel id %u is greater than "
312                    "number of channels %u", msg.id, migrate_multifd_channels());
313         return -1;
314     }
315 
316     return msg.id;
317 }
318 
319 static MultiFDPages_t *multifd_pages_init(uint32_t n)
320 {
321     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
322 
323     pages->allocated = n;
324     pages->offset = g_new0(ram_addr_t, n);
325 
326     return pages;
327 }
328 
329 static void multifd_pages_clear(MultiFDPages_t *pages)
330 {
331     multifd_pages_reset(pages);
332     pages->allocated = 0;
333     g_free(pages->offset);
334     pages->offset = NULL;
335     g_free(pages);
336 }
337 
338 void multifd_send_fill_packet(MultiFDSendParams *p)
339 {
340     MultiFDPacket_t *packet = p->packet;
341     MultiFDPages_t *pages = p->pages;
342     uint64_t packet_num;
343     int i;
344 
345     packet->flags = cpu_to_be32(p->flags);
346     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
347     packet->normal_pages = cpu_to_be32(pages->num);
348     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
349 
350     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
351     packet->packet_num = cpu_to_be64(packet_num);
352 
353     if (pages->block) {
354         strncpy(packet->ramblock, pages->block->idstr, 256);
355     }
356 
357     for (i = 0; i < pages->num; i++) {
358         /* there are architectures where ram_addr_t is 32 bit */
359         uint64_t temp = pages->offset[i];
360 
361         packet->offset[i] = cpu_to_be64(temp);
362     }
363 
364     p->packets_sent++;
365     p->total_normal_pages += pages->num;
366 
367     trace_multifd_send(p->id, packet_num, pages->num, p->flags,
368                        p->next_packet_size);
369 }
370 
371 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
372 {
373     MultiFDPacket_t *packet = p->packet;
374     int i;
375 
376     packet->magic = be32_to_cpu(packet->magic);
377     if (packet->magic != MULTIFD_MAGIC) {
378         error_setg(errp, "multifd: received packet "
379                    "magic %x and expected magic %x",
380                    packet->magic, MULTIFD_MAGIC);
381         return -1;
382     }
383 
384     packet->version = be32_to_cpu(packet->version);
385     if (packet->version != MULTIFD_VERSION) {
386         error_setg(errp, "multifd: received packet "
387                    "version %u and expected version %u",
388                    packet->version, MULTIFD_VERSION);
389         return -1;
390     }
391 
392     p->flags = be32_to_cpu(packet->flags);
393 
394     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
395     /*
396      * If we received a packet that is 100 times bigger than expected
397      * just stop migration.  It is a magic number.
398      */
399     if (packet->pages_alloc > p->page_count) {
400         error_setg(errp, "multifd: received packet "
401                    "with size %u and expected a size of %u",
402                    packet->pages_alloc, p->page_count) ;
403         return -1;
404     }
405 
406     p->normal_num = be32_to_cpu(packet->normal_pages);
407     if (p->normal_num > packet->pages_alloc) {
408         error_setg(errp, "multifd: received packet "
409                    "with %u pages and expected maximum pages are %u",
410                    p->normal_num, packet->pages_alloc) ;
411         return -1;
412     }
413 
414     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
415     p->packet_num = be64_to_cpu(packet->packet_num);
416     p->packets_recved++;
417     p->total_normal_pages += p->normal_num;
418 
419     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
420                        p->next_packet_size);
421 
422     if (p->normal_num == 0) {
423         return 0;
424     }
425 
426     /* make sure that ramblock is 0 terminated */
427     packet->ramblock[255] = 0;
428     p->block = qemu_ram_block_by_name(packet->ramblock);
429     if (!p->block) {
430         error_setg(errp, "multifd: unknown ram block %s",
431                    packet->ramblock);
432         return -1;
433     }
434 
435     p->host = p->block->host;
436     for (i = 0; i < p->normal_num; i++) {
437         uint64_t offset = be64_to_cpu(packet->offset[i]);
438 
439         if (offset > (p->block->used_length - p->page_size)) {
440             error_setg(errp, "multifd: offset too long %" PRIu64
441                        " (max " RAM_ADDR_FMT ")",
442                        offset, p->block->used_length);
443             return -1;
444         }
445         p->normal[i] = offset;
446     }
447 
448     return 0;
449 }
450 
451 static bool multifd_send_should_exit(void)
452 {
453     return qatomic_read(&multifd_send_state->exiting);
454 }
455 
456 static bool multifd_recv_should_exit(void)
457 {
458     return qatomic_read(&multifd_recv_state->exiting);
459 }
460 
461 /*
462  * The migration thread can wait on either of the two semaphores.  This
463  * function can be used to kick the main thread out of waiting on either of
464  * them.  Should mostly only be called when something wrong happened with
465  * the current multifd send thread.
466  */
467 static void multifd_send_kick_main(MultiFDSendParams *p)
468 {
469     qemu_sem_post(&p->sem_sync);
470     qemu_sem_post(&multifd_send_state->channels_ready);
471 }
472 
473 /*
474  * How we use multifd_send_state->pages and channel->pages?
475  *
476  * We create a pages for each channel, and a main one.  Each time that
477  * we need to send a batch of pages we interchange the ones between
478  * multifd_send_state and the channel that is sending it.  There are
479  * two reasons for that:
480  *    - to not have to do so many mallocs during migration
481  *    - to make easier to know what to free at the end of migration
482  *
483  * This way we always know who is the owner of each "pages" struct,
484  * and we don't need any locking.  It belongs to the migration thread
485  * or to the channel thread.  Switching is safe because the migration
486  * thread is using the channel mutex when changing it, and the channel
487  * have to had finish with its own, otherwise pending_job can't be
488  * false.
489  *
490  * Returns true if succeed, false otherwise.
491  */
492 static bool multifd_send_pages(void)
493 {
494     int i;
495     static int next_channel;
496     MultiFDSendParams *p = NULL; /* make happy gcc */
497     MultiFDPages_t *pages = multifd_send_state->pages;
498 
499     if (multifd_send_should_exit()) {
500         return false;
501     }
502 
503     /* We wait here, until at least one channel is ready */
504     qemu_sem_wait(&multifd_send_state->channels_ready);
505 
506     /*
507      * next_channel can remain from a previous migration that was
508      * using more channels, so ensure it doesn't overflow if the
509      * limit is lower now.
510      */
511     next_channel %= migrate_multifd_channels();
512     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
513         if (multifd_send_should_exit()) {
514             return false;
515         }
516         p = &multifd_send_state->params[i];
517         /*
518          * Lockless read to p->pending_job is safe, because only multifd
519          * sender thread can clear it.
520          */
521         if (qatomic_read(&p->pending_job) == false) {
522             next_channel = (i + 1) % migrate_multifd_channels();
523             break;
524         }
525     }
526 
527     /*
528      * Make sure we read p->pending_job before all the rest.  Pairs with
529      * qatomic_store_release() in multifd_send_thread().
530      */
531     smp_mb_acquire();
532     assert(!p->pages->num);
533     multifd_send_state->pages = p->pages;
534     p->pages = pages;
535     /*
536      * Making sure p->pages is setup before marking pending_job=true. Pairs
537      * with the qatomic_load_acquire() in multifd_send_thread().
538      */
539     qatomic_store_release(&p->pending_job, true);
540     qemu_sem_post(&p->sem);
541 
542     return true;
543 }
544 
545 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
546 {
547     return pages->num == 0;
548 }
549 
550 static inline bool multifd_queue_full(MultiFDPages_t *pages)
551 {
552     return pages->num == pages->allocated;
553 }
554 
555 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
556 {
557     pages->offset[pages->num++] = offset;
558 }
559 
560 /* Returns true if enqueue successful, false otherwise */
561 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
562 {
563     MultiFDPages_t *pages;
564 
565 retry:
566     pages = multifd_send_state->pages;
567 
568     /* If the queue is empty, we can already enqueue now */
569     if (multifd_queue_empty(pages)) {
570         pages->block = block;
571         multifd_enqueue(pages, offset);
572         return true;
573     }
574 
575     /*
576      * Not empty, meanwhile we need a flush.  It can because of either:
577      *
578      * (1) The page is not on the same ramblock of previous ones, or,
579      * (2) The queue is full.
580      *
581      * After flush, always retry.
582      */
583     if (pages->block != block || multifd_queue_full(pages)) {
584         if (!multifd_send_pages()) {
585             return false;
586         }
587         goto retry;
588     }
589 
590     /* Not empty, and we still have space, do it! */
591     multifd_enqueue(pages, offset);
592     return true;
593 }
594 
595 /* Multifd send side hit an error; remember it and prepare to quit */
596 static void multifd_send_set_error(Error *err)
597 {
598     /*
599      * We don't want to exit each threads twice.  Depending on where
600      * we get the error, or if there are two independent errors in two
601      * threads at the same time, we can end calling this function
602      * twice.
603      */
604     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
605         return;
606     }
607 
608     if (err) {
609         MigrationState *s = migrate_get_current();
610         migrate_set_error(s, err);
611         if (s->state == MIGRATION_STATUS_SETUP ||
612             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
613             s->state == MIGRATION_STATUS_DEVICE ||
614             s->state == MIGRATION_STATUS_ACTIVE) {
615             migrate_set_state(&s->state, s->state,
616                               MIGRATION_STATUS_FAILED);
617         }
618     }
619 }
620 
621 static void multifd_send_terminate_threads(void)
622 {
623     int i;
624 
625     trace_multifd_send_terminate_threads();
626 
627     /*
628      * Tell everyone we're quitting.  No xchg() needed here; we simply
629      * always set it.
630      */
631     qatomic_set(&multifd_send_state->exiting, 1);
632 
633     /*
634      * Firstly, kick all threads out; no matter whether they are just idle,
635      * or blocked in an IO system call.
636      */
637     for (i = 0; i < migrate_multifd_channels(); i++) {
638         MultiFDSendParams *p = &multifd_send_state->params[i];
639 
640         qemu_sem_post(&p->sem);
641         if (p->c) {
642             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
643         }
644     }
645 
646     /*
647      * Finally recycle all the threads.
648      */
649     for (i = 0; i < migrate_multifd_channels(); i++) {
650         MultiFDSendParams *p = &multifd_send_state->params[i];
651 
652         if (p->tls_thread_created) {
653             qemu_thread_join(&p->tls_thread);
654         }
655 
656         if (p->thread_created) {
657             qemu_thread_join(&p->thread);
658         }
659     }
660 }
661 
662 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
663 {
664     if (p->c) {
665         migration_ioc_unregister_yank(p->c);
666         object_unref(OBJECT(p->c));
667         p->c = NULL;
668     }
669     qemu_sem_destroy(&p->sem);
670     qemu_sem_destroy(&p->sem_sync);
671     g_free(p->name);
672     p->name = NULL;
673     multifd_pages_clear(p->pages);
674     p->pages = NULL;
675     p->packet_len = 0;
676     g_free(p->packet);
677     p->packet = NULL;
678     g_free(p->iov);
679     p->iov = NULL;
680     multifd_send_state->ops->send_cleanup(p, errp);
681 
682     return *errp == NULL;
683 }
684 
685 static void multifd_send_cleanup_state(void)
686 {
687     socket_cleanup_outgoing_migration();
688     qemu_sem_destroy(&multifd_send_state->channels_created);
689     qemu_sem_destroy(&multifd_send_state->channels_ready);
690     g_free(multifd_send_state->params);
691     multifd_send_state->params = NULL;
692     multifd_pages_clear(multifd_send_state->pages);
693     multifd_send_state->pages = NULL;
694     g_free(multifd_send_state);
695     multifd_send_state = NULL;
696 }
697 
698 void multifd_send_shutdown(void)
699 {
700     int i;
701 
702     if (!migrate_multifd()) {
703         return;
704     }
705 
706     multifd_send_terminate_threads();
707 
708     for (i = 0; i < migrate_multifd_channels(); i++) {
709         MultiFDSendParams *p = &multifd_send_state->params[i];
710         Error *local_err = NULL;
711 
712         if (!multifd_send_cleanup_channel(p, &local_err)) {
713             migrate_set_error(migrate_get_current(), local_err);
714             error_free(local_err);
715         }
716     }
717 
718     multifd_send_cleanup_state();
719 }
720 
721 static int multifd_zero_copy_flush(QIOChannel *c)
722 {
723     int ret;
724     Error *err = NULL;
725 
726     ret = qio_channel_flush(c, &err);
727     if (ret < 0) {
728         error_report_err(err);
729         return -1;
730     }
731     if (ret == 1) {
732         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
733     }
734 
735     return ret;
736 }
737 
738 int multifd_send_sync_main(void)
739 {
740     int i;
741     bool flush_zero_copy;
742 
743     if (!migrate_multifd()) {
744         return 0;
745     }
746     if (multifd_send_state->pages->num) {
747         if (!multifd_send_pages()) {
748             error_report("%s: multifd_send_pages fail", __func__);
749             return -1;
750         }
751     }
752 
753     flush_zero_copy = migrate_zero_copy_send();
754 
755     for (i = 0; i < migrate_multifd_channels(); i++) {
756         MultiFDSendParams *p = &multifd_send_state->params[i];
757 
758         if (multifd_send_should_exit()) {
759             return -1;
760         }
761 
762         trace_multifd_send_sync_main_signal(p->id);
763 
764         /*
765          * We should be the only user so far, so not possible to be set by
766          * others concurrently.
767          */
768         assert(qatomic_read(&p->pending_sync) == false);
769         qatomic_set(&p->pending_sync, true);
770         qemu_sem_post(&p->sem);
771     }
772     for (i = 0; i < migrate_multifd_channels(); i++) {
773         MultiFDSendParams *p = &multifd_send_state->params[i];
774 
775         if (multifd_send_should_exit()) {
776             return -1;
777         }
778 
779         qemu_sem_wait(&multifd_send_state->channels_ready);
780         trace_multifd_send_sync_main_wait(p->id);
781         qemu_sem_wait(&p->sem_sync);
782 
783         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
784             return -1;
785         }
786     }
787     trace_multifd_send_sync_main(multifd_send_state->packet_num);
788 
789     return 0;
790 }
791 
792 static void *multifd_send_thread(void *opaque)
793 {
794     MultiFDSendParams *p = opaque;
795     MigrationThread *thread = NULL;
796     Error *local_err = NULL;
797     int ret = 0;
798 
799     thread = migration_threads_add(p->name, qemu_get_thread_id());
800 
801     trace_multifd_send_thread_start(p->id);
802     rcu_register_thread();
803 
804     if (multifd_send_initial_packet(p, &local_err) < 0) {
805         ret = -1;
806         goto out;
807     }
808 
809     while (true) {
810         qemu_sem_post(&multifd_send_state->channels_ready);
811         qemu_sem_wait(&p->sem);
812 
813         if (multifd_send_should_exit()) {
814             break;
815         }
816 
817         /*
818          * Read pending_job flag before p->pages.  Pairs with the
819          * qatomic_store_release() in multifd_send_pages().
820          */
821         if (qatomic_load_acquire(&p->pending_job)) {
822             MultiFDPages_t *pages = p->pages;
823 
824             p->iovs_num = 0;
825             assert(pages->num);
826 
827             ret = multifd_send_state->ops->send_prepare(p, &local_err);
828             if (ret != 0) {
829                 break;
830             }
831 
832             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
833                                               0, p->write_flags, &local_err);
834             if (ret != 0) {
835                 break;
836             }
837 
838             stat64_add(&mig_stats.multifd_bytes,
839                        p->next_packet_size + p->packet_len);
840 
841             multifd_pages_reset(p->pages);
842             p->next_packet_size = 0;
843 
844             /*
845              * Making sure p->pages is published before saying "we're
846              * free".  Pairs with the smp_mb_acquire() in
847              * multifd_send_pages().
848              */
849             qatomic_store_release(&p->pending_job, false);
850         } else {
851             /*
852              * If not a normal job, must be a sync request.  Note that
853              * pending_sync is a standalone flag (unlike pending_job), so
854              * it doesn't require explicit memory barriers.
855              */
856             assert(qatomic_read(&p->pending_sync));
857             p->flags = MULTIFD_FLAG_SYNC;
858             multifd_send_fill_packet(p);
859             ret = qio_channel_write_all(p->c, (void *)p->packet,
860                                         p->packet_len, &local_err);
861             if (ret != 0) {
862                 break;
863             }
864             /* p->next_packet_size will always be zero for a SYNC packet */
865             stat64_add(&mig_stats.multifd_bytes, p->packet_len);
866             p->flags = 0;
867             qatomic_set(&p->pending_sync, false);
868             qemu_sem_post(&p->sem_sync);
869         }
870     }
871 
872 out:
873     if (ret) {
874         assert(local_err);
875         trace_multifd_send_error(p->id);
876         multifd_send_set_error(local_err);
877         multifd_send_kick_main(p);
878         error_free(local_err);
879     }
880 
881     rcu_unregister_thread();
882     migration_threads_remove(thread);
883     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
884 
885     return NULL;
886 }
887 
888 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
889 
890 typedef struct {
891     MultiFDSendParams *p;
892     QIOChannelTLS *tioc;
893 } MultiFDTLSThreadArgs;
894 
895 static void *multifd_tls_handshake_thread(void *opaque)
896 {
897     MultiFDTLSThreadArgs *args = opaque;
898 
899     qio_channel_tls_handshake(args->tioc,
900                               multifd_new_send_channel_async,
901                               args->p,
902                               NULL,
903                               NULL);
904     g_free(args);
905 
906     return NULL;
907 }
908 
909 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
910                                         QIOChannel *ioc,
911                                         Error **errp)
912 {
913     MigrationState *s = migrate_get_current();
914     const char *hostname = s->hostname;
915     MultiFDTLSThreadArgs *args;
916     QIOChannelTLS *tioc;
917 
918     tioc = migration_tls_client_create(ioc, hostname, errp);
919     if (!tioc) {
920         return false;
921     }
922 
923     /*
924      * Ownership of the socket channel now transfers to the newly
925      * created TLS channel, which has already taken a reference.
926      */
927     object_unref(OBJECT(ioc));
928     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
929     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
930 
931     args = g_new0(MultiFDTLSThreadArgs, 1);
932     args->tioc = tioc;
933     args->p = p;
934 
935     p->tls_thread_created = true;
936     qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
937                        multifd_tls_handshake_thread, args,
938                        QEMU_THREAD_JOINABLE);
939     return true;
940 }
941 
942 static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
943 {
944     qio_channel_set_delay(ioc, false);
945 
946     migration_ioc_register_yank(ioc);
947     /* Setup p->c only if the channel is completely setup */
948     p->c = ioc;
949 
950     p->thread_created = true;
951     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
952                        QEMU_THREAD_JOINABLE);
953 }
954 
955 /*
956  * When TLS is enabled this function is called once to establish the
957  * TLS connection and a second time after the TLS handshake to create
958  * the multifd channel. Without TLS it goes straight into the channel
959  * creation.
960  */
961 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
962 {
963     MultiFDSendParams *p = opaque;
964     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
965     Error *local_err = NULL;
966     bool ret;
967 
968     trace_multifd_new_send_channel_async(p->id);
969 
970     if (qio_task_propagate_error(task, &local_err)) {
971         ret = false;
972         goto out;
973     }
974 
975     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
976                                        migrate_get_current()->hostname);
977 
978     if (migrate_channel_requires_tls_upgrade(ioc)) {
979         ret = multifd_tls_channel_connect(p, ioc, &local_err);
980         if (ret) {
981             return;
982         }
983     } else {
984         multifd_channel_connect(p, ioc);
985         ret = true;
986     }
987 
988 out:
989     /*
990      * Here we're not interested whether creation succeeded, only that
991      * it happened at all.
992      */
993     qemu_sem_post(&multifd_send_state->channels_created);
994 
995     if (ret) {
996         return;
997     }
998 
999     trace_multifd_new_send_channel_async_error(p->id, local_err);
1000     multifd_send_set_error(local_err);
1001     /*
1002      * For error cases (TLS or non-TLS), IO channel is always freed here
1003      * rather than when cleanup multifd: since p->c is not set, multifd
1004      * cleanup code doesn't even know its existence.
1005      */
1006     object_unref(OBJECT(ioc));
1007     error_free(local_err);
1008 }
1009 
1010 static void multifd_new_send_channel_create(gpointer opaque)
1011 {
1012     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1013 }
1014 
1015 bool multifd_send_setup(void)
1016 {
1017     MigrationState *s = migrate_get_current();
1018     Error *local_err = NULL;
1019     int thread_count, ret = 0;
1020     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1021     uint8_t i;
1022 
1023     if (!migrate_multifd()) {
1024         return true;
1025     }
1026 
1027     thread_count = migrate_multifd_channels();
1028     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1029     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1030     multifd_send_state->pages = multifd_pages_init(page_count);
1031     qemu_sem_init(&multifd_send_state->channels_created, 0);
1032     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1033     qatomic_set(&multifd_send_state->exiting, 0);
1034     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1035 
1036     for (i = 0; i < thread_count; i++) {
1037         MultiFDSendParams *p = &multifd_send_state->params[i];
1038 
1039         qemu_sem_init(&p->sem, 0);
1040         qemu_sem_init(&p->sem_sync, 0);
1041         p->id = i;
1042         p->pages = multifd_pages_init(page_count);
1043         p->packet_len = sizeof(MultiFDPacket_t)
1044                       + sizeof(uint64_t) * page_count;
1045         p->packet = g_malloc0(p->packet_len);
1046         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1047         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1048         p->name = g_strdup_printf("multifdsend_%d", i);
1049         /* We need one extra place for the packet header */
1050         p->iov = g_new0(struct iovec, page_count + 1);
1051         p->page_size = qemu_target_page_size();
1052         p->page_count = page_count;
1053         p->write_flags = 0;
1054         multifd_new_send_channel_create(p);
1055     }
1056 
1057     /*
1058      * Wait until channel creation has started for all channels. The
1059      * creation can still fail, but no more channels will be created
1060      * past this point.
1061      */
1062     for (i = 0; i < thread_count; i++) {
1063         qemu_sem_wait(&multifd_send_state->channels_created);
1064     }
1065 
1066     for (i = 0; i < thread_count; i++) {
1067         MultiFDSendParams *p = &multifd_send_state->params[i];
1068 
1069         ret = multifd_send_state->ops->send_setup(p, &local_err);
1070         if (ret) {
1071             break;
1072         }
1073     }
1074 
1075     if (ret) {
1076         migrate_set_error(s, local_err);
1077         error_report_err(local_err);
1078         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1079                           MIGRATION_STATUS_FAILED);
1080         return false;
1081     }
1082 
1083     return true;
1084 }
1085 
1086 static void multifd_recv_terminate_threads(Error *err)
1087 {
1088     int i;
1089 
1090     trace_multifd_recv_terminate_threads(err != NULL);
1091 
1092     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1093         return;
1094     }
1095 
1096     if (err) {
1097         MigrationState *s = migrate_get_current();
1098         migrate_set_error(s, err);
1099         if (s->state == MIGRATION_STATUS_SETUP ||
1100             s->state == MIGRATION_STATUS_ACTIVE) {
1101             migrate_set_state(&s->state, s->state,
1102                               MIGRATION_STATUS_FAILED);
1103         }
1104     }
1105 
1106     for (i = 0; i < migrate_multifd_channels(); i++) {
1107         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1108 
1109         /*
1110          * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1111          * however try to wakeup it without harm in cleanup phase.
1112          */
1113         qemu_sem_post(&p->sem_sync);
1114 
1115         /*
1116          * We could arrive here for two reasons:
1117          *  - normal quit, i.e. everything went fine, just finished
1118          *  - error quit: We close the channels so the channel threads
1119          *    finish the qio_channel_read_all_eof()
1120          */
1121         if (p->c) {
1122             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1123         }
1124     }
1125 }
1126 
/*
 * Request termination of the multifd receive threads, without an error.
 * Does nothing when multifd is not enabled.
 */
void multifd_recv_shutdown(void)
{
    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
}
1133 
1134 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1135 {
1136     migration_ioc_unregister_yank(p->c);
1137     object_unref(OBJECT(p->c));
1138     p->c = NULL;
1139     qemu_mutex_destroy(&p->mutex);
1140     qemu_sem_destroy(&p->sem_sync);
1141     g_free(p->name);
1142     p->name = NULL;
1143     p->packet_len = 0;
1144     g_free(p->packet);
1145     p->packet = NULL;
1146     g_free(p->iov);
1147     p->iov = NULL;
1148     g_free(p->normal);
1149     p->normal = NULL;
1150     multifd_recv_state->ops->recv_cleanup(p);
1151 }
1152 
1153 static void multifd_recv_cleanup_state(void)
1154 {
1155     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1156     g_free(multifd_recv_state->params);
1157     multifd_recv_state->params = NULL;
1158     g_free(multifd_recv_state);
1159     multifd_recv_state = NULL;
1160 }
1161 
1162 void multifd_recv_cleanup(void)
1163 {
1164     int i;
1165 
1166     if (!migrate_multifd()) {
1167         return;
1168     }
1169     multifd_recv_terminate_threads(NULL);
1170     for (i = 0; i < migrate_multifd_channels(); i++) {
1171         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1172 
1173         if (p->thread_created) {
1174             qemu_thread_join(&p->thread);
1175         }
1176     }
1177     for (i = 0; i < migrate_multifd_channels(); i++) {
1178         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1179     }
1180     multifd_recv_cleanup_state();
1181 }
1182 
1183 void multifd_recv_sync_main(void)
1184 {
1185     int i;
1186 
1187     if (!migrate_multifd()) {
1188         return;
1189     }
1190     for (i = 0; i < migrate_multifd_channels(); i++) {
1191         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1192 
1193         trace_multifd_recv_sync_main_wait(p->id);
1194         qemu_sem_wait(&multifd_recv_state->sem_sync);
1195     }
1196     for (i = 0; i < migrate_multifd_channels(); i++) {
1197         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1198 
1199         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1200             if (multifd_recv_state->packet_num < p->packet_num) {
1201                 multifd_recv_state->packet_num = p->packet_num;
1202             }
1203         }
1204         trace_multifd_recv_sync_main_signal(p->id);
1205         qemu_sem_post(&p->sem_sync);
1206     }
1207     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1208 }
1209 
1210 static void *multifd_recv_thread(void *opaque)
1211 {
1212     MultiFDRecvParams *p = opaque;
1213     Error *local_err = NULL;
1214     int ret;
1215 
1216     trace_multifd_recv_thread_start(p->id);
1217     rcu_register_thread();
1218 
1219     while (true) {
1220         uint32_t flags;
1221 
1222         if (multifd_recv_should_exit()) {
1223             break;
1224         }
1225 
1226         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1227                                        p->packet_len, &local_err);
1228         if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1229             break;
1230         }
1231 
1232         qemu_mutex_lock(&p->mutex);
1233         ret = multifd_recv_unfill_packet(p, &local_err);
1234         if (ret) {
1235             qemu_mutex_unlock(&p->mutex);
1236             break;
1237         }
1238 
1239         flags = p->flags;
1240         /* recv methods don't know how to handle the SYNC flag */
1241         p->flags &= ~MULTIFD_FLAG_SYNC;
1242         qemu_mutex_unlock(&p->mutex);
1243 
1244         if (p->normal_num) {
1245             ret = multifd_recv_state->ops->recv_pages(p, &local_err);
1246             if (ret != 0) {
1247                 break;
1248             }
1249         }
1250 
1251         if (flags & MULTIFD_FLAG_SYNC) {
1252             qemu_sem_post(&multifd_recv_state->sem_sync);
1253             qemu_sem_wait(&p->sem_sync);
1254         }
1255     }
1256 
1257     if (local_err) {
1258         multifd_recv_terminate_threads(local_err);
1259         error_free(local_err);
1260     }
1261 
1262     rcu_unregister_thread();
1263     trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
1264 
1265     return NULL;
1266 }
1267 
1268 int multifd_recv_setup(Error **errp)
1269 {
1270     int thread_count;
1271     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1272     uint8_t i;
1273 
1274     /*
1275      * Return successfully if multiFD recv state is already initialised
1276      * or multiFD is not enabled.
1277      */
1278     if (multifd_recv_state || !migrate_multifd()) {
1279         return 0;
1280     }
1281 
1282     thread_count = migrate_multifd_channels();
1283     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1284     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1285     qatomic_set(&multifd_recv_state->count, 0);
1286     qatomic_set(&multifd_recv_state->exiting, 0);
1287     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1288     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1289 
1290     for (i = 0; i < thread_count; i++) {
1291         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1292 
1293         qemu_mutex_init(&p->mutex);
1294         qemu_sem_init(&p->sem_sync, 0);
1295         p->id = i;
1296         p->packet_len = sizeof(MultiFDPacket_t)
1297                       + sizeof(uint64_t) * page_count;
1298         p->packet = g_malloc0(p->packet_len);
1299         p->name = g_strdup_printf("multifdrecv_%d", i);
1300         p->iov = g_new0(struct iovec, page_count);
1301         p->normal = g_new0(ram_addr_t, page_count);
1302         p->page_count = page_count;
1303         p->page_size = qemu_target_page_size();
1304     }
1305 
1306     for (i = 0; i < thread_count; i++) {
1307         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1308         int ret;
1309 
1310         ret = multifd_recv_state->ops->recv_setup(p, errp);
1311         if (ret) {
1312             return ret;
1313         }
1314     }
1315     return 0;
1316 }
1317 
1318 bool multifd_recv_all_channels_created(void)
1319 {
1320     int thread_count = migrate_multifd_channels();
1321 
1322     if (!migrate_multifd()) {
1323         return true;
1324     }
1325 
1326     if (!multifd_recv_state) {
1327         /* Called before any connections created */
1328         return false;
1329     }
1330 
1331     return thread_count == qatomic_read(&multifd_recv_state->count);
1332 }
1333 
1334 /*
1335  * Try to receive all multifd channels to get ready for the migration.
1336  * Sets @errp when failing to receive the current channel.
1337  */
1338 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1339 {
1340     MultiFDRecvParams *p;
1341     Error *local_err = NULL;
1342     int id;
1343 
1344     id = multifd_recv_initial_packet(ioc, &local_err);
1345     if (id < 0) {
1346         multifd_recv_terminate_threads(local_err);
1347         error_propagate_prepend(errp, local_err,
1348                                 "failed to receive packet"
1349                                 " via multifd channel %d: ",
1350                                 qatomic_read(&multifd_recv_state->count));
1351         return;
1352     }
1353     trace_multifd_recv_new_channel(id);
1354 
1355     p = &multifd_recv_state->params[id];
1356     if (p->c != NULL) {
1357         error_setg(&local_err, "multifd: received id '%d' already setup'",
1358                    id);
1359         multifd_recv_terminate_threads(local_err);
1360         error_propagate(errp, local_err);
1361         return;
1362     }
1363     p->c = ioc;
1364     object_ref(OBJECT(ioc));
1365 
1366     p->thread_created = true;
1367     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1368                        QEMU_THREAD_JOINABLE);
1369     qatomic_inc(&multifd_recv_state->count);
1370 }
1371