xref: /qemu/migration/colo.c (revision 76eb88b1)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "sysemu/sysemu.h"
15 #include "qapi/error.h"
16 #include "qapi/qapi-commands-migration.h"
17 #include "migration.h"
18 #include "qemu-file.h"
19 #include "savevm.h"
20 #include "migration/colo.h"
21 #include "block.h"
22 #include "io/channel-buffer.h"
23 #include "trace.h"
24 #include "qemu/error-report.h"
25 #include "qemu/main-loop.h"
26 #include "qemu/rcu.h"
27 #include "migration/failover.h"
28 #include "migration/ram.h"
29 #ifdef CONFIG_REPLICATION
30 #include "block/replication.h"
31 #endif
32 #include "net/colo-compare.h"
33 #include "net/colo.h"
34 #include "block/block.h"
35 #include "qapi/qapi-events-migration.h"
36 #include "qapi/qmp/qerror.h"
37 #include "sysemu/cpus.h"
38 #include "sysemu/runstate.h"
39 #include "net/filter.h"
40 
41 static bool vmstate_loading;
42 static Notifier packets_compare_notifier;
43 
44 /* User need to know colo mode after COLO failover */
45 static COLOMode last_colo_mode;
46 
47 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
48 
49 bool migration_in_colo_state(void)
50 {
51     MigrationState *s = migrate_get_current();
52 
53     return (s->state == MIGRATION_STATUS_COLO);
54 }
55 
56 bool migration_incoming_in_colo_state(void)
57 {
58     MigrationIncomingState *mis = migration_incoming_get_current();
59 
60     return mis && (mis->state == MIGRATION_STATUS_COLO);
61 }
62 
63 static bool colo_runstate_is_stopped(void)
64 {
65     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
66 }
67 
68 static void secondary_vm_do_failover(void)
69 {
70 /* COLO needs enable block-replication */
71 #ifdef CONFIG_REPLICATION
72     int old_state;
73     MigrationIncomingState *mis = migration_incoming_get_current();
74     Error *local_err = NULL;
75 
76     /* Can not do failover during the process of VM's loading VMstate, Or
77      * it will break the secondary VM.
78      */
79     if (vmstate_loading) {
80         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
81                         FAILOVER_STATUS_RELAUNCH);
82         if (old_state != FAILOVER_STATUS_ACTIVE) {
83             error_report("Unknown error while do failover for secondary VM,"
84                          "old_state: %s", FailoverStatus_str(old_state));
85         }
86         return;
87     }
88 
89     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
90                       MIGRATION_STATUS_COMPLETED);
91 
92     replication_stop_all(true, &local_err);
93     if (local_err) {
94         error_report_err(local_err);
95         local_err = NULL;
96     }
97 
98     /* Notify all filters of all NIC to do checkpoint */
99     colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
100     if (local_err) {
101         error_report_err(local_err);
102     }
103 
104     if (!autostart) {
105         error_report("\"-S\" qemu option will be ignored in secondary side");
106         /* recover runstate to normal migration finish state */
107         autostart = true;
108     }
109     /*
110      * Make sure COLO incoming thread not block in recv or send,
111      * If mis->from_src_file and mis->to_src_file use the same fd,
112      * The second shutdown() will return -1, we ignore this value,
113      * It is harmless.
114      */
115     if (mis->from_src_file) {
116         qemu_file_shutdown(mis->from_src_file);
117     }
118     if (mis->to_src_file) {
119         qemu_file_shutdown(mis->to_src_file);
120     }
121 
122     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
123                                    FAILOVER_STATUS_COMPLETED);
124     if (old_state != FAILOVER_STATUS_ACTIVE) {
125         error_report("Incorrect state (%s) while doing failover for "
126                      "secondary VM", FailoverStatus_str(old_state));
127         return;
128     }
129     /* Notify COLO incoming thread that failover work is finished */
130     qemu_sem_post(&mis->colo_incoming_sem);
131 
132     /* For Secondary VM, jump to incoming co */
133     if (mis->migration_incoming_co) {
134         qemu_coroutine_enter(mis->migration_incoming_co);
135     }
136 #else
137     abort();
138 #endif
139 }
140 
141 static void primary_vm_do_failover(void)
142 {
143 #ifdef CONFIG_REPLICATION
144     MigrationState *s = migrate_get_current();
145     int old_state;
146     Error *local_err = NULL;
147 
148     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
149                       MIGRATION_STATUS_COMPLETED);
150     /*
151      * kick COLO thread which might wait at
152      * qemu_sem_wait(&s->colo_checkpoint_sem).
153      */
154     colo_checkpoint_notify(s);
155 
156     /*
157      * Wake up COLO thread which may blocked in recv() or send(),
158      * The s->rp_state.from_dst_file and s->to_dst_file may use the
159      * same fd, but we still shutdown the fd for twice, it is harmless.
160      */
161     if (s->to_dst_file) {
162         qemu_file_shutdown(s->to_dst_file);
163     }
164     if (s->rp_state.from_dst_file) {
165         qemu_file_shutdown(s->rp_state.from_dst_file);
166     }
167 
168     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
169                                    FAILOVER_STATUS_COMPLETED);
170     if (old_state != FAILOVER_STATUS_ACTIVE) {
171         error_report("Incorrect state (%s) while doing failover for Primary VM",
172                      FailoverStatus_str(old_state));
173         return;
174     }
175 
176     replication_stop_all(true, &local_err);
177     if (local_err) {
178         error_report_err(local_err);
179         local_err = NULL;
180     }
181 
182     /* Notify COLO thread that failover work is finished */
183     qemu_sem_post(&s->colo_exit_sem);
184 #else
185     abort();
186 #endif
187 }
188 
189 COLOMode get_colo_mode(void)
190 {
191     if (migration_in_colo_state()) {
192         return COLO_MODE_PRIMARY;
193     } else if (migration_incoming_in_colo_state()) {
194         return COLO_MODE_SECONDARY;
195     } else {
196         return COLO_MODE_NONE;
197     }
198 }
199 
200 void colo_do_failover(void)
201 {
202     /* Make sure VM stopped while failover happened. */
203     if (!colo_runstate_is_stopped()) {
204         vm_stop_force_state(RUN_STATE_COLO);
205     }
206 
207     switch (last_colo_mode = get_colo_mode()) {
208     case COLO_MODE_PRIMARY:
209         primary_vm_do_failover();
210         break;
211     case COLO_MODE_SECONDARY:
212         secondary_vm_do_failover();
213         break;
214     default:
215         error_report("colo_do_failover failed because the colo mode"
216                      " could not be obtained");
217     }
218 }
219 
220 #ifdef CONFIG_REPLICATION
221 void qmp_xen_set_replication(bool enable, bool primary,
222                              bool has_failover, bool failover,
223                              Error **errp)
224 {
225     ReplicationMode mode = primary ?
226                            REPLICATION_MODE_PRIMARY :
227                            REPLICATION_MODE_SECONDARY;
228 
229     if (has_failover && enable) {
230         error_setg(errp, "Parameter 'failover' is only for"
231                    " stopping replication");
232         return;
233     }
234 
235     if (enable) {
236         replication_start_all(mode, errp);
237     } else {
238         if (!has_failover) {
239             failover = NULL;
240         }
241         replication_stop_all(failover, failover ? NULL : errp);
242     }
243 }
244 
245 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
246 {
247     Error *err = NULL;
248     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
249 
250     replication_get_error_all(&err);
251     if (err) {
252         s->error = true;
253         s->has_desc = true;
254         s->desc = g_strdup(error_get_pretty(err));
255     } else {
256         s->error = false;
257     }
258 
259     error_free(err);
260     return s;
261 }
262 
263 void qmp_xen_colo_do_checkpoint(Error **errp)
264 {
265     Error *err = NULL;
266 
267     replication_do_checkpoint_all(&err);
268     if (err) {
269         error_propagate(errp, err);
270         return;
271     }
272     /* Notify all filters of all NIC to do checkpoint */
273     colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
274 }
275 #endif
276 
277 COLOStatus *qmp_query_colo_status(Error **errp)
278 {
279     COLOStatus *s = g_new0(COLOStatus, 1);
280 
281     s->mode = get_colo_mode();
282     s->last_mode = last_colo_mode;
283 
284     switch (failover_get_state()) {
285     case FAILOVER_STATUS_NONE:
286         s->reason = COLO_EXIT_REASON_NONE;
287         break;
288     case FAILOVER_STATUS_COMPLETED:
289         s->reason = COLO_EXIT_REASON_REQUEST;
290         break;
291     default:
292         if (migration_in_colo_state()) {
293             s->reason = COLO_EXIT_REASON_PROCESSING;
294         } else {
295             s->reason = COLO_EXIT_REASON_ERROR;
296         }
297     }
298 
299     return s;
300 }
301 
302 static void colo_send_message(QEMUFile *f, COLOMessage msg,
303                               Error **errp)
304 {
305     int ret;
306 
307     if (msg >= COLO_MESSAGE__MAX) {
308         error_setg(errp, "%s: Invalid message", __func__);
309         return;
310     }
311     qemu_put_be32(f, msg);
312     qemu_fflush(f);
313 
314     ret = qemu_file_get_error(f);
315     if (ret < 0) {
316         error_setg_errno(errp, -ret, "Can't send COLO message");
317     }
318     trace_colo_send_message(COLOMessage_str(msg));
319 }
320 
321 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
322                                     uint64_t value, Error **errp)
323 {
324     Error *local_err = NULL;
325     int ret;
326 
327     colo_send_message(f, msg, &local_err);
328     if (local_err) {
329         error_propagate(errp, local_err);
330         return;
331     }
332     qemu_put_be64(f, value);
333     qemu_fflush(f);
334 
335     ret = qemu_file_get_error(f);
336     if (ret < 0) {
337         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
338                          COLOMessage_str(msg));
339     }
340 }
341 
342 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
343 {
344     COLOMessage msg;
345     int ret;
346 
347     msg = qemu_get_be32(f);
348     ret = qemu_file_get_error(f);
349     if (ret < 0) {
350         error_setg_errno(errp, -ret, "Can't receive COLO message");
351         return msg;
352     }
353     if (msg >= COLO_MESSAGE__MAX) {
354         error_setg(errp, "%s: Invalid message", __func__);
355         return msg;
356     }
357     trace_colo_receive_message(COLOMessage_str(msg));
358     return msg;
359 }
360 
361 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
362                                        Error **errp)
363 {
364     COLOMessage msg;
365     Error *local_err = NULL;
366 
367     msg = colo_receive_message(f, &local_err);
368     if (local_err) {
369         error_propagate(errp, local_err);
370         return;
371     }
372     if (msg != expect_msg) {
373         error_setg(errp, "Unexpected COLO message %d, expected %d",
374                           msg, expect_msg);
375     }
376 }
377 
378 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
379                                            Error **errp)
380 {
381     Error *local_err = NULL;
382     uint64_t value;
383     int ret;
384 
385     colo_receive_check_message(f, expect_msg, &local_err);
386     if (local_err) {
387         error_propagate(errp, local_err);
388         return 0;
389     }
390 
391     value = qemu_get_be64(f);
392     ret = qemu_file_get_error(f);
393     if (ret < 0) {
394         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
395                          COLOMessage_str(expect_msg));
396     }
397     return value;
398 }
399 
400 static int colo_do_checkpoint_transaction(MigrationState *s,
401                                           QIOChannelBuffer *bioc,
402                                           QEMUFile *fb)
403 {
404     Error *local_err = NULL;
405     int ret = -1;
406 
407     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
408                       &local_err);
409     if (local_err) {
410         goto out;
411     }
412 
413     colo_receive_check_message(s->rp_state.from_dst_file,
414                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
415     if (local_err) {
416         goto out;
417     }
418     /* Reset channel-buffer directly */
419     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
420     bioc->usage = 0;
421 
422     qemu_mutex_lock_iothread();
423     if (failover_get_state() != FAILOVER_STATUS_NONE) {
424         qemu_mutex_unlock_iothread();
425         goto out;
426     }
427     vm_stop_force_state(RUN_STATE_COLO);
428     qemu_mutex_unlock_iothread();
429     trace_colo_vm_state_change("run", "stop");
430     /*
431      * Failover request bh could be called after vm_stop_force_state(),
432      * So we need check failover_request_is_active() again.
433      */
434     if (failover_get_state() != FAILOVER_STATUS_NONE) {
435         goto out;
436     }
437     qemu_mutex_lock_iothread();
438 
439 #ifdef CONFIG_REPLICATION
440     replication_do_checkpoint_all(&local_err);
441     if (local_err) {
442         qemu_mutex_unlock_iothread();
443         goto out;
444     }
445 #else
446         abort();
447 #endif
448 
449     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
450     if (local_err) {
451         qemu_mutex_unlock_iothread();
452         goto out;
453     }
454     /* Note: device state is saved into buffer */
455     ret = qemu_save_device_state(fb);
456 
457     qemu_mutex_unlock_iothread();
458     if (ret < 0) {
459         goto out;
460     }
461 
462     if (migrate_auto_converge()) {
463         mig_throttle_counter_reset();
464     }
465     /*
466      * Only save VM's live state, which not including device state.
467      * TODO: We may need a timeout mechanism to prevent COLO process
468      * to be blocked here.
469      */
470     qemu_savevm_live_state(s->to_dst_file);
471 
472     qemu_fflush(fb);
473 
474     /*
475      * We need the size of the VMstate data in Secondary side,
476      * With which we can decide how much data should be read.
477      */
478     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
479                             bioc->usage, &local_err);
480     if (local_err) {
481         goto out;
482     }
483 
484     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
485     qemu_fflush(s->to_dst_file);
486     ret = qemu_file_get_error(s->to_dst_file);
487     if (ret < 0) {
488         goto out;
489     }
490 
491     colo_receive_check_message(s->rp_state.from_dst_file,
492                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
493     if (local_err) {
494         goto out;
495     }
496 
497     qemu_event_reset(&s->colo_checkpoint_event);
498     colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
499     if (local_err) {
500         goto out;
501     }
502 
503     colo_receive_check_message(s->rp_state.from_dst_file,
504                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
505     if (local_err) {
506         goto out;
507     }
508 
509     ret = 0;
510 
511     qemu_mutex_lock_iothread();
512     vm_start();
513     qemu_mutex_unlock_iothread();
514     trace_colo_vm_state_change("stop", "run");
515 
516 out:
517     if (local_err) {
518         error_report_err(local_err);
519     }
520     return ret;
521 }
522 
523 static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
524 {
525     colo_checkpoint_notify(data);
526 }
527 
528 static void colo_process_checkpoint(MigrationState *s)
529 {
530     QIOChannelBuffer *bioc;
531     QEMUFile *fb = NULL;
532     Error *local_err = NULL;
533     int ret;
534 
535     if (get_colo_mode() != COLO_MODE_PRIMARY) {
536         error_report("COLO mode must be COLO_MODE_PRIMARY");
537         return;
538     }
539 
540     failover_init_state();
541 
542     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
543     if (!s->rp_state.from_dst_file) {
544         error_report("Open QEMUFile from_dst_file failed");
545         goto out;
546     }
547 
548     packets_compare_notifier.notify = colo_compare_notify_checkpoint;
549     colo_compare_register_notifier(&packets_compare_notifier);
550 
551     /*
552      * Wait for Secondary finish loading VM states and enter COLO
553      * restore.
554      */
555     colo_receive_check_message(s->rp_state.from_dst_file,
556                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
557     if (local_err) {
558         goto out;
559     }
560     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
561     fb = qemu_file_new_output(QIO_CHANNEL(bioc));
562     object_unref(OBJECT(bioc));
563 
564     qemu_mutex_lock_iothread();
565 #ifdef CONFIG_REPLICATION
566     replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
567     if (local_err) {
568         qemu_mutex_unlock_iothread();
569         goto out;
570     }
571 #else
572         abort();
573 #endif
574 
575     vm_start();
576     qemu_mutex_unlock_iothread();
577     trace_colo_vm_state_change("stop", "run");
578 
579     timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
580               s->parameters.x_checkpoint_delay);
581 
582     while (s->state == MIGRATION_STATUS_COLO) {
583         if (failover_get_state() != FAILOVER_STATUS_NONE) {
584             error_report("failover request");
585             goto out;
586         }
587 
588         qemu_event_wait(&s->colo_checkpoint_event);
589 
590         if (s->state != MIGRATION_STATUS_COLO) {
591             goto out;
592         }
593         ret = colo_do_checkpoint_transaction(s, bioc, fb);
594         if (ret < 0) {
595             goto out;
596         }
597     }
598 
599 out:
600     /* Throw the unreported error message after exited from loop */
601     if (local_err) {
602         error_report_err(local_err);
603     }
604 
605     if (fb) {
606         qemu_fclose(fb);
607     }
608 
609     /*
610      * There are only two reasons we can get here, some error happened
611      * or the user triggered failover.
612      */
613     switch (failover_get_state()) {
614     case FAILOVER_STATUS_COMPLETED:
615         qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
616                                   COLO_EXIT_REASON_REQUEST);
617         break;
618     default:
619         qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
620                                   COLO_EXIT_REASON_ERROR);
621     }
622 
623     /* Hope this not to be too long to wait here */
624     qemu_sem_wait(&s->colo_exit_sem);
625     qemu_sem_destroy(&s->colo_exit_sem);
626 
627     /*
628      * It is safe to unregister notifier after failover finished.
629      * Besides, colo_delay_timer and colo_checkpoint_sem can't be
630      * released before unregister notifier, or there will be use-after-free
631      * error.
632      */
633     colo_compare_unregister_notifier(&packets_compare_notifier);
634     timer_free(s->colo_delay_timer);
635     qemu_event_destroy(&s->colo_checkpoint_event);
636 
637     /*
638      * Must be called after failover BH is completed,
639      * Or the failover BH may shutdown the wrong fd that
640      * re-used by other threads after we release here.
641      */
642     if (s->rp_state.from_dst_file) {
643         qemu_fclose(s->rp_state.from_dst_file);
644         s->rp_state.from_dst_file = NULL;
645     }
646 }
647 
648 void colo_checkpoint_notify(void *opaque)
649 {
650     MigrationState *s = opaque;
651     int64_t next_notify_time;
652 
653     qemu_event_set(&s->colo_checkpoint_event);
654     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
655     next_notify_time = s->colo_checkpoint_time +
656                     s->parameters.x_checkpoint_delay;
657     timer_mod(s->colo_delay_timer, next_notify_time);
658 }
659 
660 void migrate_start_colo_process(MigrationState *s)
661 {
662     qemu_mutex_unlock_iothread();
663     qemu_event_init(&s->colo_checkpoint_event, false);
664     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
665                                 colo_checkpoint_notify, s);
666 
667     qemu_sem_init(&s->colo_exit_sem, 0);
668     colo_process_checkpoint(s);
669     qemu_mutex_lock_iothread();
670 }
671 
672 static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
673                       QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
674 {
675     uint64_t total_size;
676     uint64_t value;
677     Error *local_err = NULL;
678     int ret;
679 
680     qemu_mutex_lock_iothread();
681     vm_stop_force_state(RUN_STATE_COLO);
682     qemu_mutex_unlock_iothread();
683     trace_colo_vm_state_change("run", "stop");
684 
685     /* FIXME: This is unnecessary for periodic checkpoint mode */
686     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
687                  &local_err);
688     if (local_err) {
689         error_propagate(errp, local_err);
690         return;
691     }
692 
693     colo_receive_check_message(mis->from_src_file,
694                        COLO_MESSAGE_VMSTATE_SEND, &local_err);
695     if (local_err) {
696         error_propagate(errp, local_err);
697         return;
698     }
699 
700     qemu_mutex_lock_iothread();
701     cpu_synchronize_all_states();
702     ret = qemu_loadvm_state_main(mis->from_src_file, mis);
703     qemu_mutex_unlock_iothread();
704 
705     if (ret < 0) {
706         error_setg(errp, "Load VM's live state (ram) error");
707         return;
708     }
709 
710     value = colo_receive_message_value(mis->from_src_file,
711                              COLO_MESSAGE_VMSTATE_SIZE, &local_err);
712     if (local_err) {
713         error_propagate(errp, local_err);
714         return;
715     }
716 
717     /*
718      * Read VM device state data into channel buffer,
719      * It's better to re-use the memory allocated.
720      * Here we need to handle the channel buffer directly.
721      */
722     if (value > bioc->capacity) {
723         bioc->capacity = value;
724         bioc->data = g_realloc(bioc->data, bioc->capacity);
725     }
726     total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
727     if (total_size != value) {
728         error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
729                     " %" PRIu64, total_size, value);
730         return;
731     }
732     bioc->usage = total_size;
733     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
734 
735     colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
736                  &local_err);
737     if (local_err) {
738         error_propagate(errp, local_err);
739         return;
740     }
741 
742     qemu_mutex_lock_iothread();
743     vmstate_loading = true;
744     colo_flush_ram_cache();
745     ret = qemu_load_device_state(fb);
746     if (ret < 0) {
747         error_setg(errp, "COLO: load device state failed");
748         vmstate_loading = false;
749         qemu_mutex_unlock_iothread();
750         return;
751     }
752 
753 #ifdef CONFIG_REPLICATION
754     replication_get_error_all(&local_err);
755     if (local_err) {
756         error_propagate(errp, local_err);
757         vmstate_loading = false;
758         qemu_mutex_unlock_iothread();
759         return;
760     }
761 
762     /* discard colo disk buffer */
763     replication_do_checkpoint_all(&local_err);
764     if (local_err) {
765         error_propagate(errp, local_err);
766         vmstate_loading = false;
767         qemu_mutex_unlock_iothread();
768         return;
769     }
770 #else
771     abort();
772 #endif
773     /* Notify all filters of all NIC to do checkpoint */
774     colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
775 
776     if (local_err) {
777         error_propagate(errp, local_err);
778         vmstate_loading = false;
779         qemu_mutex_unlock_iothread();
780         return;
781     }
782 
783     vmstate_loading = false;
784     vm_start();
785     qemu_mutex_unlock_iothread();
786     trace_colo_vm_state_change("stop", "run");
787 
788     if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
789         return;
790     }
791 
792     colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
793                  &local_err);
794     error_propagate(errp, local_err);
795 }
796 
797 static void colo_wait_handle_message(MigrationIncomingState *mis,
798                 QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
799 {
800     COLOMessage msg;
801     Error *local_err = NULL;
802 
803     msg = colo_receive_message(mis->from_src_file, &local_err);
804     if (local_err) {
805         error_propagate(errp, local_err);
806         return;
807     }
808 
809     switch (msg) {
810     case COLO_MESSAGE_CHECKPOINT_REQUEST:
811         colo_incoming_process_checkpoint(mis, fb, bioc, errp);
812         break;
813     default:
814         error_setg(errp, "Got unknown COLO message: %d", msg);
815         break;
816     }
817 }
818 
819 void colo_shutdown(void)
820 {
821     MigrationIncomingState *mis = NULL;
822     MigrationState *s = NULL;
823 
824     switch (get_colo_mode()) {
825     case COLO_MODE_PRIMARY:
826         s = migrate_get_current();
827         qemu_event_set(&s->colo_checkpoint_event);
828         qemu_sem_post(&s->colo_exit_sem);
829         break;
830     case COLO_MODE_SECONDARY:
831         mis = migration_incoming_get_current();
832         qemu_sem_post(&mis->colo_incoming_sem);
833         break;
834     default:
835         break;
836     }
837 }
838 
839 void *colo_process_incoming_thread(void *opaque)
840 {
841     MigrationIncomingState *mis = opaque;
842     QEMUFile *fb = NULL;
843     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
844     Error *local_err = NULL;
845 
846     rcu_register_thread();
847     qemu_sem_init(&mis->colo_incoming_sem, 0);
848 
849     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
850                       MIGRATION_STATUS_COLO);
851 
852     if (get_colo_mode() != COLO_MODE_SECONDARY) {
853         error_report("COLO mode must be COLO_MODE_SECONDARY");
854         return NULL;
855     }
856 
857     failover_init_state();
858 
859     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
860     if (!mis->to_src_file) {
861         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
862         goto out;
863     }
864     /*
865      * Note: the communication between Primary side and Secondary side
866      * should be sequential, we set the fd to unblocked in migration incoming
867      * coroutine, and here we are in the COLO incoming thread, so it is ok to
868      * set the fd back to blocked.
869      */
870     qemu_file_set_blocking(mis->from_src_file, true);
871 
872     colo_incoming_start_dirty_log();
873 
874     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
875     fb = qemu_file_new_input(QIO_CHANNEL(bioc));
876     object_unref(OBJECT(bioc));
877 
878     qemu_mutex_lock_iothread();
879 #ifdef CONFIG_REPLICATION
880     replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
881     if (local_err) {
882         qemu_mutex_unlock_iothread();
883         goto out;
884     }
885 #else
886         abort();
887 #endif
888     vm_start();
889     qemu_mutex_unlock_iothread();
890     trace_colo_vm_state_change("stop", "run");
891 
892     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
893                       &local_err);
894     if (local_err) {
895         goto out;
896     }
897 
898     while (mis->state == MIGRATION_STATUS_COLO) {
899         colo_wait_handle_message(mis, fb, bioc, &local_err);
900         if (local_err) {
901             error_report_err(local_err);
902             break;
903         }
904 
905         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
906             failover_set_state(FAILOVER_STATUS_RELAUNCH,
907                             FAILOVER_STATUS_NONE);
908             failover_request_active(NULL);
909             break;
910         }
911 
912         if (failover_get_state() != FAILOVER_STATUS_NONE) {
913             error_report("failover request");
914             break;
915         }
916     }
917 
918 out:
919     /*
920      * There are only two reasons we can get here, some error happened
921      * or the user triggered failover.
922      */
923     switch (failover_get_state()) {
924     case FAILOVER_STATUS_COMPLETED:
925         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
926                                   COLO_EXIT_REASON_REQUEST);
927         break;
928     default:
929         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
930                                   COLO_EXIT_REASON_ERROR);
931     }
932 
933     if (fb) {
934         qemu_fclose(fb);
935     }
936 
937     /* Hope this not to be too long to loop here */
938     qemu_sem_wait(&mis->colo_incoming_sem);
939     qemu_sem_destroy(&mis->colo_incoming_sem);
940 
941     rcu_unregister_thread();
942     return NULL;
943 }
944