xref: /qemu/migration/multifd.c (revision a0e93dd8)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "migration-stats.h"
23 #include "socket.h"
24 #include "tls.h"
25 #include "qemu-file.h"
26 #include "trace.h"
27 #include "multifd.h"
28 #include "threadinfo.h"
29 #include "options.h"
30 #include "qemu/yank.h"
31 #include "io/channel-socket.h"
32 #include "yank_functions.h"
33 
34 /* Multiple fd's */
35 
36 #define MULTIFD_MAGIC 0x11223344U
37 #define MULTIFD_VERSION 1
38 
39 typedef struct {
40     uint32_t magic;
41     uint32_t version;
42     unsigned char uuid[16]; /* QemuUUID */
43     uint8_t id;
44     uint8_t unused1[7];     /* Reserved for future use */
45     uint64_t unused2[4];    /* Reserved for future use */
46 } __attribute__((packed)) MultiFDInit_t;
47 
48 struct {
49     MultiFDSendParams *params;
50     /* array of pages to sent */
51     MultiFDPages_t *pages;
52     /*
53      * Global number of generated multifd packets.
54      *
55      * Note that we used 'uintptr_t' because it'll naturally support atomic
56      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
57      * multifd will overflow the packet_num easier, but that should be
58      * fine.
59      *
60      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
61      * hosts, however so far it does not support atomic fetch_add() yet.
62      * Make it easy for now.
63      */
64     uintptr_t packet_num;
65     /*
66      * Synchronization point past which no more channels will be
67      * created.
68      */
69     QemuSemaphore channels_created;
70     /* send channels ready */
71     QemuSemaphore channels_ready;
72     /*
73      * Have we already run terminate threads.  There is a race when it
74      * happens that we got one error while we are exiting.
75      * We will use atomic operations.  Only valid values are 0 and 1.
76      */
77     int exiting;
78     /* multifd ops */
79     MultiFDMethods *ops;
80 } *multifd_send_state;
81 
82 /* Multifd without compression */
83 
84 /**
85  * nocomp_send_setup: setup send side
86  *
87  * @p: Params for the channel that we are using
88  * @errp: pointer to an error
89  */
90 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
91 {
92     if (migrate_zero_copy_send()) {
93         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
94     }
95 
96     return 0;
97 }
98 
99 /**
100  * nocomp_send_cleanup: cleanup send side
101  *
102  * For no compression this function does nothing.
103  *
104  * @p: Params for the channel that we are using
105  * @errp: pointer to an error
106  */
107 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
108 {
109     return;
110 }
111 
112 /**
113  * nocomp_send_prepare: prepare date to be able to send
114  *
115  * For no compression we just have to calculate the size of the
116  * packet.
117  *
118  * Returns 0 for success or -1 for error
119  *
120  * @p: Params for the channel that we are using
121  * @errp: pointer to an error
122  */
123 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
124 {
125     bool use_zero_copy_send = migrate_zero_copy_send();
126     MultiFDPages_t *pages = p->pages;
127     int ret;
128 
129     if (!use_zero_copy_send) {
130         /*
131          * Only !zerocopy needs the header in IOV; zerocopy will
132          * send it separately.
133          */
134         multifd_send_prepare_header(p);
135     }
136 
137     for (int i = 0; i < pages->num; i++) {
138         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
139         p->iov[p->iovs_num].iov_len = p->page_size;
140         p->iovs_num++;
141     }
142 
143     p->next_packet_size = pages->num * p->page_size;
144     p->flags |= MULTIFD_FLAG_NOCOMP;
145 
146     multifd_send_fill_packet(p);
147 
148     if (use_zero_copy_send) {
149         /* Send header first, without zerocopy */
150         ret = qio_channel_write_all(p->c, (void *)p->packet,
151                                     p->packet_len, errp);
152         if (ret != 0) {
153             return -1;
154         }
155     }
156 
157     return 0;
158 }
159 
160 /**
161  * nocomp_recv_setup: setup receive side
162  *
163  * For no compression this function does nothing.
164  *
165  * Returns 0 for success or -1 for error
166  *
167  * @p: Params for the channel that we are using
168  * @errp: pointer to an error
169  */
170 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
171 {
172     return 0;
173 }
174 
175 /**
176  * nocomp_recv_cleanup: setup receive side
177  *
178  * For no compression this function does nothing.
179  *
180  * @p: Params for the channel that we are using
181  */
182 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
183 {
184 }
185 
186 /**
187  * nocomp_recv_pages: read the data from the channel into actual pages
188  *
189  * For no compression we just need to read things into the correct place.
190  *
191  * Returns 0 for success or -1 for error
192  *
193  * @p: Params for the channel that we are using
194  * @errp: pointer to an error
195  */
196 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
197 {
198     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
199 
200     if (flags != MULTIFD_FLAG_NOCOMP) {
201         error_setg(errp, "multifd %u: flags received %x flags expected %x",
202                    p->id, flags, MULTIFD_FLAG_NOCOMP);
203         return -1;
204     }
205     for (int i = 0; i < p->normal_num; i++) {
206         p->iov[i].iov_base = p->host + p->normal[i];
207         p->iov[i].iov_len = p->page_size;
208     }
209     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
210 }
211 
212 static MultiFDMethods multifd_nocomp_ops = {
213     .send_setup = nocomp_send_setup,
214     .send_cleanup = nocomp_send_cleanup,
215     .send_prepare = nocomp_send_prepare,
216     .recv_setup = nocomp_recv_setup,
217     .recv_cleanup = nocomp_recv_cleanup,
218     .recv_pages = nocomp_recv_pages
219 };
220 
221 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
222     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
223 };
224 
225 void multifd_register_ops(int method, MultiFDMethods *ops)
226 {
227     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
228     multifd_ops[method] = ops;
229 }
230 
231 /* Reset a MultiFDPages_t* object for the next use */
232 static void multifd_pages_reset(MultiFDPages_t *pages)
233 {
234     /*
235      * We don't need to touch offset[] array, because it will be
236      * overwritten later when reused.
237      */
238     pages->num = 0;
239     pages->block = NULL;
240 }
241 
242 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
243 {
244     MultiFDInit_t msg = {};
245     size_t size = sizeof(msg);
246     int ret;
247 
248     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
249     msg.version = cpu_to_be32(MULTIFD_VERSION);
250     msg.id = p->id;
251     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
252 
253     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
254     if (ret != 0) {
255         return -1;
256     }
257     stat64_add(&mig_stats.multifd_bytes, size);
258     return 0;
259 }
260 
261 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
262 {
263     MultiFDInit_t msg;
264     int ret;
265 
266     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
267     if (ret != 0) {
268         return -1;
269     }
270 
271     msg.magic = be32_to_cpu(msg.magic);
272     msg.version = be32_to_cpu(msg.version);
273 
274     if (msg.magic != MULTIFD_MAGIC) {
275         error_setg(errp, "multifd: received packet magic %x "
276                    "expected %x", msg.magic, MULTIFD_MAGIC);
277         return -1;
278     }
279 
280     if (msg.version != MULTIFD_VERSION) {
281         error_setg(errp, "multifd: received packet version %u "
282                    "expected %u", msg.version, MULTIFD_VERSION);
283         return -1;
284     }
285 
286     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
287         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
288         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
289 
290         error_setg(errp, "multifd: received uuid '%s' and expected "
291                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
292         g_free(uuid);
293         g_free(msg_uuid);
294         return -1;
295     }
296 
297     if (msg.id > migrate_multifd_channels()) {
298         error_setg(errp, "multifd: received channel id %u is greater than "
299                    "number of channels %u", msg.id, migrate_multifd_channels());
300         return -1;
301     }
302 
303     return msg.id;
304 }
305 
306 static MultiFDPages_t *multifd_pages_init(uint32_t n)
307 {
308     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
309 
310     pages->allocated = n;
311     pages->offset = g_new0(ram_addr_t, n);
312 
313     return pages;
314 }
315 
316 static void multifd_pages_clear(MultiFDPages_t *pages)
317 {
318     multifd_pages_reset(pages);
319     pages->allocated = 0;
320     g_free(pages->offset);
321     pages->offset = NULL;
322     g_free(pages);
323 }
324 
325 void multifd_send_fill_packet(MultiFDSendParams *p)
326 {
327     MultiFDPacket_t *packet = p->packet;
328     MultiFDPages_t *pages = p->pages;
329     uint64_t packet_num;
330     int i;
331 
332     packet->flags = cpu_to_be32(p->flags);
333     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
334     packet->normal_pages = cpu_to_be32(pages->num);
335     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
336 
337     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
338     packet->packet_num = cpu_to_be64(packet_num);
339 
340     if (pages->block) {
341         strncpy(packet->ramblock, pages->block->idstr, 256);
342     }
343 
344     for (i = 0; i < pages->num; i++) {
345         /* there are architectures where ram_addr_t is 32 bit */
346         uint64_t temp = pages->offset[i];
347 
348         packet->offset[i] = cpu_to_be64(temp);
349     }
350 
351     p->packets_sent++;
352     p->total_normal_pages += pages->num;
353 
354     trace_multifd_send(p->id, packet_num, pages->num, p->flags,
355                        p->next_packet_size);
356 }
357 
358 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
359 {
360     MultiFDPacket_t *packet = p->packet;
361     int i;
362 
363     packet->magic = be32_to_cpu(packet->magic);
364     if (packet->magic != MULTIFD_MAGIC) {
365         error_setg(errp, "multifd: received packet "
366                    "magic %x and expected magic %x",
367                    packet->magic, MULTIFD_MAGIC);
368         return -1;
369     }
370 
371     packet->version = be32_to_cpu(packet->version);
372     if (packet->version != MULTIFD_VERSION) {
373         error_setg(errp, "multifd: received packet "
374                    "version %u and expected version %u",
375                    packet->version, MULTIFD_VERSION);
376         return -1;
377     }
378 
379     p->flags = be32_to_cpu(packet->flags);
380 
381     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
382     /*
383      * If we received a packet that is 100 times bigger than expected
384      * just stop migration.  It is a magic number.
385      */
386     if (packet->pages_alloc > p->page_count) {
387         error_setg(errp, "multifd: received packet "
388                    "with size %u and expected a size of %u",
389                    packet->pages_alloc, p->page_count) ;
390         return -1;
391     }
392 
393     p->normal_num = be32_to_cpu(packet->normal_pages);
394     if (p->normal_num > packet->pages_alloc) {
395         error_setg(errp, "multifd: received packet "
396                    "with %u pages and expected maximum pages are %u",
397                    p->normal_num, packet->pages_alloc) ;
398         return -1;
399     }
400 
401     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
402     p->packet_num = be64_to_cpu(packet->packet_num);
403     p->packets_recved++;
404     p->total_normal_pages += p->normal_num;
405 
406     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
407                        p->next_packet_size);
408 
409     if (p->normal_num == 0) {
410         return 0;
411     }
412 
413     /* make sure that ramblock is 0 terminated */
414     packet->ramblock[255] = 0;
415     p->block = qemu_ram_block_by_name(packet->ramblock);
416     if (!p->block) {
417         error_setg(errp, "multifd: unknown ram block %s",
418                    packet->ramblock);
419         return -1;
420     }
421 
422     p->host = p->block->host;
423     for (i = 0; i < p->normal_num; i++) {
424         uint64_t offset = be64_to_cpu(packet->offset[i]);
425 
426         if (offset > (p->block->used_length - p->page_size)) {
427             error_setg(errp, "multifd: offset too long %" PRIu64
428                        " (max " RAM_ADDR_FMT ")",
429                        offset, p->block->used_length);
430             return -1;
431         }
432         p->normal[i] = offset;
433     }
434 
435     return 0;
436 }
437 
438 static bool multifd_send_should_exit(void)
439 {
440     return qatomic_read(&multifd_send_state->exiting);
441 }
442 
443 /*
444  * The migration thread can wait on either of the two semaphores.  This
445  * function can be used to kick the main thread out of waiting on either of
446  * them.  Should mostly only be called when something wrong happened with
447  * the current multifd send thread.
448  */
449 static void multifd_send_kick_main(MultiFDSendParams *p)
450 {
451     qemu_sem_post(&p->sem_sync);
452     qemu_sem_post(&multifd_send_state->channels_ready);
453 }
454 
455 /*
456  * How we use multifd_send_state->pages and channel->pages?
457  *
458  * We create a pages for each channel, and a main one.  Each time that
459  * we need to send a batch of pages we interchange the ones between
460  * multifd_send_state and the channel that is sending it.  There are
461  * two reasons for that:
462  *    - to not have to do so many mallocs during migration
463  *    - to make easier to know what to free at the end of migration
464  *
465  * This way we always know who is the owner of each "pages" struct,
466  * and we don't need any locking.  It belongs to the migration thread
467  * or to the channel thread.  Switching is safe because the migration
468  * thread is using the channel mutex when changing it, and the channel
469  * have to had finish with its own, otherwise pending_job can't be
470  * false.
471  *
472  * Returns true if succeed, false otherwise.
473  */
474 static bool multifd_send_pages(void)
475 {
476     int i;
477     static int next_channel;
478     MultiFDSendParams *p = NULL; /* make happy gcc */
479     MultiFDPages_t *pages = multifd_send_state->pages;
480 
481     if (multifd_send_should_exit()) {
482         return false;
483     }
484 
485     /* We wait here, until at least one channel is ready */
486     qemu_sem_wait(&multifd_send_state->channels_ready);
487 
488     /*
489      * next_channel can remain from a previous migration that was
490      * using more channels, so ensure it doesn't overflow if the
491      * limit is lower now.
492      */
493     next_channel %= migrate_multifd_channels();
494     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
495         if (multifd_send_should_exit()) {
496             return false;
497         }
498         p = &multifd_send_state->params[i];
499         /*
500          * Lockless read to p->pending_job is safe, because only multifd
501          * sender thread can clear it.
502          */
503         if (qatomic_read(&p->pending_job) == false) {
504             next_channel = (i + 1) % migrate_multifd_channels();
505             break;
506         }
507     }
508 
509     /*
510      * Make sure we read p->pending_job before all the rest.  Pairs with
511      * qatomic_store_release() in multifd_send_thread().
512      */
513     smp_mb_acquire();
514     assert(!p->pages->num);
515     multifd_send_state->pages = p->pages;
516     p->pages = pages;
517     /*
518      * Making sure p->pages is setup before marking pending_job=true. Pairs
519      * with the qatomic_load_acquire() in multifd_send_thread().
520      */
521     qatomic_store_release(&p->pending_job, true);
522     qemu_sem_post(&p->sem);
523 
524     return true;
525 }
526 
527 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
528 {
529     return pages->num == 0;
530 }
531 
532 static inline bool multifd_queue_full(MultiFDPages_t *pages)
533 {
534     return pages->num == pages->allocated;
535 }
536 
537 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
538 {
539     pages->offset[pages->num++] = offset;
540 }
541 
542 /* Returns true if enqueue successful, false otherwise */
543 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
544 {
545     MultiFDPages_t *pages;
546 
547 retry:
548     pages = multifd_send_state->pages;
549 
550     /* If the queue is empty, we can already enqueue now */
551     if (multifd_queue_empty(pages)) {
552         pages->block = block;
553         multifd_enqueue(pages, offset);
554         return true;
555     }
556 
557     /*
558      * Not empty, meanwhile we need a flush.  It can because of either:
559      *
560      * (1) The page is not on the same ramblock of previous ones, or,
561      * (2) The queue is full.
562      *
563      * After flush, always retry.
564      */
565     if (pages->block != block || multifd_queue_full(pages)) {
566         if (!multifd_send_pages()) {
567             return false;
568         }
569         goto retry;
570     }
571 
572     /* Not empty, and we still have space, do it! */
573     multifd_enqueue(pages, offset);
574     return true;
575 }
576 
577 /* Multifd send side hit an error; remember it and prepare to quit */
578 static void multifd_send_set_error(Error *err)
579 {
580     /*
581      * We don't want to exit each threads twice.  Depending on where
582      * we get the error, or if there are two independent errors in two
583      * threads at the same time, we can end calling this function
584      * twice.
585      */
586     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
587         return;
588     }
589 
590     if (err) {
591         MigrationState *s = migrate_get_current();
592         migrate_set_error(s, err);
593         if (s->state == MIGRATION_STATUS_SETUP ||
594             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
595             s->state == MIGRATION_STATUS_DEVICE ||
596             s->state == MIGRATION_STATUS_ACTIVE) {
597             migrate_set_state(&s->state, s->state,
598                               MIGRATION_STATUS_FAILED);
599         }
600     }
601 }
602 
603 static void multifd_send_terminate_threads(void)
604 {
605     int i;
606 
607     trace_multifd_send_terminate_threads();
608 
609     /*
610      * Tell everyone we're quitting.  No xchg() needed here; we simply
611      * always set it.
612      */
613     qatomic_set(&multifd_send_state->exiting, 1);
614 
615     /*
616      * Firstly, kick all threads out; no matter whether they are just idle,
617      * or blocked in an IO system call.
618      */
619     for (i = 0; i < migrate_multifd_channels(); i++) {
620         MultiFDSendParams *p = &multifd_send_state->params[i];
621 
622         qemu_sem_post(&p->sem);
623         if (p->c) {
624             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
625         }
626     }
627 
628     /*
629      * Finally recycle all the threads.
630      */
631     for (i = 0; i < migrate_multifd_channels(); i++) {
632         MultiFDSendParams *p = &multifd_send_state->params[i];
633 
634         if (p->tls_thread_created) {
635             qemu_thread_join(&p->tls_thread);
636         }
637 
638         if (p->thread_created) {
639             qemu_thread_join(&p->thread);
640         }
641     }
642 }
643 
644 static int multifd_send_channel_destroy(QIOChannel *send)
645 {
646     return socket_send_channel_destroy(send);
647 }
648 
649 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
650 {
651     if (p->registered_yank) {
652         migration_ioc_unregister_yank(p->c);
653     }
654     multifd_send_channel_destroy(p->c);
655     p->c = NULL;
656     qemu_sem_destroy(&p->sem);
657     qemu_sem_destroy(&p->sem_sync);
658     g_free(p->name);
659     p->name = NULL;
660     multifd_pages_clear(p->pages);
661     p->pages = NULL;
662     p->packet_len = 0;
663     g_free(p->packet);
664     p->packet = NULL;
665     g_free(p->iov);
666     p->iov = NULL;
667     multifd_send_state->ops->send_cleanup(p, errp);
668 
669     return *errp == NULL;
670 }
671 
672 static void multifd_send_cleanup_state(void)
673 {
674     qemu_sem_destroy(&multifd_send_state->channels_created);
675     qemu_sem_destroy(&multifd_send_state->channels_ready);
676     g_free(multifd_send_state->params);
677     multifd_send_state->params = NULL;
678     multifd_pages_clear(multifd_send_state->pages);
679     multifd_send_state->pages = NULL;
680     g_free(multifd_send_state);
681     multifd_send_state = NULL;
682 }
683 
684 void multifd_send_shutdown(void)
685 {
686     int i;
687 
688     if (!migrate_multifd()) {
689         return;
690     }
691 
692     multifd_send_terminate_threads();
693 
694     for (i = 0; i < migrate_multifd_channels(); i++) {
695         MultiFDSendParams *p = &multifd_send_state->params[i];
696         Error *local_err = NULL;
697 
698         if (!multifd_send_cleanup_channel(p, &local_err)) {
699             migrate_set_error(migrate_get_current(), local_err);
700             error_free(local_err);
701         }
702     }
703 
704     multifd_send_cleanup_state();
705 }
706 
707 static int multifd_zero_copy_flush(QIOChannel *c)
708 {
709     int ret;
710     Error *err = NULL;
711 
712     ret = qio_channel_flush(c, &err);
713     if (ret < 0) {
714         error_report_err(err);
715         return -1;
716     }
717     if (ret == 1) {
718         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
719     }
720 
721     return ret;
722 }
723 
724 int multifd_send_sync_main(void)
725 {
726     int i;
727     bool flush_zero_copy;
728 
729     if (!migrate_multifd()) {
730         return 0;
731     }
732     if (multifd_send_state->pages->num) {
733         if (!multifd_send_pages()) {
734             error_report("%s: multifd_send_pages fail", __func__);
735             return -1;
736         }
737     }
738 
739     flush_zero_copy = migrate_zero_copy_send();
740 
741     for (i = 0; i < migrate_multifd_channels(); i++) {
742         MultiFDSendParams *p = &multifd_send_state->params[i];
743 
744         if (multifd_send_should_exit()) {
745             return -1;
746         }
747 
748         trace_multifd_send_sync_main_signal(p->id);
749 
750         /*
751          * We should be the only user so far, so not possible to be set by
752          * others concurrently.
753          */
754         assert(qatomic_read(&p->pending_sync) == false);
755         qatomic_set(&p->pending_sync, true);
756         qemu_sem_post(&p->sem);
757     }
758     for (i = 0; i < migrate_multifd_channels(); i++) {
759         MultiFDSendParams *p = &multifd_send_state->params[i];
760 
761         if (multifd_send_should_exit()) {
762             return -1;
763         }
764 
765         qemu_sem_wait(&multifd_send_state->channels_ready);
766         trace_multifd_send_sync_main_wait(p->id);
767         qemu_sem_wait(&p->sem_sync);
768 
769         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
770             return -1;
771         }
772     }
773     trace_multifd_send_sync_main(multifd_send_state->packet_num);
774 
775     return 0;
776 }
777 
778 static void *multifd_send_thread(void *opaque)
779 {
780     MultiFDSendParams *p = opaque;
781     MigrationThread *thread = NULL;
782     Error *local_err = NULL;
783     int ret = 0;
784 
785     thread = migration_threads_add(p->name, qemu_get_thread_id());
786 
787     trace_multifd_send_thread_start(p->id);
788     rcu_register_thread();
789 
790     if (multifd_send_initial_packet(p, &local_err) < 0) {
791         ret = -1;
792         goto out;
793     }
794 
795     while (true) {
796         qemu_sem_post(&multifd_send_state->channels_ready);
797         qemu_sem_wait(&p->sem);
798 
799         if (multifd_send_should_exit()) {
800             break;
801         }
802 
803         /*
804          * Read pending_job flag before p->pages.  Pairs with the
805          * qatomic_store_release() in multifd_send_pages().
806          */
807         if (qatomic_load_acquire(&p->pending_job)) {
808             MultiFDPages_t *pages = p->pages;
809 
810             p->iovs_num = 0;
811             assert(pages->num);
812 
813             ret = multifd_send_state->ops->send_prepare(p, &local_err);
814             if (ret != 0) {
815                 break;
816             }
817 
818             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
819                                               0, p->write_flags, &local_err);
820             if (ret != 0) {
821                 break;
822             }
823 
824             stat64_add(&mig_stats.multifd_bytes,
825                        p->next_packet_size + p->packet_len);
826 
827             multifd_pages_reset(p->pages);
828             p->next_packet_size = 0;
829 
830             /*
831              * Making sure p->pages is published before saying "we're
832              * free".  Pairs with the smp_mb_acquire() in
833              * multifd_send_pages().
834              */
835             qatomic_store_release(&p->pending_job, false);
836         } else {
837             /*
838              * If not a normal job, must be a sync request.  Note that
839              * pending_sync is a standalone flag (unlike pending_job), so
840              * it doesn't require explicit memory barriers.
841              */
842             assert(qatomic_read(&p->pending_sync));
843             p->flags = MULTIFD_FLAG_SYNC;
844             multifd_send_fill_packet(p);
845             ret = qio_channel_write_all(p->c, (void *)p->packet,
846                                         p->packet_len, &local_err);
847             if (ret != 0) {
848                 break;
849             }
850             /* p->next_packet_size will always be zero for a SYNC packet */
851             stat64_add(&mig_stats.multifd_bytes, p->packet_len);
852             p->flags = 0;
853             qatomic_set(&p->pending_sync, false);
854             qemu_sem_post(&p->sem_sync);
855         }
856     }
857 
858 out:
859     if (ret) {
860         assert(local_err);
861         trace_multifd_send_error(p->id);
862         multifd_send_set_error(local_err);
863         multifd_send_kick_main(p);
864         error_free(local_err);
865     }
866 
867     rcu_unregister_thread();
868     migration_threads_remove(thread);
869     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
870 
871     return NULL;
872 }
873 
874 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
875 
876 static void *multifd_tls_handshake_thread(void *opaque)
877 {
878     MultiFDSendParams *p = opaque;
879     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
880 
881     qio_channel_tls_handshake(tioc,
882                               multifd_new_send_channel_async,
883                               p,
884                               NULL,
885                               NULL);
886     return NULL;
887 }
888 
889 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
890                                         QIOChannel *ioc,
891                                         Error **errp)
892 {
893     MigrationState *s = migrate_get_current();
894     const char *hostname = s->hostname;
895     QIOChannelTLS *tioc;
896 
897     tioc = migration_tls_client_create(ioc, hostname, errp);
898     if (!tioc) {
899         return false;
900     }
901 
902     /*
903      * Ownership of the socket channel now transfers to the newly
904      * created TLS channel, which has already taken a reference.
905      */
906     object_unref(OBJECT(ioc));
907     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
908     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
909     p->c = QIO_CHANNEL(tioc);
910 
911     p->tls_thread_created = true;
912     qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
913                        multifd_tls_handshake_thread, p,
914                        QEMU_THREAD_JOINABLE);
915     return true;
916 }
917 
918 static bool multifd_channel_connect(MultiFDSendParams *p,
919                                     QIOChannel *ioc,
920                                     Error **errp)
921 {
922     qio_channel_set_delay(ioc, false);
923 
924     migration_ioc_register_yank(ioc);
925     p->registered_yank = true;
926     p->c = ioc;
927 
928     p->thread_created = true;
929     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
930                        QEMU_THREAD_JOINABLE);
931     return true;
932 }
933 
934 /*
935  * When TLS is enabled this function is called once to establish the
936  * TLS connection and a second time after the TLS handshake to create
937  * the multifd channel. Without TLS it goes straight into the channel
938  * creation.
939  */
940 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
941 {
942     MultiFDSendParams *p = opaque;
943     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
944     Error *local_err = NULL;
945     bool ret;
946 
947     trace_multifd_new_send_channel_async(p->id);
948 
949     if (qio_task_propagate_error(task, &local_err)) {
950         ret = false;
951         goto out;
952     }
953 
954     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
955                                        migrate_get_current()->hostname);
956 
957     if (migrate_channel_requires_tls_upgrade(ioc)) {
958         ret = multifd_tls_channel_connect(p, ioc, &local_err);
959         if (ret) {
960             return;
961         }
962     } else {
963         ret = multifd_channel_connect(p, ioc, &local_err);
964     }
965 
966 out:
967     /*
968      * Here we're not interested whether creation succeeded, only that
969      * it happened at all.
970      */
971     qemu_sem_post(&multifd_send_state->channels_created);
972 
973     if (ret) {
974         return;
975     }
976 
977     trace_multifd_new_send_channel_async_error(p->id, local_err);
978     multifd_send_set_error(local_err);
979     if (!p->c) {
980         /*
981          * If no channel has been created, drop the initial
982          * reference. Otherwise cleanup happens at
983          * multifd_send_channel_destroy()
984          */
985         object_unref(OBJECT(ioc));
986     }
987     error_free(local_err);
988 }
989 
990 static void multifd_new_send_channel_create(gpointer opaque)
991 {
992     socket_send_channel_create(multifd_new_send_channel_async, opaque);
993 }
994 
995 bool multifd_send_setup(void)
996 {
997     MigrationState *s = migrate_get_current();
998     Error *local_err = NULL;
999     int thread_count, ret = 0;
1000     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1001     uint8_t i;
1002 
1003     if (!migrate_multifd()) {
1004         return true;
1005     }
1006 
1007     thread_count = migrate_multifd_channels();
1008     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1009     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1010     multifd_send_state->pages = multifd_pages_init(page_count);
1011     qemu_sem_init(&multifd_send_state->channels_created, 0);
1012     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1013     qatomic_set(&multifd_send_state->exiting, 0);
1014     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1015 
1016     for (i = 0; i < thread_count; i++) {
1017         MultiFDSendParams *p = &multifd_send_state->params[i];
1018 
1019         qemu_sem_init(&p->sem, 0);
1020         qemu_sem_init(&p->sem_sync, 0);
1021         p->id = i;
1022         p->pages = multifd_pages_init(page_count);
1023         p->packet_len = sizeof(MultiFDPacket_t)
1024                       + sizeof(uint64_t) * page_count;
1025         p->packet = g_malloc0(p->packet_len);
1026         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1027         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1028         p->name = g_strdup_printf("multifdsend_%d", i);
1029         /* We need one extra place for the packet header */
1030         p->iov = g_new0(struct iovec, page_count + 1);
1031         p->page_size = qemu_target_page_size();
1032         p->page_count = page_count;
1033         p->write_flags = 0;
1034         multifd_new_send_channel_create(p);
1035     }
1036 
1037     /*
1038      * Wait until channel creation has started for all channels. The
1039      * creation can still fail, but no more channels will be created
1040      * past this point.
1041      */
1042     for (i = 0; i < thread_count; i++) {
1043         qemu_sem_wait(&multifd_send_state->channels_created);
1044     }
1045 
1046     for (i = 0; i < thread_count; i++) {
1047         MultiFDSendParams *p = &multifd_send_state->params[i];
1048 
1049         ret = multifd_send_state->ops->send_setup(p, &local_err);
1050         if (ret) {
1051             break;
1052         }
1053     }
1054 
1055     if (ret) {
1056         migrate_set_error(s, local_err);
1057         error_report_err(local_err);
1058         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1059                           MIGRATION_STATUS_FAILED);
1060         return false;
1061     }
1062 
1063     return true;
1064 }
1065 
1066 struct {
1067     MultiFDRecvParams *params;
1068     /* number of created threads */
1069     int count;
1070     /* syncs main thread and channels */
1071     QemuSemaphore sem_sync;
1072     /* global number of generated multifd packets */
1073     uint64_t packet_num;
1074     /* multifd ops */
1075     MultiFDMethods *ops;
1076 } *multifd_recv_state;
1077 
1078 static void multifd_recv_terminate_threads(Error *err)
1079 {
1080     int i;
1081 
1082     trace_multifd_recv_terminate_threads(err != NULL);
1083 
1084     if (err) {
1085         MigrationState *s = migrate_get_current();
1086         migrate_set_error(s, err);
1087         if (s->state == MIGRATION_STATUS_SETUP ||
1088             s->state == MIGRATION_STATUS_ACTIVE) {
1089             migrate_set_state(&s->state, s->state,
1090                               MIGRATION_STATUS_FAILED);
1091         }
1092     }
1093 
1094     for (i = 0; i < migrate_multifd_channels(); i++) {
1095         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1096 
1097         qemu_mutex_lock(&p->mutex);
1098         p->quit = true;
1099         /*
1100          * We could arrive here for two reasons:
1101          *  - normal quit, i.e. everything went fine, just finished
1102          *  - error quit: We close the channels so the channel threads
1103          *    finish the qio_channel_read_all_eof()
1104          */
1105         if (p->c) {
1106             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1107         }
1108         qemu_mutex_unlock(&p->mutex);
1109     }
1110 }
1111 
1112 void multifd_recv_shutdown(void)
1113 {
1114     if (migrate_multifd()) {
1115         multifd_recv_terminate_threads(NULL);
1116     }
1117 }
1118 
1119 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1120 {
1121     migration_ioc_unregister_yank(p->c);
1122     object_unref(OBJECT(p->c));
1123     p->c = NULL;
1124     qemu_mutex_destroy(&p->mutex);
1125     qemu_sem_destroy(&p->sem_sync);
1126     g_free(p->name);
1127     p->name = NULL;
1128     p->packet_len = 0;
1129     g_free(p->packet);
1130     p->packet = NULL;
1131     g_free(p->iov);
1132     p->iov = NULL;
1133     g_free(p->normal);
1134     p->normal = NULL;
1135     multifd_recv_state->ops->recv_cleanup(p);
1136 }
1137 
1138 static void multifd_recv_cleanup_state(void)
1139 {
1140     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1141     g_free(multifd_recv_state->params);
1142     multifd_recv_state->params = NULL;
1143     g_free(multifd_recv_state);
1144     multifd_recv_state = NULL;
1145 }
1146 
1147 void multifd_recv_cleanup(void)
1148 {
1149     int i;
1150 
1151     if (!migrate_multifd()) {
1152         return;
1153     }
1154     multifd_recv_terminate_threads(NULL);
1155     for (i = 0; i < migrate_multifd_channels(); i++) {
1156         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1157 
1158         /*
1159          * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1160          * however try to wakeup it without harm in cleanup phase.
1161          */
1162         qemu_sem_post(&p->sem_sync);
1163 
1164         if (p->thread_created) {
1165             qemu_thread_join(&p->thread);
1166         }
1167     }
1168     for (i = 0; i < migrate_multifd_channels(); i++) {
1169         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1170     }
1171     multifd_recv_cleanup_state();
1172 }
1173 
1174 void multifd_recv_sync_main(void)
1175 {
1176     int i;
1177 
1178     if (!migrate_multifd()) {
1179         return;
1180     }
1181     for (i = 0; i < migrate_multifd_channels(); i++) {
1182         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1183 
1184         trace_multifd_recv_sync_main_wait(p->id);
1185         qemu_sem_wait(&multifd_recv_state->sem_sync);
1186     }
1187     for (i = 0; i < migrate_multifd_channels(); i++) {
1188         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1189 
1190         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1191             if (multifd_recv_state->packet_num < p->packet_num) {
1192                 multifd_recv_state->packet_num = p->packet_num;
1193             }
1194         }
1195         trace_multifd_recv_sync_main_signal(p->id);
1196         qemu_sem_post(&p->sem_sync);
1197     }
1198     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1199 }
1200 
1201 static void *multifd_recv_thread(void *opaque)
1202 {
1203     MultiFDRecvParams *p = opaque;
1204     Error *local_err = NULL;
1205     int ret;
1206 
1207     trace_multifd_recv_thread_start(p->id);
1208     rcu_register_thread();
1209 
1210     while (true) {
1211         uint32_t flags;
1212 
1213         if (p->quit) {
1214             break;
1215         }
1216 
1217         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1218                                        p->packet_len, &local_err);
1219         if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1220             break;
1221         }
1222 
1223         qemu_mutex_lock(&p->mutex);
1224         ret = multifd_recv_unfill_packet(p, &local_err);
1225         if (ret) {
1226             qemu_mutex_unlock(&p->mutex);
1227             break;
1228         }
1229 
1230         flags = p->flags;
1231         /* recv methods don't know how to handle the SYNC flag */
1232         p->flags &= ~MULTIFD_FLAG_SYNC;
1233         qemu_mutex_unlock(&p->mutex);
1234 
1235         if (p->normal_num) {
1236             ret = multifd_recv_state->ops->recv_pages(p, &local_err);
1237             if (ret != 0) {
1238                 break;
1239             }
1240         }
1241 
1242         if (flags & MULTIFD_FLAG_SYNC) {
1243             qemu_sem_post(&multifd_recv_state->sem_sync);
1244             qemu_sem_wait(&p->sem_sync);
1245         }
1246     }
1247 
1248     if (local_err) {
1249         multifd_recv_terminate_threads(local_err);
1250         error_free(local_err);
1251     }
1252 
1253     rcu_unregister_thread();
1254     trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
1255 
1256     return NULL;
1257 }
1258 
1259 int multifd_recv_setup(Error **errp)
1260 {
1261     int thread_count;
1262     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1263     uint8_t i;
1264 
1265     /*
1266      * Return successfully if multiFD recv state is already initialised
1267      * or multiFD is not enabled.
1268      */
1269     if (multifd_recv_state || !migrate_multifd()) {
1270         return 0;
1271     }
1272 
1273     thread_count = migrate_multifd_channels();
1274     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1275     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1276     qatomic_set(&multifd_recv_state->count, 0);
1277     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1278     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1279 
1280     for (i = 0; i < thread_count; i++) {
1281         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1282 
1283         qemu_mutex_init(&p->mutex);
1284         qemu_sem_init(&p->sem_sync, 0);
1285         p->quit = false;
1286         p->id = i;
1287         p->packet_len = sizeof(MultiFDPacket_t)
1288                       + sizeof(uint64_t) * page_count;
1289         p->packet = g_malloc0(p->packet_len);
1290         p->name = g_strdup_printf("multifdrecv_%d", i);
1291         p->iov = g_new0(struct iovec, page_count);
1292         p->normal = g_new0(ram_addr_t, page_count);
1293         p->page_count = page_count;
1294         p->page_size = qemu_target_page_size();
1295     }
1296 
1297     for (i = 0; i < thread_count; i++) {
1298         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1299         int ret;
1300 
1301         ret = multifd_recv_state->ops->recv_setup(p, errp);
1302         if (ret) {
1303             return ret;
1304         }
1305     }
1306     return 0;
1307 }
1308 
1309 bool multifd_recv_all_channels_created(void)
1310 {
1311     int thread_count = migrate_multifd_channels();
1312 
1313     if (!migrate_multifd()) {
1314         return true;
1315     }
1316 
1317     if (!multifd_recv_state) {
1318         /* Called before any connections created */
1319         return false;
1320     }
1321 
1322     return thread_count == qatomic_read(&multifd_recv_state->count);
1323 }
1324 
1325 /*
1326  * Try to receive all multifd channels to get ready for the migration.
1327  * Sets @errp when failing to receive the current channel.
1328  */
1329 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1330 {
1331     MultiFDRecvParams *p;
1332     Error *local_err = NULL;
1333     int id;
1334 
1335     id = multifd_recv_initial_packet(ioc, &local_err);
1336     if (id < 0) {
1337         multifd_recv_terminate_threads(local_err);
1338         error_propagate_prepend(errp, local_err,
1339                                 "failed to receive packet"
1340                                 " via multifd channel %d: ",
1341                                 qatomic_read(&multifd_recv_state->count));
1342         return;
1343     }
1344     trace_multifd_recv_new_channel(id);
1345 
1346     p = &multifd_recv_state->params[id];
1347     if (p->c != NULL) {
1348         error_setg(&local_err, "multifd: received id '%d' already setup'",
1349                    id);
1350         multifd_recv_terminate_threads(local_err);
1351         error_propagate(errp, local_err);
1352         return;
1353     }
1354     p->c = ioc;
1355     object_ref(OBJECT(ioc));
1356 
1357     p->thread_created = true;
1358     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1359                        QEMU_THREAD_JOINABLE);
1360     qatomic_inc(&multifd_recv_state->count);
1361 }
1362