xref: /qemu/migration/multifd.c (revision b355f08a)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "socket.h"
23 #include "tls.h"
24 #include "qemu-file.h"
25 #include "trace.h"
26 #include "multifd.h"
27 
28 #include "qemu/yank.h"
29 #include "io/channel-socket.h"
30 #include "yank_functions.h"
31 
32 /* Multiple fd's */
33 
34 #define MULTIFD_MAGIC 0x11223344U
35 #define MULTIFD_VERSION 1
36 
/*
 * Handshake message exchanged once per channel: filled in and written by
 * multifd_send_initial_packet(), read and validated by
 * multifd_recv_initial_packet().  The struct is packed and is written to /
 * read from the channel as raw bytes, so its layout is the wire format.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC, stored big endian */
    uint32_t version;       /* MULTIFD_VERSION, stored big endian */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* id of the sending channel */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
45 
46 /* Multifd without compression */
47 
48 /**
49  * nocomp_send_setup: setup send side
50  *
51  * For no compression this function does nothing.
52  *
53  * Returns 0 for success or -1 for error
54  *
55  * @p: Params for the channel that we are using
56  * @errp: pointer to an error
57  */
58 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
59 {
60     return 0;
61 }
62 
63 /**
64  * nocomp_send_cleanup: cleanup send side
65  *
66  * For no compression this function does nothing.
67  *
68  * @p: Params for the channel that we are using
69  */
70 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
71 {
72     return;
73 }
74 
75 /**
76  * nocomp_send_prepare: prepare date to be able to send
77  *
78  * For no compression we just have to calculate the size of the
79  * packet.
80  *
81  * Returns 0 for success or -1 for error
82  *
83  * @p: Params for the channel that we are using
84  * @used: number of pages used
85  * @errp: pointer to an error
86  */
87 static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
88                                Error **errp)
89 {
90     p->next_packet_size = used * qemu_target_page_size();
91     p->flags |= MULTIFD_FLAG_NOCOMP;
92     return 0;
93 }
94 
95 /**
96  * nocomp_send_write: do the actual write of the data
97  *
98  * For no compression we just have to write the data.
99  *
100  * Returns 0 for success or -1 for error
101  *
102  * @p: Params for the channel that we are using
103  * @used: number of pages used
104  * @errp: pointer to an error
105  */
106 static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
107 {
108     return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
109 }
110 
111 /**
112  * nocomp_recv_setup: setup receive side
113  *
114  * For no compression this function does nothing.
115  *
116  * Returns 0 for success or -1 for error
117  *
118  * @p: Params for the channel that we are using
119  * @errp: pointer to an error
120  */
121 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
122 {
123     return 0;
124 }
125 
126 /**
127  * nocomp_recv_cleanup: setup receive side
128  *
129  * For no compression this function does nothing.
130  *
131  * @p: Params for the channel that we are using
132  */
133 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
134 {
135 }
136 
137 /**
138  * nocomp_recv_pages: read the data from the channel into actual pages
139  *
140  * For no compression we just need to read things into the correct place.
141  *
142  * Returns 0 for success or -1 for error
143  *
144  * @p: Params for the channel that we are using
145  * @used: number of pages used
146  * @errp: pointer to an error
147  */
148 static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
149 {
150     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
151 
152     if (flags != MULTIFD_FLAG_NOCOMP) {
153         error_setg(errp, "multifd %d: flags received %x flags expected %x",
154                    p->id, flags, MULTIFD_FLAG_NOCOMP);
155         return -1;
156     }
157     return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
158 }
159 
/* Method table for sending/receiving pages without compression. */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};
169 
/*
 * Method tables indexed by compression method.  Only the "none" entry is
 * filled in statically; the other entries are installed at run time by
 * multifd_register_ops().
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
173 
174 void multifd_register_ops(int method, MultiFDMethods *ops)
175 {
176     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
177     multifd_ops[method] = ops;
178 }
179 
180 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
181 {
182     MultiFDInit_t msg = {};
183     int ret;
184 
185     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
186     msg.version = cpu_to_be32(MULTIFD_VERSION);
187     msg.id = p->id;
188     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
189 
190     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
191     if (ret != 0) {
192         return -1;
193     }
194     return 0;
195 }
196 
197 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
198 {
199     MultiFDInit_t msg;
200     int ret;
201 
202     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
203     if (ret != 0) {
204         return -1;
205     }
206 
207     msg.magic = be32_to_cpu(msg.magic);
208     msg.version = be32_to_cpu(msg.version);
209 
210     if (msg.magic != MULTIFD_MAGIC) {
211         error_setg(errp, "multifd: received packet magic %x "
212                    "expected %x", msg.magic, MULTIFD_MAGIC);
213         return -1;
214     }
215 
216     if (msg.version != MULTIFD_VERSION) {
217         error_setg(errp, "multifd: received packet version %d "
218                    "expected %d", msg.version, MULTIFD_VERSION);
219         return -1;
220     }
221 
222     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
223         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
224         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
225 
226         error_setg(errp, "multifd: received uuid '%s' and expected "
227                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
228         g_free(uuid);
229         g_free(msg_uuid);
230         return -1;
231     }
232 
233     if (msg.id > migrate_multifd_channels()) {
234         error_setg(errp, "multifd: received channel version %d "
235                    "expected %d", msg.version, MULTIFD_VERSION);
236         return -1;
237     }
238 
239     return msg.id;
240 }
241 
242 static MultiFDPages_t *multifd_pages_init(size_t size)
243 {
244     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
245 
246     pages->allocated = size;
247     pages->iov = g_new0(struct iovec, size);
248     pages->offset = g_new0(ram_addr_t, size);
249 
250     return pages;
251 }
252 
253 static void multifd_pages_clear(MultiFDPages_t *pages)
254 {
255     pages->used = 0;
256     pages->allocated = 0;
257     pages->packet_num = 0;
258     pages->block = NULL;
259     g_free(pages->iov);
260     pages->iov = NULL;
261     g_free(pages->offset);
262     pages->offset = NULL;
263     g_free(pages);
264 }
265 
266 static void multifd_send_fill_packet(MultiFDSendParams *p)
267 {
268     MultiFDPacket_t *packet = p->packet;
269     int i;
270 
271     packet->flags = cpu_to_be32(p->flags);
272     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
273     packet->pages_used = cpu_to_be32(p->pages->used);
274     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
275     packet->packet_num = cpu_to_be64(p->packet_num);
276 
277     if (p->pages->block) {
278         strncpy(packet->ramblock, p->pages->block->idstr, 256);
279     }
280 
281     for (i = 0; i < p->pages->used; i++) {
282         /* there are architectures where ram_addr_t is 32 bit */
283         uint64_t temp = p->pages->offset[i];
284 
285         packet->offset[i] = cpu_to_be64(temp);
286     }
287 }
288 
289 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
290 {
291     MultiFDPacket_t *packet = p->packet;
292     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
293     RAMBlock *block;
294     int i;
295 
296     packet->magic = be32_to_cpu(packet->magic);
297     if (packet->magic != MULTIFD_MAGIC) {
298         error_setg(errp, "multifd: received packet "
299                    "magic %x and expected magic %x",
300                    packet->magic, MULTIFD_MAGIC);
301         return -1;
302     }
303 
304     packet->version = be32_to_cpu(packet->version);
305     if (packet->version != MULTIFD_VERSION) {
306         error_setg(errp, "multifd: received packet "
307                    "version %d and expected version %d",
308                    packet->version, MULTIFD_VERSION);
309         return -1;
310     }
311 
312     p->flags = be32_to_cpu(packet->flags);
313 
314     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
315     /*
316      * If we received a packet that is 100 times bigger than expected
317      * just stop migration.  It is a magic number.
318      */
319     if (packet->pages_alloc > pages_max * 100) {
320         error_setg(errp, "multifd: received packet "
321                    "with size %d and expected a maximum size of %d",
322                    packet->pages_alloc, pages_max * 100) ;
323         return -1;
324     }
325     /*
326      * We received a packet that is bigger than expected but inside
327      * reasonable limits (see previous comment).  Just reallocate.
328      */
329     if (packet->pages_alloc > p->pages->allocated) {
330         multifd_pages_clear(p->pages);
331         p->pages = multifd_pages_init(packet->pages_alloc);
332     }
333 
334     p->pages->used = be32_to_cpu(packet->pages_used);
335     if (p->pages->used > packet->pages_alloc) {
336         error_setg(errp, "multifd: received packet "
337                    "with %d pages and expected maximum pages are %d",
338                    p->pages->used, packet->pages_alloc) ;
339         return -1;
340     }
341 
342     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
343     p->packet_num = be64_to_cpu(packet->packet_num);
344 
345     if (p->pages->used == 0) {
346         return 0;
347     }
348 
349     /* make sure that ramblock is 0 terminated */
350     packet->ramblock[255] = 0;
351     block = qemu_ram_block_by_name(packet->ramblock);
352     if (!block) {
353         error_setg(errp, "multifd: unknown ram block %s",
354                    packet->ramblock);
355         return -1;
356     }
357 
358     for (i = 0; i < p->pages->used; i++) {
359         uint64_t offset = be64_to_cpu(packet->offset[i]);
360 
361         if (offset > (block->used_length - qemu_target_page_size())) {
362             error_setg(errp, "multifd: offset too long %" PRIu64
363                        " (max " RAM_ADDR_FMT ")",
364                        offset, block->used_length);
365             return -1;
366         }
367         p->pages->iov[i].iov_base = block->host + offset;
368         p->pages->iov[i].iov_len = qemu_target_page_size();
369     }
370 
371     return 0;
372 }
373 
/* Global state of the send side; allocated by multifd_save_setup(). */
struct {
    /* per channel parameters, one entry per multifd channel */
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
391 
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one "pages" struct for each channel, plus a main one.  Each
 * time that we need to send a batch of pages we swap the main one with
 * the one of the channel that is going to send it.  There are
 * two reasons for that:
 *    - to avoid doing so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread holds the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 */
409 
410 static int multifd_send_pages(QEMUFile *f)
411 {
412     int i;
413     static int next_channel;
414     MultiFDSendParams *p = NULL; /* make happy gcc */
415     MultiFDPages_t *pages = multifd_send_state->pages;
416     uint64_t transferred;
417 
418     if (qatomic_read(&multifd_send_state->exiting)) {
419         return -1;
420     }
421 
422     qemu_sem_wait(&multifd_send_state->channels_ready);
423     /*
424      * next_channel can remain from a previous migration that was
425      * using more channels, so ensure it doesn't overflow if the
426      * limit is lower now.
427      */
428     next_channel %= migrate_multifd_channels();
429     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
430         p = &multifd_send_state->params[i];
431 
432         qemu_mutex_lock(&p->mutex);
433         if (p->quit) {
434             error_report("%s: channel %d has already quit!", __func__, i);
435             qemu_mutex_unlock(&p->mutex);
436             return -1;
437         }
438         if (!p->pending_job) {
439             p->pending_job++;
440             next_channel = (i + 1) % migrate_multifd_channels();
441             break;
442         }
443         qemu_mutex_unlock(&p->mutex);
444     }
445     assert(!p->pages->used);
446     assert(!p->pages->block);
447 
448     p->packet_num = multifd_send_state->packet_num++;
449     multifd_send_state->pages = p->pages;
450     p->pages = pages;
451     transferred = ((uint64_t) pages->used) * qemu_target_page_size()
452                 + p->packet_len;
453     qemu_file_update_transfer(f, transferred);
454     ram_counters.multifd_bytes += transferred;
455     ram_counters.transferred += transferred;
456     qemu_mutex_unlock(&p->mutex);
457     qemu_sem_post(&p->sem);
458 
459     return 1;
460 }
461 
462 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
463 {
464     MultiFDPages_t *pages = multifd_send_state->pages;
465 
466     if (!pages->block) {
467         pages->block = block;
468     }
469 
470     if (pages->block == block) {
471         pages->offset[pages->used] = offset;
472         pages->iov[pages->used].iov_base = block->host + offset;
473         pages->iov[pages->used].iov_len = qemu_target_page_size();
474         pages->used++;
475 
476         if (pages->used < pages->allocated) {
477             return 1;
478         }
479     }
480 
481     if (multifd_send_pages(f) < 0) {
482         return -1;
483     }
484 
485     if (pages->block != block) {
486         return  multifd_queue_page(f, block, offset);
487     }
488 
489     return 1;
490 }
491 
492 static void multifd_send_terminate_threads(Error *err)
493 {
494     int i;
495 
496     trace_multifd_send_terminate_threads(err != NULL);
497 
498     if (err) {
499         MigrationState *s = migrate_get_current();
500         migrate_set_error(s, err);
501         if (s->state == MIGRATION_STATUS_SETUP ||
502             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
503             s->state == MIGRATION_STATUS_DEVICE ||
504             s->state == MIGRATION_STATUS_ACTIVE) {
505             migrate_set_state(&s->state, s->state,
506                               MIGRATION_STATUS_FAILED);
507         }
508     }
509 
510     /*
511      * We don't want to exit each threads twice.  Depending on where
512      * we get the error, or if there are two independent errors in two
513      * threads at the same time, we can end calling this function
514      * twice.
515      */
516     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
517         return;
518     }
519 
520     for (i = 0; i < migrate_multifd_channels(); i++) {
521         MultiFDSendParams *p = &multifd_send_state->params[i];
522 
523         qemu_mutex_lock(&p->mutex);
524         p->quit = true;
525         qemu_sem_post(&p->sem);
526         qemu_mutex_unlock(&p->mutex);
527     }
528 }
529 
530 void multifd_save_cleanup(void)
531 {
532     int i;
533 
534     if (!migrate_use_multifd()) {
535         return;
536     }
537     multifd_send_terminate_threads(NULL);
538     for (i = 0; i < migrate_multifd_channels(); i++) {
539         MultiFDSendParams *p = &multifd_send_state->params[i];
540 
541         if (p->running) {
542             qemu_thread_join(&p->thread);
543         }
544     }
545     for (i = 0; i < migrate_multifd_channels(); i++) {
546         MultiFDSendParams *p = &multifd_send_state->params[i];
547         Error *local_err = NULL;
548 
549         socket_send_channel_destroy(p->c);
550         p->c = NULL;
551         qemu_mutex_destroy(&p->mutex);
552         qemu_sem_destroy(&p->sem);
553         qemu_sem_destroy(&p->sem_sync);
554         g_free(p->name);
555         p->name = NULL;
556         g_free(p->tls_hostname);
557         p->tls_hostname = NULL;
558         multifd_pages_clear(p->pages);
559         p->pages = NULL;
560         p->packet_len = 0;
561         g_free(p->packet);
562         p->packet = NULL;
563         multifd_send_state->ops->send_cleanup(p, &local_err);
564         if (local_err) {
565             migrate_set_error(migrate_get_current(), local_err);
566             error_free(local_err);
567         }
568     }
569     qemu_sem_destroy(&multifd_send_state->channels_ready);
570     g_free(multifd_send_state->params);
571     multifd_send_state->params = NULL;
572     multifd_pages_clear(multifd_send_state->pages);
573     multifd_send_state->pages = NULL;
574     g_free(multifd_send_state);
575     multifd_send_state = NULL;
576 }
577 
578 void multifd_send_sync_main(QEMUFile *f)
579 {
580     int i;
581 
582     if (!migrate_use_multifd()) {
583         return;
584     }
585     if (multifd_send_state->pages->used) {
586         if (multifd_send_pages(f) < 0) {
587             error_report("%s: multifd_send_pages fail", __func__);
588             return;
589         }
590     }
591     for (i = 0; i < migrate_multifd_channels(); i++) {
592         MultiFDSendParams *p = &multifd_send_state->params[i];
593 
594         trace_multifd_send_sync_main_signal(p->id);
595 
596         qemu_mutex_lock(&p->mutex);
597 
598         if (p->quit) {
599             error_report("%s: channel %d has already quit", __func__, i);
600             qemu_mutex_unlock(&p->mutex);
601             return;
602         }
603 
604         p->packet_num = multifd_send_state->packet_num++;
605         p->flags |= MULTIFD_FLAG_SYNC;
606         p->pending_job++;
607         qemu_file_update_transfer(f, p->packet_len);
608         ram_counters.multifd_bytes += p->packet_len;
609         ram_counters.transferred += p->packet_len;
610         qemu_mutex_unlock(&p->mutex);
611         qemu_sem_post(&p->sem);
612     }
613     for (i = 0; i < migrate_multifd_channels(); i++) {
614         MultiFDSendParams *p = &multifd_send_state->params[i];
615 
616         trace_multifd_send_sync_main_wait(p->id);
617         qemu_sem_wait(&p->sem_sync);
618     }
619     trace_multifd_send_sync_main(multifd_send_state->packet_num);
620 }
621 
/**
 * multifd_send_thread: body of a send channel thread
 *
 * Sends the initial packet of the channel and then loops: it sleeps on
 * p->sem and, for every pending job, builds the packet header under the
 * channel mutex, writes header and pages to the channel with the mutex
 * released, and then marks the job as done.  The loop ends when multifd
 * is exiting, when the channel is told to quit or on the first error.
 *
 * Returns NULL
 *
 * @opaque: MultiFDSendParams of the channel served by this thread
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        /* Sleep until somebody posts p->sem (new job or termination). */
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            /*
             * Take a snapshot of what the trace point and the write
             * below need: the fields of @p are reset before the mutex
             * is released.
             */
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /* The header goes first ... */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            /* ... followed by the pages, if the packet has any. */
            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            /* A sync packet was requested: tell the waiter it is out. */
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            /* This channel can take another job. */
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * An error happened and this thread is about to exit: post the
     * semaphores so that nobody keeps waiting for this channel.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
726 
727 static bool multifd_channel_connect(MultiFDSendParams *p,
728                                     QIOChannel *ioc,
729                                     Error *error);
730 
731 static void multifd_tls_outgoing_handshake(QIOTask *task,
732                                            gpointer opaque)
733 {
734     MultiFDSendParams *p = opaque;
735     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
736     Error *err = NULL;
737 
738     if (qio_task_propagate_error(task, &err)) {
739         trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
740     } else {
741         trace_multifd_tls_outgoing_handshake_complete(ioc);
742     }
743 
744     if (!multifd_channel_connect(p, ioc, err)) {
745         /*
746          * Error happen, mark multifd_send_thread status as 'quit' although it
747          * is not created, and then tell who pay attention to me.
748          */
749         p->quit = true;
750         qemu_sem_post(&multifd_send_state->channels_ready);
751         qemu_sem_post(&p->sem_sync);
752     }
753 }
754 
755 static void *multifd_tls_handshake_thread(void *opaque)
756 {
757     MultiFDSendParams *p = opaque;
758     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
759 
760     qio_channel_tls_handshake(tioc,
761                               multifd_tls_outgoing_handshake,
762                               p,
763                               NULL,
764                               NULL);
765     return NULL;
766 }
767 
768 static void multifd_tls_channel_connect(MultiFDSendParams *p,
769                                         QIOChannel *ioc,
770                                         Error **errp)
771 {
772     MigrationState *s = migrate_get_current();
773     const char *hostname = p->tls_hostname;
774     QIOChannelTLS *tioc;
775 
776     tioc = migration_tls_client_create(s, ioc, hostname, errp);
777     if (!tioc) {
778         return;
779     }
780 
781     object_unref(OBJECT(ioc));
782     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
783     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
784     p->c = QIO_CHANNEL(tioc);
785     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
786                        multifd_tls_handshake_thread, p,
787                        QEMU_THREAD_JOINABLE);
788 }
789 
790 static bool multifd_channel_connect(MultiFDSendParams *p,
791                                     QIOChannel *ioc,
792                                     Error *error)
793 {
794     MigrationState *s = migrate_get_current();
795 
796     trace_multifd_set_outgoing_channel(
797         ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);
798 
799     if (!error) {
800         if (s->parameters.tls_creds &&
801             *s->parameters.tls_creds &&
802             !object_dynamic_cast(OBJECT(ioc),
803                                  TYPE_QIO_CHANNEL_TLS)) {
804             multifd_tls_channel_connect(p, ioc, &error);
805             if (!error) {
806                 /*
807                  * tls_channel_connect will call back to this
808                  * function after the TLS handshake,
809                  * so we mustn't call multifd_send_thread until then
810                  */
811                 return true;
812             } else {
813                 return false;
814             }
815         } else {
816             /* update for tls qio channel */
817             p->c = ioc;
818             qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
819                                    QEMU_THREAD_JOINABLE);
820        }
821        return true;
822     }
823 
824     return false;
825 }
826 
827 static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
828                                              QIOChannel *ioc, Error *err)
829 {
830      migrate_set_error(migrate_get_current(), err);
831      /* Error happen, we need to tell who pay attention to me */
832      qemu_sem_post(&multifd_send_state->channels_ready);
833      qemu_sem_post(&p->sem_sync);
834      /*
835       * Although multifd_send_thread is not created, but main migration
836       * thread neet to judge whether it is running, so we need to mark
837       * its status.
838       */
839      p->quit = true;
840      object_unref(OBJECT(ioc));
841      error_free(err);
842 }
843 
844 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
845 {
846     MultiFDSendParams *p = opaque;
847     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
848     Error *local_err = NULL;
849 
850     trace_multifd_new_send_channel_async(p->id);
851     if (qio_task_propagate_error(task, &local_err)) {
852         goto cleanup;
853     } else {
854         p->c = QIO_CHANNEL(sioc);
855         qio_channel_set_delay(p->c, false);
856         p->running = true;
857         if (!multifd_channel_connect(p, sioc, local_err)) {
858             goto cleanup;
859         }
860         return;
861     }
862 
863 cleanup:
864     multifd_new_send_channel_cleanup(p, sioc, local_err);
865 }
866 
867 int multifd_save_setup(Error **errp)
868 {
869     int thread_count;
870     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
871     uint8_t i;
872     MigrationState *s;
873 
874     if (!migrate_use_multifd()) {
875         return 0;
876     }
877     s = migrate_get_current();
878     thread_count = migrate_multifd_channels();
879     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
880     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
881     multifd_send_state->pages = multifd_pages_init(page_count);
882     qemu_sem_init(&multifd_send_state->channels_ready, 0);
883     qatomic_set(&multifd_send_state->exiting, 0);
884     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
885 
886     for (i = 0; i < thread_count; i++) {
887         MultiFDSendParams *p = &multifd_send_state->params[i];
888 
889         qemu_mutex_init(&p->mutex);
890         qemu_sem_init(&p->sem, 0);
891         qemu_sem_init(&p->sem_sync, 0);
892         p->quit = false;
893         p->pending_job = 0;
894         p->id = i;
895         p->pages = multifd_pages_init(page_count);
896         p->packet_len = sizeof(MultiFDPacket_t)
897                       + sizeof(uint64_t) * page_count;
898         p->packet = g_malloc0(p->packet_len);
899         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
900         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
901         p->name = g_strdup_printf("multifdsend_%d", i);
902         p->tls_hostname = g_strdup(s->hostname);
903         socket_send_channel_create(multifd_new_send_channel_async, p);
904     }
905 
906     for (i = 0; i < thread_count; i++) {
907         MultiFDSendParams *p = &multifd_send_state->params[i];
908         Error *local_err = NULL;
909         int ret;
910 
911         ret = multifd_send_state->ops->send_setup(p, &local_err);
912         if (ret) {
913             error_propagate(errp, local_err);
914             return ret;
915         }
916     }
917     return 0;
918 }
919 
/* Global state of the receive side. */
struct {
    /* per channel parameters, indexed by channel number */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
931 
932 static void multifd_recv_terminate_threads(Error *err)
933 {
934     int i;
935 
936     trace_multifd_recv_terminate_threads(err != NULL);
937 
938     if (err) {
939         MigrationState *s = migrate_get_current();
940         migrate_set_error(s, err);
941         if (s->state == MIGRATION_STATUS_SETUP ||
942             s->state == MIGRATION_STATUS_ACTIVE) {
943             migrate_set_state(&s->state, s->state,
944                               MIGRATION_STATUS_FAILED);
945         }
946     }
947 
948     for (i = 0; i < migrate_multifd_channels(); i++) {
949         MultiFDRecvParams *p = &multifd_recv_state->params[i];
950 
951         qemu_mutex_lock(&p->mutex);
952         p->quit = true;
953         /*
954          * We could arrive here for two reasons:
955          *  - normal quit, i.e. everything went fine, just finished
956          *  - error quit: We close the channels so the channel threads
957          *    finish the qio_channel_read_all_eof()
958          */
959         if (p->c) {
960             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
961         }
962         qemu_mutex_unlock(&p->mutex);
963     }
964 }
965 
966 int multifd_load_cleanup(Error **errp)
967 {
968     int i;
969 
970     if (!migrate_use_multifd()) {
971         return 0;
972     }
973     multifd_recv_terminate_threads(NULL);
974     for (i = 0; i < migrate_multifd_channels(); i++) {
975         MultiFDRecvParams *p = &multifd_recv_state->params[i];
976 
977         if (p->running) {
978             p->quit = true;
979             /*
980              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
981              * however try to wakeup it without harm in cleanup phase.
982              */
983             qemu_sem_post(&p->sem_sync);
984             qemu_thread_join(&p->thread);
985         }
986     }
987     for (i = 0; i < migrate_multifd_channels(); i++) {
988         MultiFDRecvParams *p = &multifd_recv_state->params[i];
989 
990         if (OBJECT(p->c)->ref == 1) {
991             migration_ioc_unregister_yank(p->c);
992         }
993 
994         object_unref(OBJECT(p->c));
995         p->c = NULL;
996         qemu_mutex_destroy(&p->mutex);
997         qemu_sem_destroy(&p->sem_sync);
998         g_free(p->name);
999         p->name = NULL;
1000         multifd_pages_clear(p->pages);
1001         p->pages = NULL;
1002         p->packet_len = 0;
1003         g_free(p->packet);
1004         p->packet = NULL;
1005         multifd_recv_state->ops->recv_cleanup(p);
1006     }
1007     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1008     g_free(multifd_recv_state->params);
1009     multifd_recv_state->params = NULL;
1010     g_free(multifd_recv_state);
1011     multifd_recv_state = NULL;
1012 
1013     return 0;
1014 }
1015 
1016 void multifd_recv_sync_main(void)
1017 {
1018     int i;
1019 
1020     if (!migrate_use_multifd()) {
1021         return;
1022     }
1023     for (i = 0; i < migrate_multifd_channels(); i++) {
1024         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1025 
1026         trace_multifd_recv_sync_main_wait(p->id);
1027         qemu_sem_wait(&multifd_recv_state->sem_sync);
1028     }
1029     for (i = 0; i < migrate_multifd_channels(); i++) {
1030         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1031 
1032         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1033             if (multifd_recv_state->packet_num < p->packet_num) {
1034                 multifd_recv_state->packet_num = p->packet_num;
1035             }
1036         }
1037         trace_multifd_recv_sync_main_signal(p->id);
1038         qemu_sem_post(&p->sem_sync);
1039     }
1040     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1041 }
1042 
1043 static void *multifd_recv_thread(void *opaque)
1044 {
1045     MultiFDRecvParams *p = opaque;
1046     Error *local_err = NULL;
1047     int ret;
1048 
1049     trace_multifd_recv_thread_start(p->id);
1050     rcu_register_thread();
1051 
1052     while (true) {
1053         uint32_t used;
1054         uint32_t flags;
1055 
1056         if (p->quit) {
1057             break;
1058         }
1059 
1060         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1061                                        p->packet_len, &local_err);
1062         if (ret == 0) {   /* EOF */
1063             break;
1064         }
1065         if (ret == -1) {   /* Error */
1066             break;
1067         }
1068 
1069         qemu_mutex_lock(&p->mutex);
1070         ret = multifd_recv_unfill_packet(p, &local_err);
1071         if (ret) {
1072             qemu_mutex_unlock(&p->mutex);
1073             break;
1074         }
1075 
1076         used = p->pages->used;
1077         flags = p->flags;
1078         /* recv methods don't know how to handle the SYNC flag */
1079         p->flags &= ~MULTIFD_FLAG_SYNC;
1080         trace_multifd_recv(p->id, p->packet_num, used, flags,
1081                            p->next_packet_size);
1082         p->num_packets++;
1083         p->num_pages += used;
1084         qemu_mutex_unlock(&p->mutex);
1085 
1086         if (used) {
1087             ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
1088             if (ret != 0) {
1089                 break;
1090             }
1091         }
1092 
1093         if (flags & MULTIFD_FLAG_SYNC) {
1094             qemu_sem_post(&multifd_recv_state->sem_sync);
1095             qemu_sem_wait(&p->sem_sync);
1096         }
1097     }
1098 
1099     if (local_err) {
1100         multifd_recv_terminate_threads(local_err);
1101         error_free(local_err);
1102     }
1103     qemu_mutex_lock(&p->mutex);
1104     p->running = false;
1105     qemu_mutex_unlock(&p->mutex);
1106 
1107     rcu_unregister_thread();
1108     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1109 
1110     return NULL;
1111 }
1112 
1113 int multifd_load_setup(Error **errp)
1114 {
1115     int thread_count;
1116     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1117     uint8_t i;
1118 
1119     if (!migrate_use_multifd()) {
1120         return 0;
1121     }
1122     thread_count = migrate_multifd_channels();
1123     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1124     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1125     qatomic_set(&multifd_recv_state->count, 0);
1126     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1127     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1128 
1129     for (i = 0; i < thread_count; i++) {
1130         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1131 
1132         qemu_mutex_init(&p->mutex);
1133         qemu_sem_init(&p->sem_sync, 0);
1134         p->quit = false;
1135         p->id = i;
1136         p->pages = multifd_pages_init(page_count);
1137         p->packet_len = sizeof(MultiFDPacket_t)
1138                       + sizeof(uint64_t) * page_count;
1139         p->packet = g_malloc0(p->packet_len);
1140         p->name = g_strdup_printf("multifdrecv_%d", i);
1141     }
1142 
1143     for (i = 0; i < thread_count; i++) {
1144         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1145         Error *local_err = NULL;
1146         int ret;
1147 
1148         ret = multifd_recv_state->ops->recv_setup(p, &local_err);
1149         if (ret) {
1150             error_propagate(errp, local_err);
1151             return ret;
1152         }
1153     }
1154     return 0;
1155 }
1156 
1157 bool multifd_recv_all_channels_created(void)
1158 {
1159     int thread_count = migrate_multifd_channels();
1160 
1161     if (!migrate_use_multifd()) {
1162         return true;
1163     }
1164 
1165     if (!multifd_recv_state) {
1166         /* Called before any connections created */
1167         return false;
1168     }
1169 
1170     return thread_count == qatomic_read(&multifd_recv_state->count);
1171 }
1172 
1173 /*
1174  * Try to receive all multifd channels to get ready for the migration.
1175  * - Return true and do not set @errp when correctly receiving all channels;
1176  * - Return false and do not set @errp when correctly receiving the current one;
1177  * - Return false and set @errp when failing to receive the current channel.
1178  */
1179 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1180 {
1181     MultiFDRecvParams *p;
1182     Error *local_err = NULL;
1183     int id;
1184 
1185     id = multifd_recv_initial_packet(ioc, &local_err);
1186     if (id < 0) {
1187         multifd_recv_terminate_threads(local_err);
1188         error_propagate_prepend(errp, local_err,
1189                                 "failed to receive packet"
1190                                 " via multifd channel %d: ",
1191                                 qatomic_read(&multifd_recv_state->count));
1192         return false;
1193     }
1194     trace_multifd_recv_new_channel(id);
1195 
1196     p = &multifd_recv_state->params[id];
1197     if (p->c != NULL) {
1198         error_setg(&local_err, "multifd: received id '%d' already setup'",
1199                    id);
1200         multifd_recv_terminate_threads(local_err);
1201         error_propagate(errp, local_err);
1202         return false;
1203     }
1204     p->c = ioc;
1205     object_ref(OBJECT(ioc));
1206     /* initial packet */
1207     p->num_packets = 1;
1208 
1209     p->running = true;
1210     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1211                        QEMU_THREAD_JOINABLE);
1212     qatomic_inc(&multifd_recv_state->count);
1213     return qatomic_read(&multifd_recv_state->count) ==
1214            migrate_multifd_channels();
1215 }
1216