xref: /qemu/migration/multifd.c (revision dcc474c6)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "socket.h"
23 #include "qemu-file.h"
24 #include "trace.h"
25 #include "multifd.h"
26 
27 /* Multiple fd's */
28 
29 #define MULTIFD_MAGIC 0x11223344U
30 #define MULTIFD_VERSION 1
31 
/*
 * Handshake message exchanged once per channel before any page data.
 * The struct is packed and is written/read as raw bytes; magic and
 * version are byte-swapped to/from big-endian by the send/receive
 * helpers that follow.
 */
typedef struct {
    uint32_t magic;         /* checked against MULTIFD_MAGIC on receive */
    uint32_t version;       /* checked against MULTIFD_VERSION on receive */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* channel id, copied from MultiFDSendParams.id */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
40 
41 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
42 {
43     MultiFDInit_t msg = {};
44     int ret;
45 
46     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
47     msg.version = cpu_to_be32(MULTIFD_VERSION);
48     msg.id = p->id;
49     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
50 
51     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
52     if (ret != 0) {
53         return -1;
54     }
55     return 0;
56 }
57 
58 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
59 {
60     MultiFDInit_t msg;
61     int ret;
62 
63     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
64     if (ret != 0) {
65         return -1;
66     }
67 
68     msg.magic = be32_to_cpu(msg.magic);
69     msg.version = be32_to_cpu(msg.version);
70 
71     if (msg.magic != MULTIFD_MAGIC) {
72         error_setg(errp, "multifd: received packet magic %x "
73                    "expected %x", msg.magic, MULTIFD_MAGIC);
74         return -1;
75     }
76 
77     if (msg.version != MULTIFD_VERSION) {
78         error_setg(errp, "multifd: received packet version %d "
79                    "expected %d", msg.version, MULTIFD_VERSION);
80         return -1;
81     }
82 
83     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
84         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
85         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
86 
87         error_setg(errp, "multifd: received uuid '%s' and expected "
88                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
89         g_free(uuid);
90         g_free(msg_uuid);
91         return -1;
92     }
93 
94     if (msg.id > migrate_multifd_channels()) {
95         error_setg(errp, "multifd: received channel version %d "
96                    "expected %d", msg.version, MULTIFD_VERSION);
97         return -1;
98     }
99 
100     return msg.id;
101 }
102 
103 static MultiFDPages_t *multifd_pages_init(size_t size)
104 {
105     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
106 
107     pages->allocated = size;
108     pages->iov = g_new0(struct iovec, size);
109     pages->offset = g_new0(ram_addr_t, size);
110 
111     return pages;
112 }
113 
114 static void multifd_pages_clear(MultiFDPages_t *pages)
115 {
116     pages->used = 0;
117     pages->allocated = 0;
118     pages->packet_num = 0;
119     pages->block = NULL;
120     g_free(pages->iov);
121     pages->iov = NULL;
122     g_free(pages->offset);
123     pages->offset = NULL;
124     g_free(pages);
125 }
126 
127 static void multifd_send_fill_packet(MultiFDSendParams *p)
128 {
129     MultiFDPacket_t *packet = p->packet;
130     int i;
131 
132     packet->flags = cpu_to_be32(p->flags);
133     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
134     packet->pages_used = cpu_to_be32(p->pages->used);
135     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
136     packet->packet_num = cpu_to_be64(p->packet_num);
137 
138     if (p->pages->block) {
139         strncpy(packet->ramblock, p->pages->block->idstr, 256);
140     }
141 
142     for (i = 0; i < p->pages->used; i++) {
143         /* there are architectures where ram_addr_t is 32 bit */
144         uint64_t temp = p->pages->offset[i];
145 
146         packet->offset[i] = cpu_to_be64(temp);
147     }
148 }
149 
150 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
151 {
152     MultiFDPacket_t *packet = p->packet;
153     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
154     RAMBlock *block;
155     int i;
156 
157     packet->magic = be32_to_cpu(packet->magic);
158     if (packet->magic != MULTIFD_MAGIC) {
159         error_setg(errp, "multifd: received packet "
160                    "magic %x and expected magic %x",
161                    packet->magic, MULTIFD_MAGIC);
162         return -1;
163     }
164 
165     packet->version = be32_to_cpu(packet->version);
166     if (packet->version != MULTIFD_VERSION) {
167         error_setg(errp, "multifd: received packet "
168                    "version %d and expected version %d",
169                    packet->version, MULTIFD_VERSION);
170         return -1;
171     }
172 
173     p->flags = be32_to_cpu(packet->flags);
174 
175     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
176     /*
177      * If we received a packet that is 100 times bigger than expected
178      * just stop migration.  It is a magic number.
179      */
180     if (packet->pages_alloc > pages_max * 100) {
181         error_setg(errp, "multifd: received packet "
182                    "with size %d and expected a maximum size of %d",
183                    packet->pages_alloc, pages_max * 100) ;
184         return -1;
185     }
186     /*
187      * We received a packet that is bigger than expected but inside
188      * reasonable limits (see previous comment).  Just reallocate.
189      */
190     if (packet->pages_alloc > p->pages->allocated) {
191         multifd_pages_clear(p->pages);
192         p->pages = multifd_pages_init(packet->pages_alloc);
193     }
194 
195     p->pages->used = be32_to_cpu(packet->pages_used);
196     if (p->pages->used > packet->pages_alloc) {
197         error_setg(errp, "multifd: received packet "
198                    "with %d pages and expected maximum pages are %d",
199                    p->pages->used, packet->pages_alloc) ;
200         return -1;
201     }
202 
203     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
204     p->packet_num = be64_to_cpu(packet->packet_num);
205 
206     if (p->pages->used == 0) {
207         return 0;
208     }
209 
210     /* make sure that ramblock is 0 terminated */
211     packet->ramblock[255] = 0;
212     block = qemu_ram_block_by_name(packet->ramblock);
213     if (!block) {
214         error_setg(errp, "multifd: unknown ram block %s",
215                    packet->ramblock);
216         return -1;
217     }
218 
219     for (i = 0; i < p->pages->used; i++) {
220         uint64_t offset = be64_to_cpu(packet->offset[i]);
221 
222         if (offset > (block->used_length - qemu_target_page_size())) {
223             error_setg(errp, "multifd: offset too long %" PRIu64
224                        " (max " RAM_ADDR_FMT ")",
225                        offset, block->max_length);
226             return -1;
227         }
228         p->pages->iov[i].iov_base = block->host + offset;
229         p->pages->iov[i].iov_len = qemu_target_page_size();
230     }
231 
232     return 0;
233 }
234 
/*
 * State shared by the migration thread and all multifd send channels.
 * Allocated in multifd_save_setup() and freed in multifd_save_cleanup().
 */
struct {
    /* one entry per send channel, indexed by channel id */
    MultiFDSendParams *params;
    /* array of pages to send; swapped with a channel's own pages on send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
} *multifd_send_state;
250 
/*
 * How are multifd_send_state->pages and channel->pages used?
 *
 * We create one pages struct for each channel, plus a main one.  Each
 * time we need to send a batch of pages, we swap the main one with the
 * one belonging to the channel that is going to send it.  There are
 * two reasons for that:
 *    - to avoid doing so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who the owner of each "pages" struct is,
 * and we don't need any locking.  It belongs either to the migration
 * thread or to the channel thread.  Switching is safe because the
 * migration thread holds the channel mutex while changing it, and the
 * channel must have finished with its own, otherwise pending_job
 * can't be false.
 */
268 
269 static int multifd_send_pages(QEMUFile *f)
270 {
271     int i;
272     static int next_channel;
273     MultiFDSendParams *p = NULL; /* make happy gcc */
274     MultiFDPages_t *pages = multifd_send_state->pages;
275     uint64_t transferred;
276 
277     if (atomic_read(&multifd_send_state->exiting)) {
278         return -1;
279     }
280 
281     qemu_sem_wait(&multifd_send_state->channels_ready);
282     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
283         p = &multifd_send_state->params[i];
284 
285         qemu_mutex_lock(&p->mutex);
286         if (p->quit) {
287             error_report("%s: channel %d has already quit!", __func__, i);
288             qemu_mutex_unlock(&p->mutex);
289             return -1;
290         }
291         if (!p->pending_job) {
292             p->pending_job++;
293             next_channel = (i + 1) % migrate_multifd_channels();
294             break;
295         }
296         qemu_mutex_unlock(&p->mutex);
297     }
298     assert(!p->pages->used);
299     assert(!p->pages->block);
300 
301     p->packet_num = multifd_send_state->packet_num++;
302     multifd_send_state->pages = p->pages;
303     p->pages = pages;
304     transferred = ((uint64_t) pages->used) * qemu_target_page_size()
305                 + p->packet_len;
306     qemu_file_update_transfer(f, transferred);
307     ram_counters.multifd_bytes += transferred;
308     ram_counters.transferred += transferred;;
309     qemu_mutex_unlock(&p->mutex);
310     qemu_sem_post(&p->sem);
311 
312     return 1;
313 }
314 
315 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
316 {
317     MultiFDPages_t *pages = multifd_send_state->pages;
318 
319     if (!pages->block) {
320         pages->block = block;
321     }
322 
323     if (pages->block == block) {
324         pages->offset[pages->used] = offset;
325         pages->iov[pages->used].iov_base = block->host + offset;
326         pages->iov[pages->used].iov_len = qemu_target_page_size();
327         pages->used++;
328 
329         if (pages->used < pages->allocated) {
330             return 1;
331         }
332     }
333 
334     if (multifd_send_pages(f) < 0) {
335         return -1;
336     }
337 
338     if (pages->block != block) {
339         return  multifd_queue_page(f, block, offset);
340     }
341 
342     return 1;
343 }
344 
345 static void multifd_send_terminate_threads(Error *err)
346 {
347     int i;
348 
349     trace_multifd_send_terminate_threads(err != NULL);
350 
351     if (err) {
352         MigrationState *s = migrate_get_current();
353         migrate_set_error(s, err);
354         if (s->state == MIGRATION_STATUS_SETUP ||
355             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
356             s->state == MIGRATION_STATUS_DEVICE ||
357             s->state == MIGRATION_STATUS_ACTIVE) {
358             migrate_set_state(&s->state, s->state,
359                               MIGRATION_STATUS_FAILED);
360         }
361     }
362 
363     /*
364      * We don't want to exit each threads twice.  Depending on where
365      * we get the error, or if there are two independent errors in two
366      * threads at the same time, we can end calling this function
367      * twice.
368      */
369     if (atomic_xchg(&multifd_send_state->exiting, 1)) {
370         return;
371     }
372 
373     for (i = 0; i < migrate_multifd_channels(); i++) {
374         MultiFDSendParams *p = &multifd_send_state->params[i];
375 
376         qemu_mutex_lock(&p->mutex);
377         p->quit = true;
378         qemu_sem_post(&p->sem);
379         qemu_mutex_unlock(&p->mutex);
380     }
381 }
382 
383 void multifd_save_cleanup(void)
384 {
385     int i;
386 
387     if (!migrate_use_multifd()) {
388         return;
389     }
390     multifd_send_terminate_threads(NULL);
391     for (i = 0; i < migrate_multifd_channels(); i++) {
392         MultiFDSendParams *p = &multifd_send_state->params[i];
393 
394         if (p->running) {
395             qemu_thread_join(&p->thread);
396         }
397     }
398     for (i = 0; i < migrate_multifd_channels(); i++) {
399         MultiFDSendParams *p = &multifd_send_state->params[i];
400 
401         socket_send_channel_destroy(p->c);
402         p->c = NULL;
403         qemu_mutex_destroy(&p->mutex);
404         qemu_sem_destroy(&p->sem);
405         qemu_sem_destroy(&p->sem_sync);
406         g_free(p->name);
407         p->name = NULL;
408         multifd_pages_clear(p->pages);
409         p->pages = NULL;
410         p->packet_len = 0;
411         g_free(p->packet);
412         p->packet = NULL;
413     }
414     qemu_sem_destroy(&multifd_send_state->channels_ready);
415     g_free(multifd_send_state->params);
416     multifd_send_state->params = NULL;
417     multifd_pages_clear(multifd_send_state->pages);
418     multifd_send_state->pages = NULL;
419     g_free(multifd_send_state);
420     multifd_send_state = NULL;
421 }
422 
423 void multifd_send_sync_main(QEMUFile *f)
424 {
425     int i;
426 
427     if (!migrate_use_multifd()) {
428         return;
429     }
430     if (multifd_send_state->pages->used) {
431         if (multifd_send_pages(f) < 0) {
432             error_report("%s: multifd_send_pages fail", __func__);
433             return;
434         }
435     }
436     for (i = 0; i < migrate_multifd_channels(); i++) {
437         MultiFDSendParams *p = &multifd_send_state->params[i];
438 
439         trace_multifd_send_sync_main_signal(p->id);
440 
441         qemu_mutex_lock(&p->mutex);
442 
443         if (p->quit) {
444             error_report("%s: channel %d has already quit", __func__, i);
445             qemu_mutex_unlock(&p->mutex);
446             return;
447         }
448 
449         p->packet_num = multifd_send_state->packet_num++;
450         p->flags |= MULTIFD_FLAG_SYNC;
451         p->pending_job++;
452         qemu_file_update_transfer(f, p->packet_len);
453         ram_counters.multifd_bytes += p->packet_len;
454         ram_counters.transferred += p->packet_len;
455         qemu_mutex_unlock(&p->mutex);
456         qemu_sem_post(&p->sem);
457     }
458     for (i = 0; i < migrate_multifd_channels(); i++) {
459         MultiFDSendParams *p = &multifd_send_state->params[i];
460 
461         trace_multifd_send_sync_main_wait(p->id);
462         qemu_sem_wait(&p->sem_sync);
463     }
464     trace_multifd_send_sync_main(multifd_send_state->packet_num);
465 }
466 
/*
 * Thread body of one multifd send channel; @opaque is the channel's
 * MultiFDSendParams.
 *
 * Sends the initial handshake packet, then loops: wait on p->sem, and
 * if a job is pending, fill the packet header, write header and pages
 * to the channel, drop the job and signal channels_ready (plus
 * sem_sync for a SYNC packet).  The loop ends when the global exiting
 * flag is set, when p->quit is set with no job pending, or on a write
 * error.  Always returns NULL.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (atomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            /* snapshot what is needed after the mutex is dropped */
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            p->next_packet_size = used * qemu_target_page_size();
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            /* mark the pages struct as empty; the iov entries stay valid */
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /* the I/O is done without holding the channel mutex */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = qio_channel_writev_all(p->c, p->pages->iov,
                                             used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
    }

    /*
     * Error happen, I will exit, but I can't just leave, tell
     * who pay attention to me.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    /* multifd_save_cleanup() only joins threads still marked running */
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
564 
565 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
566 {
567     MultiFDSendParams *p = opaque;
568     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
569     Error *local_err = NULL;
570 
571     trace_multifd_new_send_channel_async(p->id);
572     if (qio_task_propagate_error(task, &local_err)) {
573         migrate_set_error(migrate_get_current(), local_err);
574         /* Error happen, we need to tell who pay attention to me */
575         qemu_sem_post(&multifd_send_state->channels_ready);
576         qemu_sem_post(&p->sem_sync);
577         /*
578          * Although multifd_send_thread is not created, but main migration
579          * thread neet to judge whether it is running, so we need to mark
580          * its status.
581          */
582         p->quit = true;
583     } else {
584         p->c = QIO_CHANNEL(sioc);
585         qio_channel_set_delay(p->c, false);
586         p->running = true;
587         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
588                            QEMU_THREAD_JOINABLE);
589     }
590 }
591 
592 int multifd_save_setup(Error **errp)
593 {
594     int thread_count;
595     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
596     uint8_t i;
597 
598     if (!migrate_use_multifd()) {
599         return 0;
600     }
601     thread_count = migrate_multifd_channels();
602     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
603     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
604     multifd_send_state->pages = multifd_pages_init(page_count);
605     qemu_sem_init(&multifd_send_state->channels_ready, 0);
606     atomic_set(&multifd_send_state->exiting, 0);
607 
608     for (i = 0; i < thread_count; i++) {
609         MultiFDSendParams *p = &multifd_send_state->params[i];
610 
611         qemu_mutex_init(&p->mutex);
612         qemu_sem_init(&p->sem, 0);
613         qemu_sem_init(&p->sem_sync, 0);
614         p->quit = false;
615         p->pending_job = 0;
616         p->id = i;
617         p->pages = multifd_pages_init(page_count);
618         p->packet_len = sizeof(MultiFDPacket_t)
619                       + sizeof(uint64_t) * page_count;
620         p->packet = g_malloc0(p->packet_len);
621         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
622         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
623         p->name = g_strdup_printf("multifdsend_%d", i);
624         socket_send_channel_create(multifd_new_send_channel_async, p);
625     }
626     return 0;
627 }
628 
/*
 * State shared by the main thread and all multifd receive channels.
 * Allocated in multifd_load_setup() and freed in multifd_load_cleanup().
 */
struct {
    /* one entry per receive channel, indexed by channel id */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
} *multifd_recv_state;
638 
639 static void multifd_recv_terminate_threads(Error *err)
640 {
641     int i;
642 
643     trace_multifd_recv_terminate_threads(err != NULL);
644 
645     if (err) {
646         MigrationState *s = migrate_get_current();
647         migrate_set_error(s, err);
648         if (s->state == MIGRATION_STATUS_SETUP ||
649             s->state == MIGRATION_STATUS_ACTIVE) {
650             migrate_set_state(&s->state, s->state,
651                               MIGRATION_STATUS_FAILED);
652         }
653     }
654 
655     for (i = 0; i < migrate_multifd_channels(); i++) {
656         MultiFDRecvParams *p = &multifd_recv_state->params[i];
657 
658         qemu_mutex_lock(&p->mutex);
659         p->quit = true;
660         /*
661          * We could arrive here for two reasons:
662          *  - normal quit, i.e. everything went fine, just finished
663          *  - error quit: We close the channels so the channel threads
664          *    finish the qio_channel_read_all_eof()
665          */
666         if (p->c) {
667             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
668         }
669         qemu_mutex_unlock(&p->mutex);
670     }
671 }
672 
673 int multifd_load_cleanup(Error **errp)
674 {
675     int i;
676     int ret = 0;
677 
678     if (!migrate_use_multifd()) {
679         return 0;
680     }
681     multifd_recv_terminate_threads(NULL);
682     for (i = 0; i < migrate_multifd_channels(); i++) {
683         MultiFDRecvParams *p = &multifd_recv_state->params[i];
684 
685         if (p->running) {
686             p->quit = true;
687             /*
688              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
689              * however try to wakeup it without harm in cleanup phase.
690              */
691             qemu_sem_post(&p->sem_sync);
692             qemu_thread_join(&p->thread);
693         }
694     }
695     for (i = 0; i < migrate_multifd_channels(); i++) {
696         MultiFDRecvParams *p = &multifd_recv_state->params[i];
697 
698         object_unref(OBJECT(p->c));
699         p->c = NULL;
700         qemu_mutex_destroy(&p->mutex);
701         qemu_sem_destroy(&p->sem_sync);
702         g_free(p->name);
703         p->name = NULL;
704         multifd_pages_clear(p->pages);
705         p->pages = NULL;
706         p->packet_len = 0;
707         g_free(p->packet);
708         p->packet = NULL;
709     }
710     qemu_sem_destroy(&multifd_recv_state->sem_sync);
711     g_free(multifd_recv_state->params);
712     multifd_recv_state->params = NULL;
713     g_free(multifd_recv_state);
714     multifd_recv_state = NULL;
715 
716     return ret;
717 }
718 
719 void multifd_recv_sync_main(void)
720 {
721     int i;
722 
723     if (!migrate_use_multifd()) {
724         return;
725     }
726     for (i = 0; i < migrate_multifd_channels(); i++) {
727         MultiFDRecvParams *p = &multifd_recv_state->params[i];
728 
729         trace_multifd_recv_sync_main_wait(p->id);
730         qemu_sem_wait(&multifd_recv_state->sem_sync);
731     }
732     for (i = 0; i < migrate_multifd_channels(); i++) {
733         MultiFDRecvParams *p = &multifd_recv_state->params[i];
734 
735         qemu_mutex_lock(&p->mutex);
736         if (multifd_recv_state->packet_num < p->packet_num) {
737             multifd_recv_state->packet_num = p->packet_num;
738         }
739         qemu_mutex_unlock(&p->mutex);
740         trace_multifd_recv_sync_main_signal(p->id);
741         qemu_sem_post(&p->sem_sync);
742     }
743     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
744 }
745 
/*
 * Thread body of one multifd receive channel; @opaque is the channel's
 * MultiFDRecvParams.
 *
 * Loops reading one packet header at a time, validates it with
 * multifd_recv_unfill_packet(), reads the announced pages into the iov
 * prepared by that function and, for a SYNC packet, signals the main
 * thread and waits for it.  The loop ends when p->quit is set, on EOF,
 * on a read error or on an invalid packet.  Always returns NULL.
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        /* NOTE(review): p->quit is read here without holding p->mutex */
        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {   /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /* snapshot what is needed after the mutex is dropped */
        used = p->pages->used;
        flags = p->flags;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        /* the page payload is read without holding the channel mutex */
        if (used) {
            ret = qio_channel_readv_all(p->c, p->pages->iov,
                                        used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            /* tell the main thread, then wait until it releases us */
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
    }
    /* multifd_load_cleanup() only joins threads still marked running */
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
813 
814 int multifd_load_setup(Error **errp)
815 {
816     int thread_count;
817     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
818     uint8_t i;
819 
820     if (!migrate_use_multifd()) {
821         return 0;
822     }
823     thread_count = migrate_multifd_channels();
824     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
825     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
826     atomic_set(&multifd_recv_state->count, 0);
827     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
828 
829     for (i = 0; i < thread_count; i++) {
830         MultiFDRecvParams *p = &multifd_recv_state->params[i];
831 
832         qemu_mutex_init(&p->mutex);
833         qemu_sem_init(&p->sem_sync, 0);
834         p->quit = false;
835         p->id = i;
836         p->pages = multifd_pages_init(page_count);
837         p->packet_len = sizeof(MultiFDPacket_t)
838                       + sizeof(uint64_t) * page_count;
839         p->packet = g_malloc0(p->packet_len);
840         p->name = g_strdup_printf("multifdrecv_%d", i);
841     }
842     return 0;
843 }
844 
845 bool multifd_recv_all_channels_created(void)
846 {
847     int thread_count = migrate_multifd_channels();
848 
849     if (!migrate_use_multifd()) {
850         return true;
851     }
852 
853     return thread_count == atomic_read(&multifd_recv_state->count);
854 }
855 
856 /*
857  * Try to receive all multifd channels to get ready for the migration.
858  * - Return true and do not set @errp when correctly receving all channels;
859  * - Return false and do not set @errp when correctly receiving the current one;
860  * - Return false and set @errp when failing to receive the current channel.
861  */
862 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
863 {
864     MultiFDRecvParams *p;
865     Error *local_err = NULL;
866     int id;
867 
868     id = multifd_recv_initial_packet(ioc, &local_err);
869     if (id < 0) {
870         multifd_recv_terminate_threads(local_err);
871         error_propagate_prepend(errp, local_err,
872                                 "failed to receive packet"
873                                 " via multifd channel %d: ",
874                                 atomic_read(&multifd_recv_state->count));
875         return false;
876     }
877     trace_multifd_recv_new_channel(id);
878 
879     p = &multifd_recv_state->params[id];
880     if (p->c != NULL) {
881         error_setg(&local_err, "multifd: received id '%d' already setup'",
882                    id);
883         multifd_recv_terminate_threads(local_err);
884         error_propagate(errp, local_err);
885         return false;
886     }
887     p->c = ioc;
888     object_ref(OBJECT(ioc));
889     /* initial packet */
890     p->num_packets = 1;
891 
892     p->running = true;
893     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
894                        QEMU_THREAD_JOINABLE);
895     atomic_inc(&multifd_recv_state->count);
896     return atomic_read(&multifd_recv_state->count) ==
897            migrate_multifd_channels();
898 }
899 
900