xref: /qemu/migration/multifd.c (revision d0fb9657)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "socket.h"
23 #include "tls.h"
24 #include "qemu-file.h"
25 #include "trace.h"
26 #include "multifd.h"
27 
28 #include "qemu/yank.h"
29 #include "io/channel-socket.h"
30 #include "yank_functions.h"
31 
32 /* Multiple fd's */
33 
34 #define MULTIFD_MAGIC 0x11223344U
35 #define MULTIFD_VERSION 1
36 
37 typedef struct {
38     uint32_t magic;
39     uint32_t version;
40     unsigned char uuid[16]; /* QemuUUID */
41     uint8_t id;
42     uint8_t unused1[7];     /* Reserved for future use */
43     uint64_t unused2[4];    /* Reserved for future use */
44 } __attribute__((packed)) MultiFDInit_t;
45 
46 /* Multifd without compression */
47 
48 /**
49  * nocomp_send_setup: setup send side
50  *
51  * For no compression this function does nothing.
52  *
53  * Returns 0 for success or -1 for error
54  *
55  * @p: Params for the channel that we are using
56  * @errp: pointer to an error
57  */
58 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
59 {
60     return 0;
61 }
62 
63 /**
64  * nocomp_send_cleanup: cleanup send side
65  *
66  * For no compression this function does nothing.
67  *
68  * @p: Params for the channel that we are using
69  */
70 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
71 {
72     return;
73 }
74 
75 /**
76  * nocomp_send_prepare: prepare date to be able to send
77  *
78  * For no compression we just have to calculate the size of the
79  * packet.
80  *
81  * Returns 0 for success or -1 for error
82  *
83  * @p: Params for the channel that we are using
84  * @used: number of pages used
85  * @errp: pointer to an error
86  */
87 static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
88                                Error **errp)
89 {
90     p->next_packet_size = used * qemu_target_page_size();
91     p->flags |= MULTIFD_FLAG_NOCOMP;
92     return 0;
93 }
94 
95 /**
96  * nocomp_send_write: do the actual write of the data
97  *
98  * For no compression we just have to write the data.
99  *
100  * Returns 0 for success or -1 for error
101  *
102  * @p: Params for the channel that we are using
103  * @used: number of pages used
104  * @errp: pointer to an error
105  */
106 static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
107 {
108     return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
109 }
110 
111 /**
112  * nocomp_recv_setup: setup receive side
113  *
114  * For no compression this function does nothing.
115  *
116  * Returns 0 for success or -1 for error
117  *
118  * @p: Params for the channel that we are using
119  * @errp: pointer to an error
120  */
121 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
122 {
123     return 0;
124 }
125 
126 /**
127  * nocomp_recv_cleanup: setup receive side
128  *
129  * For no compression this function does nothing.
130  *
131  * @p: Params for the channel that we are using
132  */
133 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
134 {
135 }
136 
137 /**
138  * nocomp_recv_pages: read the data from the channel into actual pages
139  *
140  * For no compression we just need to read things into the correct place.
141  *
142  * Returns 0 for success or -1 for error
143  *
144  * @p: Params for the channel that we are using
145  * @used: number of pages used
146  * @errp: pointer to an error
147  */
148 static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
149 {
150     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
151 
152     if (flags != MULTIFD_FLAG_NOCOMP) {
153         error_setg(errp, "multifd %d: flags received %x flags expected %x",
154                    p->id, flags, MULTIFD_FLAG_NOCOMP);
155         return -1;
156     }
157     return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
158 }
159 
160 static MultiFDMethods multifd_nocomp_ops = {
161     .send_setup = nocomp_send_setup,
162     .send_cleanup = nocomp_send_cleanup,
163     .send_prepare = nocomp_send_prepare,
164     .send_write = nocomp_send_write,
165     .recv_setup = nocomp_recv_setup,
166     .recv_cleanup = nocomp_recv_cleanup,
167     .recv_pages = nocomp_recv_pages
168 };
169 
170 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
171     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
172 };
173 
174 void multifd_register_ops(int method, MultiFDMethods *ops)
175 {
176     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
177     multifd_ops[method] = ops;
178 }
179 
180 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
181 {
182     MultiFDInit_t msg = {};
183     int ret;
184 
185     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
186     msg.version = cpu_to_be32(MULTIFD_VERSION);
187     msg.id = p->id;
188     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
189 
190     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
191     if (ret != 0) {
192         return -1;
193     }
194     return 0;
195 }
196 
197 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
198 {
199     MultiFDInit_t msg;
200     int ret;
201 
202     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
203     if (ret != 0) {
204         return -1;
205     }
206 
207     msg.magic = be32_to_cpu(msg.magic);
208     msg.version = be32_to_cpu(msg.version);
209 
210     if (msg.magic != MULTIFD_MAGIC) {
211         error_setg(errp, "multifd: received packet magic %x "
212                    "expected %x", msg.magic, MULTIFD_MAGIC);
213         return -1;
214     }
215 
216     if (msg.version != MULTIFD_VERSION) {
217         error_setg(errp, "multifd: received packet version %d "
218                    "expected %d", msg.version, MULTIFD_VERSION);
219         return -1;
220     }
221 
222     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
223         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
224         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
225 
226         error_setg(errp, "multifd: received uuid '%s' and expected "
227                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
228         g_free(uuid);
229         g_free(msg_uuid);
230         return -1;
231     }
232 
233     if (msg.id > migrate_multifd_channels()) {
234         error_setg(errp, "multifd: received channel version %d "
235                    "expected %d", msg.version, MULTIFD_VERSION);
236         return -1;
237     }
238 
239     return msg.id;
240 }
241 
242 static MultiFDPages_t *multifd_pages_init(size_t size)
243 {
244     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
245 
246     pages->allocated = size;
247     pages->iov = g_new0(struct iovec, size);
248     pages->offset = g_new0(ram_addr_t, size);
249 
250     return pages;
251 }
252 
253 static void multifd_pages_clear(MultiFDPages_t *pages)
254 {
255     pages->used = 0;
256     pages->allocated = 0;
257     pages->packet_num = 0;
258     pages->block = NULL;
259     g_free(pages->iov);
260     pages->iov = NULL;
261     g_free(pages->offset);
262     pages->offset = NULL;
263     g_free(pages);
264 }
265 
266 static void multifd_send_fill_packet(MultiFDSendParams *p)
267 {
268     MultiFDPacket_t *packet = p->packet;
269     int i;
270 
271     packet->flags = cpu_to_be32(p->flags);
272     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
273     packet->pages_used = cpu_to_be32(p->pages->used);
274     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
275     packet->packet_num = cpu_to_be64(p->packet_num);
276 
277     if (p->pages->block) {
278         strncpy(packet->ramblock, p->pages->block->idstr, 256);
279     }
280 
281     for (i = 0; i < p->pages->used; i++) {
282         /* there are architectures where ram_addr_t is 32 bit */
283         uint64_t temp = p->pages->offset[i];
284 
285         packet->offset[i] = cpu_to_be64(temp);
286     }
287 }
288 
289 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
290 {
291     MultiFDPacket_t *packet = p->packet;
292     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
293     RAMBlock *block;
294     int i;
295 
296     packet->magic = be32_to_cpu(packet->magic);
297     if (packet->magic != MULTIFD_MAGIC) {
298         error_setg(errp, "multifd: received packet "
299                    "magic %x and expected magic %x",
300                    packet->magic, MULTIFD_MAGIC);
301         return -1;
302     }
303 
304     packet->version = be32_to_cpu(packet->version);
305     if (packet->version != MULTIFD_VERSION) {
306         error_setg(errp, "multifd: received packet "
307                    "version %d and expected version %d",
308                    packet->version, MULTIFD_VERSION);
309         return -1;
310     }
311 
312     p->flags = be32_to_cpu(packet->flags);
313 
314     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
315     /*
316      * If we received a packet that is 100 times bigger than expected
317      * just stop migration.  It is a magic number.
318      */
319     if (packet->pages_alloc > pages_max * 100) {
320         error_setg(errp, "multifd: received packet "
321                    "with size %d and expected a maximum size of %d",
322                    packet->pages_alloc, pages_max * 100) ;
323         return -1;
324     }
325     /*
326      * We received a packet that is bigger than expected but inside
327      * reasonable limits (see previous comment).  Just reallocate.
328      */
329     if (packet->pages_alloc > p->pages->allocated) {
330         multifd_pages_clear(p->pages);
331         p->pages = multifd_pages_init(packet->pages_alloc);
332     }
333 
334     p->pages->used = be32_to_cpu(packet->pages_used);
335     if (p->pages->used > packet->pages_alloc) {
336         error_setg(errp, "multifd: received packet "
337                    "with %d pages and expected maximum pages are %d",
338                    p->pages->used, packet->pages_alloc) ;
339         return -1;
340     }
341 
342     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
343     p->packet_num = be64_to_cpu(packet->packet_num);
344 
345     if (p->pages->used == 0) {
346         return 0;
347     }
348 
349     /* make sure that ramblock is 0 terminated */
350     packet->ramblock[255] = 0;
351     block = qemu_ram_block_by_name(packet->ramblock);
352     if (!block) {
353         error_setg(errp, "multifd: unknown ram block %s",
354                    packet->ramblock);
355         return -1;
356     }
357 
358     for (i = 0; i < p->pages->used; i++) {
359         uint64_t offset = be64_to_cpu(packet->offset[i]);
360 
361         if (offset > (block->used_length - qemu_target_page_size())) {
362             error_setg(errp, "multifd: offset too long %" PRIu64
363                        " (max " RAM_ADDR_FMT ")",
364                        offset, block->used_length);
365             return -1;
366         }
367         p->pages->iov[i].iov_base = block->host + offset;
368         p->pages->iov[i].iov_len = qemu_target_page_size();
369     }
370 
371     return 0;
372 }
373 
374 struct {
375     MultiFDSendParams *params;
376     /* array of pages to sent */
377     MultiFDPages_t *pages;
378     /* global number of generated multifd packets */
379     uint64_t packet_num;
380     /* send channels ready */
381     QemuSemaphore channels_ready;
382     /*
383      * Have we already run terminate threads.  There is a race when it
384      * happens that we got one error while we are exiting.
385      * We will use atomic operations.  Only valid values are 0 and 1.
386      */
387     int exiting;
388     /* multifd ops */
389     MultiFDMethods *ops;
390 } *multifd_send_state;
391 
392 /*
393  * How we use multifd_send_state->pages and channel->pages?
394  *
395  * We create a pages for each channel, and a main one.  Each time that
396  * we need to send a batch of pages we interchange the ones between
397  * multifd_send_state and the channel that is sending it.  There are
398  * two reasons for that:
399  *    - to not have to do so many mallocs during migration
400  *    - to make easier to know what to free at the end of migration
401  *
402  * This way we always know who is the owner of each "pages" struct,
403  * and we don't need any locking.  It belongs to the migration thread
404  * or to the channel thread.  Switching is safe because the migration
405  * thread is using the channel mutex when changing it, and the channel
406  * have to had finish with its own, otherwise pending_job can't be
407  * false.
408  */
409 
410 static int multifd_send_pages(QEMUFile *f)
411 {
412     int i;
413     static int next_channel;
414     MultiFDSendParams *p = NULL; /* make happy gcc */
415     MultiFDPages_t *pages = multifd_send_state->pages;
416     uint64_t transferred;
417 
418     if (qatomic_read(&multifd_send_state->exiting)) {
419         return -1;
420     }
421 
422     qemu_sem_wait(&multifd_send_state->channels_ready);
423     /*
424      * next_channel can remain from a previous migration that was
425      * using more channels, so ensure it doesn't overflow if the
426      * limit is lower now.
427      */
428     next_channel %= migrate_multifd_channels();
429     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
430         p = &multifd_send_state->params[i];
431 
432         qemu_mutex_lock(&p->mutex);
433         if (p->quit) {
434             error_report("%s: channel %d has already quit!", __func__, i);
435             qemu_mutex_unlock(&p->mutex);
436             return -1;
437         }
438         if (!p->pending_job) {
439             p->pending_job++;
440             next_channel = (i + 1) % migrate_multifd_channels();
441             break;
442         }
443         qemu_mutex_unlock(&p->mutex);
444     }
445     assert(!p->pages->used);
446     assert(!p->pages->block);
447 
448     p->packet_num = multifd_send_state->packet_num++;
449     multifd_send_state->pages = p->pages;
450     p->pages = pages;
451     transferred = ((uint64_t) pages->used) * qemu_target_page_size()
452                 + p->packet_len;
453     qemu_file_update_transfer(f, transferred);
454     ram_counters.multifd_bytes += transferred;
455     ram_counters.transferred += transferred;
456     qemu_mutex_unlock(&p->mutex);
457     qemu_sem_post(&p->sem);
458 
459     return 1;
460 }
461 
462 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
463 {
464     MultiFDPages_t *pages = multifd_send_state->pages;
465 
466     if (!pages->block) {
467         pages->block = block;
468     }
469 
470     if (pages->block == block) {
471         pages->offset[pages->used] = offset;
472         pages->iov[pages->used].iov_base = block->host + offset;
473         pages->iov[pages->used].iov_len = qemu_target_page_size();
474         pages->used++;
475 
476         if (pages->used < pages->allocated) {
477             return 1;
478         }
479     }
480 
481     if (multifd_send_pages(f) < 0) {
482         return -1;
483     }
484 
485     if (pages->block != block) {
486         return  multifd_queue_page(f, block, offset);
487     }
488 
489     return 1;
490 }
491 
492 static void multifd_send_terminate_threads(Error *err)
493 {
494     int i;
495 
496     trace_multifd_send_terminate_threads(err != NULL);
497 
498     if (err) {
499         MigrationState *s = migrate_get_current();
500         migrate_set_error(s, err);
501         if (s->state == MIGRATION_STATUS_SETUP ||
502             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
503             s->state == MIGRATION_STATUS_DEVICE ||
504             s->state == MIGRATION_STATUS_ACTIVE) {
505             migrate_set_state(&s->state, s->state,
506                               MIGRATION_STATUS_FAILED);
507         }
508     }
509 
510     /*
511      * We don't want to exit each threads twice.  Depending on where
512      * we get the error, or if there are two independent errors in two
513      * threads at the same time, we can end calling this function
514      * twice.
515      */
516     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
517         return;
518     }
519 
520     for (i = 0; i < migrate_multifd_channels(); i++) {
521         MultiFDSendParams *p = &multifd_send_state->params[i];
522 
523         qemu_mutex_lock(&p->mutex);
524         p->quit = true;
525         qemu_sem_post(&p->sem);
526         qemu_mutex_unlock(&p->mutex);
527     }
528 }
529 
530 void multifd_save_cleanup(void)
531 {
532     int i;
533 
534     if (!migrate_use_multifd()) {
535         return;
536     }
537     multifd_send_terminate_threads(NULL);
538     for (i = 0; i < migrate_multifd_channels(); i++) {
539         MultiFDSendParams *p = &multifd_send_state->params[i];
540 
541         if (p->running) {
542             qemu_thread_join(&p->thread);
543         }
544     }
545     for (i = 0; i < migrate_multifd_channels(); i++) {
546         MultiFDSendParams *p = &multifd_send_state->params[i];
547         Error *local_err = NULL;
548 
549         socket_send_channel_destroy(p->c);
550         p->c = NULL;
551         qemu_mutex_destroy(&p->mutex);
552         qemu_sem_destroy(&p->sem);
553         qemu_sem_destroy(&p->sem_sync);
554         g_free(p->name);
555         p->name = NULL;
556         g_free(p->tls_hostname);
557         p->tls_hostname = NULL;
558         multifd_pages_clear(p->pages);
559         p->pages = NULL;
560         p->packet_len = 0;
561         g_free(p->packet);
562         p->packet = NULL;
563         multifd_send_state->ops->send_cleanup(p, &local_err);
564         if (local_err) {
565             migrate_set_error(migrate_get_current(), local_err);
566             error_free(local_err);
567         }
568     }
569     qemu_sem_destroy(&multifd_send_state->channels_ready);
570     g_free(multifd_send_state->params);
571     multifd_send_state->params = NULL;
572     multifd_pages_clear(multifd_send_state->pages);
573     multifd_send_state->pages = NULL;
574     g_free(multifd_send_state);
575     multifd_send_state = NULL;
576 }
577 
578 void multifd_send_sync_main(QEMUFile *f)
579 {
580     int i;
581 
582     if (!migrate_use_multifd()) {
583         return;
584     }
585     if (multifd_send_state->pages->used) {
586         if (multifd_send_pages(f) < 0) {
587             error_report("%s: multifd_send_pages fail", __func__);
588             return;
589         }
590     }
591     for (i = 0; i < migrate_multifd_channels(); i++) {
592         MultiFDSendParams *p = &multifd_send_state->params[i];
593 
594         trace_multifd_send_sync_main_signal(p->id);
595 
596         qemu_mutex_lock(&p->mutex);
597 
598         if (p->quit) {
599             error_report("%s: channel %d has already quit", __func__, i);
600             qemu_mutex_unlock(&p->mutex);
601             return;
602         }
603 
604         p->packet_num = multifd_send_state->packet_num++;
605         p->flags |= MULTIFD_FLAG_SYNC;
606         p->pending_job++;
607         qemu_file_update_transfer(f, p->packet_len);
608         ram_counters.multifd_bytes += p->packet_len;
609         ram_counters.transferred += p->packet_len;
610         qemu_mutex_unlock(&p->mutex);
611         qemu_sem_post(&p->sem);
612     }
613     for (i = 0; i < migrate_multifd_channels(); i++) {
614         MultiFDSendParams *p = &multifd_send_state->params[i];
615 
616         trace_multifd_send_sync_main_wait(p->id);
617         qemu_sem_wait(&p->sem_sync);
618     }
619     trace_multifd_send_sync_main(multifd_send_state->packet_num);
620 }
621 
622 static void *multifd_send_thread(void *opaque)
623 {
624     MultiFDSendParams *p = opaque;
625     Error *local_err = NULL;
626     int ret = 0;
627     uint32_t flags = 0;
628 
629     trace_multifd_send_thread_start(p->id);
630     rcu_register_thread();
631 
632     if (multifd_send_initial_packet(p, &local_err) < 0) {
633         ret = -1;
634         goto out;
635     }
636     /* initial packet */
637     p->num_packets = 1;
638 
639     while (true) {
640         qemu_sem_wait(&p->sem);
641 
642         if (qatomic_read(&multifd_send_state->exiting)) {
643             break;
644         }
645         qemu_mutex_lock(&p->mutex);
646 
647         if (p->pending_job) {
648             uint32_t used = p->pages->used;
649             uint64_t packet_num = p->packet_num;
650             flags = p->flags;
651 
652             if (used) {
653                 ret = multifd_send_state->ops->send_prepare(p, used,
654                                                             &local_err);
655                 if (ret != 0) {
656                     qemu_mutex_unlock(&p->mutex);
657                     break;
658                 }
659             }
660             multifd_send_fill_packet(p);
661             p->flags = 0;
662             p->num_packets++;
663             p->num_pages += used;
664             p->pages->used = 0;
665             p->pages->block = NULL;
666             qemu_mutex_unlock(&p->mutex);
667 
668             trace_multifd_send(p->id, packet_num, used, flags,
669                                p->next_packet_size);
670 
671             ret = qio_channel_write_all(p->c, (void *)p->packet,
672                                         p->packet_len, &local_err);
673             if (ret != 0) {
674                 break;
675             }
676 
677             if (used) {
678                 ret = multifd_send_state->ops->send_write(p, used, &local_err);
679                 if (ret != 0) {
680                     break;
681                 }
682             }
683 
684             qemu_mutex_lock(&p->mutex);
685             p->pending_job--;
686             qemu_mutex_unlock(&p->mutex);
687 
688             if (flags & MULTIFD_FLAG_SYNC) {
689                 qemu_sem_post(&p->sem_sync);
690             }
691             qemu_sem_post(&multifd_send_state->channels_ready);
692         } else if (p->quit) {
693             qemu_mutex_unlock(&p->mutex);
694             break;
695         } else {
696             qemu_mutex_unlock(&p->mutex);
697             /* sometimes there are spurious wakeups */
698         }
699     }
700 
701 out:
702     if (local_err) {
703         trace_multifd_send_error(p->id);
704         multifd_send_terminate_threads(local_err);
705         error_free(local_err);
706     }
707 
708     /*
709      * Error happen, I will exit, but I can't just leave, tell
710      * who pay attention to me.
711      */
712     if (ret != 0) {
713         qemu_sem_post(&p->sem_sync);
714         qemu_sem_post(&multifd_send_state->channels_ready);
715     }
716 
717     qemu_mutex_lock(&p->mutex);
718     p->running = false;
719     qemu_mutex_unlock(&p->mutex);
720 
721     rcu_unregister_thread();
722     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
723 
724     return NULL;
725 }
726 
727 static bool multifd_channel_connect(MultiFDSendParams *p,
728                                     QIOChannel *ioc,
729                                     Error *error);
730 
731 static void multifd_tls_outgoing_handshake(QIOTask *task,
732                                            gpointer opaque)
733 {
734     MultiFDSendParams *p = opaque;
735     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
736     Error *err = NULL;
737 
738     if (qio_task_propagate_error(task, &err)) {
739         trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
740     } else {
741         trace_multifd_tls_outgoing_handshake_complete(ioc);
742     }
743 
744     if (!multifd_channel_connect(p, ioc, err)) {
745         /*
746          * Error happen, mark multifd_send_thread status as 'quit' although it
747          * is not created, and then tell who pay attention to me.
748          */
749         p->quit = true;
750         qemu_sem_post(&multifd_send_state->channels_ready);
751         qemu_sem_post(&p->sem_sync);
752     }
753 }
754 
755 static void *multifd_tls_handshake_thread(void *opaque)
756 {
757     MultiFDSendParams *p = opaque;
758     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
759 
760     qio_channel_tls_handshake(tioc,
761                               multifd_tls_outgoing_handshake,
762                               p,
763                               NULL,
764                               NULL);
765     return NULL;
766 }
767 
768 static void multifd_tls_channel_connect(MultiFDSendParams *p,
769                                         QIOChannel *ioc,
770                                         Error **errp)
771 {
772     MigrationState *s = migrate_get_current();
773     const char *hostname = p->tls_hostname;
774     QIOChannelTLS *tioc;
775 
776     tioc = migration_tls_client_create(s, ioc, hostname, errp);
777     if (!tioc) {
778         return;
779     }
780 
781     object_unref(OBJECT(ioc));
782     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
783     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
784     p->c = QIO_CHANNEL(tioc);
785     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
786                        multifd_tls_handshake_thread, p,
787                        QEMU_THREAD_JOINABLE);
788 }
789 
790 static bool multifd_channel_connect(MultiFDSendParams *p,
791                                     QIOChannel *ioc,
792                                     Error *error)
793 {
794     MigrationState *s = migrate_get_current();
795 
796     trace_multifd_set_outgoing_channel(
797         ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);
798 
799     if (!error) {
800         if (s->parameters.tls_creds &&
801             *s->parameters.tls_creds &&
802             !object_dynamic_cast(OBJECT(ioc),
803                                  TYPE_QIO_CHANNEL_TLS)) {
804             multifd_tls_channel_connect(p, ioc, &error);
805             if (!error) {
806                 /*
807                  * tls_channel_connect will call back to this
808                  * function after the TLS handshake,
809                  * so we mustn't call multifd_send_thread until then
810                  */
811                 return true;
812             } else {
813                 return false;
814             }
815         } else {
816             /* update for tls qio channel */
817             p->c = ioc;
818             qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
819                                    QEMU_THREAD_JOINABLE);
820        }
821        return true;
822     }
823 
824     return false;
825 }
826 
827 static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
828                                              QIOChannel *ioc, Error *err)
829 {
830      migrate_set_error(migrate_get_current(), err);
831      /* Error happen, we need to tell who pay attention to me */
832      qemu_sem_post(&multifd_send_state->channels_ready);
833      qemu_sem_post(&p->sem_sync);
834      /*
835       * Although multifd_send_thread is not created, but main migration
836       * thread neet to judge whether it is running, so we need to mark
837       * its status.
838       */
839      p->quit = true;
840      object_unref(OBJECT(ioc));
841      error_free(err);
842 }
843 
844 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
845 {
846     MultiFDSendParams *p = opaque;
847     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
848     Error *local_err = NULL;
849 
850     trace_multifd_new_send_channel_async(p->id);
851     if (qio_task_propagate_error(task, &local_err)) {
852         goto cleanup;
853     } else {
854         p->c = QIO_CHANNEL(sioc);
855         qio_channel_set_delay(p->c, false);
856         p->running = true;
857         if (!multifd_channel_connect(p, sioc, local_err)) {
858             goto cleanup;
859         }
860         return;
861     }
862 
863 cleanup:
864     multifd_new_send_channel_cleanup(p, sioc, local_err);
865 }
866 
867 int multifd_save_setup(Error **errp)
868 {
869     int thread_count;
870     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
871     uint8_t i;
872     MigrationState *s;
873 
874     if (!migrate_use_multifd()) {
875         return 0;
876     }
877     s = migrate_get_current();
878     thread_count = migrate_multifd_channels();
879     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
880     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
881     multifd_send_state->pages = multifd_pages_init(page_count);
882     qemu_sem_init(&multifd_send_state->channels_ready, 0);
883     qatomic_set(&multifd_send_state->exiting, 0);
884     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
885 
886     for (i = 0; i < thread_count; i++) {
887         MultiFDSendParams *p = &multifd_send_state->params[i];
888 
889         qemu_mutex_init(&p->mutex);
890         qemu_sem_init(&p->sem, 0);
891         qemu_sem_init(&p->sem_sync, 0);
892         p->quit = false;
893         p->pending_job = 0;
894         p->id = i;
895         p->pages = multifd_pages_init(page_count);
896         p->packet_len = sizeof(MultiFDPacket_t)
897                       + sizeof(uint64_t) * page_count;
898         p->packet = g_malloc0(p->packet_len);
899         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
900         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
901         p->name = g_strdup_printf("multifdsend_%d", i);
902         p->tls_hostname = g_strdup(s->hostname);
903         socket_send_channel_create(multifd_new_send_channel_async, p);
904     }
905 
906     for (i = 0; i < thread_count; i++) {
907         MultiFDSendParams *p = &multifd_send_state->params[i];
908         Error *local_err = NULL;
909         int ret;
910 
911         ret = multifd_send_state->ops->send_setup(p, &local_err);
912         if (ret) {
913             error_propagate(errp, local_err);
914             return ret;
915         }
916     }
917     return 0;
918 }
919 
920 struct {
921     MultiFDRecvParams *params;
922     /* number of created threads */
923     int count;
924     /* syncs main thread and channels */
925     QemuSemaphore sem_sync;
926     /* global number of generated multifd packets */
927     uint64_t packet_num;
928     /* multifd ops */
929     MultiFDMethods *ops;
930 } *multifd_recv_state;
931 
932 static void multifd_recv_terminate_threads(Error *err)
933 {
934     int i;
935 
936     trace_multifd_recv_terminate_threads(err != NULL);
937 
938     if (err) {
939         MigrationState *s = migrate_get_current();
940         migrate_set_error(s, err);
941         if (s->state == MIGRATION_STATUS_SETUP ||
942             s->state == MIGRATION_STATUS_ACTIVE) {
943             migrate_set_state(&s->state, s->state,
944                               MIGRATION_STATUS_FAILED);
945         }
946     }
947 
948     for (i = 0; i < migrate_multifd_channels(); i++) {
949         MultiFDRecvParams *p = &multifd_recv_state->params[i];
950 
951         qemu_mutex_lock(&p->mutex);
952         p->quit = true;
953         /*
954          * We could arrive here for two reasons:
955          *  - normal quit, i.e. everything went fine, just finished
956          *  - error quit: We close the channels so the channel threads
957          *    finish the qio_channel_read_all_eof()
958          */
959         if (p->c) {
960             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
961         }
962         qemu_mutex_unlock(&p->mutex);
963     }
964 }
965 
966 int multifd_load_cleanup(Error **errp)
967 {
968     int i;
969 
970     if (!migrate_use_multifd()) {
971         return 0;
972     }
973     multifd_recv_terminate_threads(NULL);
974     for (i = 0; i < migrate_multifd_channels(); i++) {
975         MultiFDRecvParams *p = &multifd_recv_state->params[i];
976 
977         if (p->running) {
978             p->quit = true;
979             /*
980              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
981              * however try to wakeup it without harm in cleanup phase.
982              */
983             qemu_sem_post(&p->sem_sync);
984             qemu_thread_join(&p->thread);
985         }
986     }
987     for (i = 0; i < migrate_multifd_channels(); i++) {
988         MultiFDRecvParams *p = &multifd_recv_state->params[i];
989 
990         if (object_dynamic_cast(OBJECT(p->c), TYPE_QIO_CHANNEL_SOCKET)
991             && OBJECT(p->c)->ref == 1) {
992             yank_unregister_function(MIGRATION_YANK_INSTANCE,
993                                      migration_yank_iochannel,
994                                      QIO_CHANNEL(p->c));
995         }
996 
997         object_unref(OBJECT(p->c));
998         p->c = NULL;
999         qemu_mutex_destroy(&p->mutex);
1000         qemu_sem_destroy(&p->sem_sync);
1001         g_free(p->name);
1002         p->name = NULL;
1003         multifd_pages_clear(p->pages);
1004         p->pages = NULL;
1005         p->packet_len = 0;
1006         g_free(p->packet);
1007         p->packet = NULL;
1008         multifd_recv_state->ops->recv_cleanup(p);
1009     }
1010     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1011     g_free(multifd_recv_state->params);
1012     multifd_recv_state->params = NULL;
1013     g_free(multifd_recv_state);
1014     multifd_recv_state = NULL;
1015 
1016     return 0;
1017 }
1018 
1019 void multifd_recv_sync_main(void)
1020 {
1021     int i;
1022 
1023     if (!migrate_use_multifd()) {
1024         return;
1025     }
1026     for (i = 0; i < migrate_multifd_channels(); i++) {
1027         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1028 
1029         trace_multifd_recv_sync_main_wait(p->id);
1030         qemu_sem_wait(&multifd_recv_state->sem_sync);
1031     }
1032     for (i = 0; i < migrate_multifd_channels(); i++) {
1033         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1034 
1035         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1036             if (multifd_recv_state->packet_num < p->packet_num) {
1037                 multifd_recv_state->packet_num = p->packet_num;
1038             }
1039         }
1040         trace_multifd_recv_sync_main_signal(p->id);
1041         qemu_sem_post(&p->sem_sync);
1042     }
1043     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1044 }
1045 
1046 static void *multifd_recv_thread(void *opaque)
1047 {
1048     MultiFDRecvParams *p = opaque;
1049     Error *local_err = NULL;
1050     int ret;
1051 
1052     trace_multifd_recv_thread_start(p->id);
1053     rcu_register_thread();
1054 
1055     while (true) {
1056         uint32_t used;
1057         uint32_t flags;
1058 
1059         if (p->quit) {
1060             break;
1061         }
1062 
1063         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1064                                        p->packet_len, &local_err);
1065         if (ret == 0) {   /* EOF */
1066             break;
1067         }
1068         if (ret == -1) {   /* Error */
1069             break;
1070         }
1071 
1072         qemu_mutex_lock(&p->mutex);
1073         ret = multifd_recv_unfill_packet(p, &local_err);
1074         if (ret) {
1075             qemu_mutex_unlock(&p->mutex);
1076             break;
1077         }
1078 
1079         used = p->pages->used;
1080         flags = p->flags;
1081         /* recv methods don't know how to handle the SYNC flag */
1082         p->flags &= ~MULTIFD_FLAG_SYNC;
1083         trace_multifd_recv(p->id, p->packet_num, used, flags,
1084                            p->next_packet_size);
1085         p->num_packets++;
1086         p->num_pages += used;
1087         qemu_mutex_unlock(&p->mutex);
1088 
1089         if (used) {
1090             ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
1091             if (ret != 0) {
1092                 break;
1093             }
1094         }
1095 
1096         if (flags & MULTIFD_FLAG_SYNC) {
1097             qemu_sem_post(&multifd_recv_state->sem_sync);
1098             qemu_sem_wait(&p->sem_sync);
1099         }
1100     }
1101 
1102     if (local_err) {
1103         multifd_recv_terminate_threads(local_err);
1104         error_free(local_err);
1105     }
1106     qemu_mutex_lock(&p->mutex);
1107     p->running = false;
1108     qemu_mutex_unlock(&p->mutex);
1109 
1110     rcu_unregister_thread();
1111     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1112 
1113     return NULL;
1114 }
1115 
1116 int multifd_load_setup(Error **errp)
1117 {
1118     int thread_count;
1119     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1120     uint8_t i;
1121 
1122     if (!migrate_use_multifd()) {
1123         return 0;
1124     }
1125     thread_count = migrate_multifd_channels();
1126     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1127     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1128     qatomic_set(&multifd_recv_state->count, 0);
1129     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1130     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1131 
1132     for (i = 0; i < thread_count; i++) {
1133         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1134 
1135         qemu_mutex_init(&p->mutex);
1136         qemu_sem_init(&p->sem_sync, 0);
1137         p->quit = false;
1138         p->id = i;
1139         p->pages = multifd_pages_init(page_count);
1140         p->packet_len = sizeof(MultiFDPacket_t)
1141                       + sizeof(uint64_t) * page_count;
1142         p->packet = g_malloc0(p->packet_len);
1143         p->name = g_strdup_printf("multifdrecv_%d", i);
1144     }
1145 
1146     for (i = 0; i < thread_count; i++) {
1147         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1148         Error *local_err = NULL;
1149         int ret;
1150 
1151         ret = multifd_recv_state->ops->recv_setup(p, &local_err);
1152         if (ret) {
1153             error_propagate(errp, local_err);
1154             return ret;
1155         }
1156     }
1157     return 0;
1158 }
1159 
1160 bool multifd_recv_all_channels_created(void)
1161 {
1162     int thread_count = migrate_multifd_channels();
1163 
1164     if (!migrate_use_multifd()) {
1165         return true;
1166     }
1167 
1168     return thread_count == qatomic_read(&multifd_recv_state->count);
1169 }
1170 
1171 /*
1172  * Try to receive all multifd channels to get ready for the migration.
1173  * - Return true and do not set @errp when correctly receiving all channels;
1174  * - Return false and do not set @errp when correctly receiving the current one;
1175  * - Return false and set @errp when failing to receive the current channel.
1176  */
1177 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1178 {
1179     MultiFDRecvParams *p;
1180     Error *local_err = NULL;
1181     int id;
1182 
1183     id = multifd_recv_initial_packet(ioc, &local_err);
1184     if (id < 0) {
1185         multifd_recv_terminate_threads(local_err);
1186         error_propagate_prepend(errp, local_err,
1187                                 "failed to receive packet"
1188                                 " via multifd channel %d: ",
1189                                 qatomic_read(&multifd_recv_state->count));
1190         return false;
1191     }
1192     trace_multifd_recv_new_channel(id);
1193 
1194     p = &multifd_recv_state->params[id];
1195     if (p->c != NULL) {
1196         error_setg(&local_err, "multifd: received id '%d' already setup'",
1197                    id);
1198         multifd_recv_terminate_threads(local_err);
1199         error_propagate(errp, local_err);
1200         return false;
1201     }
1202     p->c = ioc;
1203     object_ref(OBJECT(ioc));
1204     /* initial packet */
1205     p->num_packets = 1;
1206 
1207     p->running = true;
1208     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1209                        QEMU_THREAD_JOINABLE);
1210     qatomic_inc(&multifd_recv_state->count);
1211     return qatomic_read(&multifd_recv_state->count) ==
1212            migrate_multifd_channels();
1213 }
1214