xref: /qemu/migration/colo.c (revision 7271a819)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "sysemu/sysemu.h"
15 #include "qemu-file-channel.h"
16 #include "migration.h"
17 #include "qemu-file.h"
18 #include "savevm.h"
19 #include "migration/colo.h"
20 #include "block.h"
21 #include "io/channel-buffer.h"
22 #include "trace.h"
23 #include "qemu/error-report.h"
24 #include "migration/failover.h"
25 #include "replication.h"
26 #include "qmp-commands.h"
27 
/*
 * Set to true by the COLO incoming thread around qemu_loadvm_state() and
 * cleared afterwards.  secondary_vm_do_failover() checks it and, when set,
 * moves the failover state to FAILOVER_STATUS_RELAUNCH instead of failing
 * over immediately.
 */
static bool vmstate_loading;

/* Initial capacity handed to qio_channel_buffer_new() by both COLO sides. */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
31 
32 bool migration_in_colo_state(void)
33 {
34     MigrationState *s = migrate_get_current();
35 
36     return (s->state == MIGRATION_STATUS_COLO);
37 }
38 
39 bool migration_incoming_in_colo_state(void)
40 {
41     MigrationIncomingState *mis = migration_incoming_get_current();
42 
43     return mis && (mis->state == MIGRATION_STATUS_COLO);
44 }
45 
46 static bool colo_runstate_is_stopped(void)
47 {
48     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
49 }
50 
51 static void secondary_vm_do_failover(void)
52 {
53     int old_state;
54     MigrationIncomingState *mis = migration_incoming_get_current();
55 
56     /* Can not do failover during the process of VM's loading VMstate, Or
57      * it will break the secondary VM.
58      */
59     if (vmstate_loading) {
60         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
61                         FAILOVER_STATUS_RELAUNCH);
62         if (old_state != FAILOVER_STATUS_ACTIVE) {
63             error_report("Unknown error while do failover for secondary VM,"
64                          "old_state: %s", FailoverStatus_str(old_state));
65         }
66         return;
67     }
68 
69     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
70                       MIGRATION_STATUS_COMPLETED);
71 
72     if (!autostart) {
73         error_report("\"-S\" qemu option will be ignored in secondary side");
74         /* recover runstate to normal migration finish state */
75         autostart = true;
76     }
77     /*
78      * Make sure COLO incoming thread not block in recv or send,
79      * If mis->from_src_file and mis->to_src_file use the same fd,
80      * The second shutdown() will return -1, we ignore this value,
81      * It is harmless.
82      */
83     if (mis->from_src_file) {
84         qemu_file_shutdown(mis->from_src_file);
85     }
86     if (mis->to_src_file) {
87         qemu_file_shutdown(mis->to_src_file);
88     }
89 
90     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
91                                    FAILOVER_STATUS_COMPLETED);
92     if (old_state != FAILOVER_STATUS_ACTIVE) {
93         error_report("Incorrect state (%s) while doing failover for "
94                      "secondary VM", FailoverStatus_str(old_state));
95         return;
96     }
97     /* Notify COLO incoming thread that failover work is finished */
98     qemu_sem_post(&mis->colo_incoming_sem);
99     /* For Secondary VM, jump to incoming co */
100     if (mis->migration_incoming_co) {
101         qemu_coroutine_enter(mis->migration_incoming_co);
102     }
103 }
104 
105 static void primary_vm_do_failover(void)
106 {
107     MigrationState *s = migrate_get_current();
108     int old_state;
109 
110     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
111                       MIGRATION_STATUS_COMPLETED);
112 
113     /*
114      * Wake up COLO thread which may blocked in recv() or send(),
115      * The s->rp_state.from_dst_file and s->to_dst_file may use the
116      * same fd, but we still shutdown the fd for twice, it is harmless.
117      */
118     if (s->to_dst_file) {
119         qemu_file_shutdown(s->to_dst_file);
120     }
121     if (s->rp_state.from_dst_file) {
122         qemu_file_shutdown(s->rp_state.from_dst_file);
123     }
124 
125     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
126                                    FAILOVER_STATUS_COMPLETED);
127     if (old_state != FAILOVER_STATUS_ACTIVE) {
128         error_report("Incorrect state (%s) while doing failover for Primary VM",
129                      FailoverStatus_str(old_state));
130         return;
131     }
132     /* Notify COLO thread that failover work is finished */
133     qemu_sem_post(&s->colo_exit_sem);
134 }
135 
136 void colo_do_failover(MigrationState *s)
137 {
138     /* Make sure VM stopped while failover happened. */
139     if (!colo_runstate_is_stopped()) {
140         vm_stop_force_state(RUN_STATE_COLO);
141     }
142 
143     if (get_colo_mode() == COLO_MODE_PRIMARY) {
144         primary_vm_do_failover();
145     } else {
146         secondary_vm_do_failover();
147     }
148 }
149 
150 void qmp_xen_set_replication(bool enable, bool primary,
151                              bool has_failover, bool failover,
152                              Error **errp)
153 {
154 #ifdef CONFIG_REPLICATION
155     ReplicationMode mode = primary ?
156                            REPLICATION_MODE_PRIMARY :
157                            REPLICATION_MODE_SECONDARY;
158 
159     if (has_failover && enable) {
160         error_setg(errp, "Parameter 'failover' is only for"
161                    " stopping replication");
162         return;
163     }
164 
165     if (enable) {
166         replication_start_all(mode, errp);
167     } else {
168         if (!has_failover) {
169             failover = NULL;
170         }
171         replication_stop_all(failover, failover ? NULL : errp);
172     }
173 #else
174     abort();
175 #endif
176 }
177 
178 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
179 {
180 #ifdef CONFIG_REPLICATION
181     Error *err = NULL;
182     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
183 
184     replication_get_error_all(&err);
185     if (err) {
186         s->error = true;
187         s->has_desc = true;
188         s->desc = g_strdup(error_get_pretty(err));
189     } else {
190         s->error = false;
191     }
192 
193     error_free(err);
194     return s;
195 #else
196     abort();
197 #endif
198 }
199 
200 void qmp_xen_colo_do_checkpoint(Error **errp)
201 {
202 #ifdef CONFIG_REPLICATION
203     replication_do_checkpoint_all(errp);
204 #else
205     abort();
206 #endif
207 }
208 
209 static void colo_send_message(QEMUFile *f, COLOMessage msg,
210                               Error **errp)
211 {
212     int ret;
213 
214     if (msg >= COLO_MESSAGE__MAX) {
215         error_setg(errp, "%s: Invalid message", __func__);
216         return;
217     }
218     qemu_put_be32(f, msg);
219     qemu_fflush(f);
220 
221     ret = qemu_file_get_error(f);
222     if (ret < 0) {
223         error_setg_errno(errp, -ret, "Can't send COLO message");
224     }
225     trace_colo_send_message(COLOMessage_str(msg));
226 }
227 
228 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
229                                     uint64_t value, Error **errp)
230 {
231     Error *local_err = NULL;
232     int ret;
233 
234     colo_send_message(f, msg, &local_err);
235     if (local_err) {
236         error_propagate(errp, local_err);
237         return;
238     }
239     qemu_put_be64(f, value);
240     qemu_fflush(f);
241 
242     ret = qemu_file_get_error(f);
243     if (ret < 0) {
244         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
245                          COLOMessage_str(msg));
246     }
247 }
248 
249 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
250 {
251     COLOMessage msg;
252     int ret;
253 
254     msg = qemu_get_be32(f);
255     ret = qemu_file_get_error(f);
256     if (ret < 0) {
257         error_setg_errno(errp, -ret, "Can't receive COLO message");
258         return msg;
259     }
260     if (msg >= COLO_MESSAGE__MAX) {
261         error_setg(errp, "%s: Invalid message", __func__);
262         return msg;
263     }
264     trace_colo_receive_message(COLOMessage_str(msg));
265     return msg;
266 }
267 
268 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
269                                        Error **errp)
270 {
271     COLOMessage msg;
272     Error *local_err = NULL;
273 
274     msg = colo_receive_message(f, &local_err);
275     if (local_err) {
276         error_propagate(errp, local_err);
277         return;
278     }
279     if (msg != expect_msg) {
280         error_setg(errp, "Unexpected COLO message %d, expected %d",
281                           msg, expect_msg);
282     }
283 }
284 
285 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
286                                            Error **errp)
287 {
288     Error *local_err = NULL;
289     uint64_t value;
290     int ret;
291 
292     colo_receive_check_message(f, expect_msg, &local_err);
293     if (local_err) {
294         error_propagate(errp, local_err);
295         return 0;
296     }
297 
298     value = qemu_get_be64(f);
299     ret = qemu_file_get_error(f);
300     if (ret < 0) {
301         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
302                          COLOMessage_str(expect_msg));
303     }
304     return value;
305 }
306 
307 static int colo_do_checkpoint_transaction(MigrationState *s,
308                                           QIOChannelBuffer *bioc,
309                                           QEMUFile *fb)
310 {
311     Error *local_err = NULL;
312     int ret = -1;
313 
314     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
315                       &local_err);
316     if (local_err) {
317         goto out;
318     }
319 
320     colo_receive_check_message(s->rp_state.from_dst_file,
321                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
322     if (local_err) {
323         goto out;
324     }
325     /* Reset channel-buffer directly */
326     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
327     bioc->usage = 0;
328 
329     qemu_mutex_lock_iothread();
330     if (failover_get_state() != FAILOVER_STATUS_NONE) {
331         qemu_mutex_unlock_iothread();
332         goto out;
333     }
334     vm_stop_force_state(RUN_STATE_COLO);
335     qemu_mutex_unlock_iothread();
336     trace_colo_vm_state_change("run", "stop");
337     /*
338      * Failover request bh could be called after vm_stop_force_state(),
339      * So we need check failover_request_is_active() again.
340      */
341     if (failover_get_state() != FAILOVER_STATUS_NONE) {
342         goto out;
343     }
344 
345     /* Disable block migration */
346     migrate_set_block_enabled(false, &local_err);
347     qemu_savevm_state_header(fb);
348     qemu_savevm_state_setup(fb);
349     qemu_mutex_lock_iothread();
350     qemu_savevm_state_complete_precopy(fb, false, false);
351     qemu_mutex_unlock_iothread();
352 
353     qemu_fflush(fb);
354 
355     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
356     if (local_err) {
357         goto out;
358     }
359     /*
360      * We need the size of the VMstate data in Secondary side,
361      * With which we can decide how much data should be read.
362      */
363     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
364                             bioc->usage, &local_err);
365     if (local_err) {
366         goto out;
367     }
368 
369     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
370     qemu_fflush(s->to_dst_file);
371     ret = qemu_file_get_error(s->to_dst_file);
372     if (ret < 0) {
373         goto out;
374     }
375 
376     colo_receive_check_message(s->rp_state.from_dst_file,
377                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
378     if (local_err) {
379         goto out;
380     }
381 
382     colo_receive_check_message(s->rp_state.from_dst_file,
383                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
384     if (local_err) {
385         goto out;
386     }
387 
388     ret = 0;
389 
390     qemu_mutex_lock_iothread();
391     vm_start();
392     qemu_mutex_unlock_iothread();
393     trace_colo_vm_state_change("stop", "run");
394 
395 out:
396     if (local_err) {
397         error_report_err(local_err);
398     }
399     return ret;
400 }
401 
402 static void colo_process_checkpoint(MigrationState *s)
403 {
404     QIOChannelBuffer *bioc;
405     QEMUFile *fb = NULL;
406     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
407     Error *local_err = NULL;
408     int ret;
409 
410     failover_init_state();
411 
412     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
413     if (!s->rp_state.from_dst_file) {
414         error_report("Open QEMUFile from_dst_file failed");
415         goto out;
416     }
417 
418     /*
419      * Wait for Secondary finish loading VM states and enter COLO
420      * restore.
421      */
422     colo_receive_check_message(s->rp_state.from_dst_file,
423                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
424     if (local_err) {
425         goto out;
426     }
427     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
428     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
429     object_unref(OBJECT(bioc));
430 
431     qemu_mutex_lock_iothread();
432     vm_start();
433     qemu_mutex_unlock_iothread();
434     trace_colo_vm_state_change("stop", "run");
435 
436     timer_mod(s->colo_delay_timer,
437             current_time + s->parameters.x_checkpoint_delay);
438 
439     while (s->state == MIGRATION_STATUS_COLO) {
440         if (failover_get_state() != FAILOVER_STATUS_NONE) {
441             error_report("failover request");
442             goto out;
443         }
444 
445         qemu_sem_wait(&s->colo_checkpoint_sem);
446 
447         ret = colo_do_checkpoint_transaction(s, bioc, fb);
448         if (ret < 0) {
449             goto out;
450         }
451     }
452 
453 out:
454     /* Throw the unreported error message after exited from loop */
455     if (local_err) {
456         error_report_err(local_err);
457     }
458 
459     if (fb) {
460         qemu_fclose(fb);
461     }
462 
463     timer_del(s->colo_delay_timer);
464 
465     /* Hope this not to be too long to wait here */
466     qemu_sem_wait(&s->colo_exit_sem);
467     qemu_sem_destroy(&s->colo_exit_sem);
468     /*
469      * Must be called after failover BH is completed,
470      * Or the failover BH may shutdown the wrong fd that
471      * re-used by other threads after we release here.
472      */
473     if (s->rp_state.from_dst_file) {
474         qemu_fclose(s->rp_state.from_dst_file);
475     }
476 }
477 
478 void colo_checkpoint_notify(void *opaque)
479 {
480     MigrationState *s = opaque;
481     int64_t next_notify_time;
482 
483     qemu_sem_post(&s->colo_checkpoint_sem);
484     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
485     next_notify_time = s->colo_checkpoint_time +
486                     s->parameters.x_checkpoint_delay;
487     timer_mod(s->colo_delay_timer, next_notify_time);
488 }
489 
490 void migrate_start_colo_process(MigrationState *s)
491 {
492     qemu_mutex_unlock_iothread();
493     qemu_sem_init(&s->colo_checkpoint_sem, 0);
494     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
495                                 colo_checkpoint_notify, s);
496 
497     qemu_sem_init(&s->colo_exit_sem, 0);
498     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
499                       MIGRATION_STATUS_COLO);
500     colo_process_checkpoint(s);
501     qemu_mutex_lock_iothread();
502 }
503 
504 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
505                                      Error **errp)
506 {
507     COLOMessage msg;
508     Error *local_err = NULL;
509 
510     msg = colo_receive_message(f, &local_err);
511     if (local_err) {
512         error_propagate(errp, local_err);
513         return;
514     }
515 
516     switch (msg) {
517     case COLO_MESSAGE_CHECKPOINT_REQUEST:
518         *checkpoint_request = 1;
519         break;
520     default:
521         *checkpoint_request = 0;
522         error_setg(errp, "Got unknown COLO message: %d", msg);
523         break;
524     }
525 }
526 
527 void *colo_process_incoming_thread(void *opaque)
528 {
529     MigrationIncomingState *mis = opaque;
530     QEMUFile *fb = NULL;
531     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
532     uint64_t total_size;
533     uint64_t value;
534     Error *local_err = NULL;
535 
536     qemu_sem_init(&mis->colo_incoming_sem, 0);
537 
538     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
539                       MIGRATION_STATUS_COLO);
540 
541     failover_init_state();
542 
543     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
544     if (!mis->to_src_file) {
545         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
546         goto out;
547     }
548     /*
549      * Note: the communication between Primary side and Secondary side
550      * should be sequential, we set the fd to unblocked in migration incoming
551      * coroutine, and here we are in the COLO incoming thread, so it is ok to
552      * set the fd back to blocked.
553      */
554     qemu_file_set_blocking(mis->from_src_file, true);
555 
556     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
557     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
558     object_unref(OBJECT(bioc));
559 
560     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
561                       &local_err);
562     if (local_err) {
563         goto out;
564     }
565 
566     while (mis->state == MIGRATION_STATUS_COLO) {
567         int request = 0;
568 
569         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
570         if (local_err) {
571             goto out;
572         }
573         assert(request);
574         if (failover_get_state() != FAILOVER_STATUS_NONE) {
575             error_report("failover request");
576             goto out;
577         }
578 
579         /* FIXME: This is unnecessary for periodic checkpoint mode */
580         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
581                      &local_err);
582         if (local_err) {
583             goto out;
584         }
585 
586         colo_receive_check_message(mis->from_src_file,
587                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
588         if (local_err) {
589             goto out;
590         }
591 
592         value = colo_receive_message_value(mis->from_src_file,
593                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
594         if (local_err) {
595             goto out;
596         }
597 
598         /*
599          * Read VM device state data into channel buffer,
600          * It's better to re-use the memory allocated.
601          * Here we need to handle the channel buffer directly.
602          */
603         if (value > bioc->capacity) {
604             bioc->capacity = value;
605             bioc->data = g_realloc(bioc->data, bioc->capacity);
606         }
607         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
608         if (total_size != value) {
609             error_report("Got %" PRIu64 " VMState data, less than expected"
610                         " %" PRIu64, total_size, value);
611             goto out;
612         }
613         bioc->usage = total_size;
614         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
615 
616         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
617                      &local_err);
618         if (local_err) {
619             goto out;
620         }
621 
622         qemu_mutex_lock_iothread();
623         qemu_system_reset(SHUTDOWN_CAUSE_NONE);
624         vmstate_loading = true;
625         if (qemu_loadvm_state(fb) < 0) {
626             error_report("COLO: loadvm failed");
627             qemu_mutex_unlock_iothread();
628             goto out;
629         }
630 
631         vmstate_loading = false;
632         qemu_mutex_unlock_iothread();
633 
634         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
635             failover_set_state(FAILOVER_STATUS_RELAUNCH,
636                             FAILOVER_STATUS_NONE);
637             failover_request_active(NULL);
638             goto out;
639         }
640 
641         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
642                      &local_err);
643         if (local_err) {
644             goto out;
645         }
646     }
647 
648 out:
649     vmstate_loading = false;
650     /* Throw the unreported error message after exited from loop */
651     if (local_err) {
652         error_report_err(local_err);
653     }
654 
655     if (fb) {
656         qemu_fclose(fb);
657     }
658 
659     /* Hope this not to be too long to loop here */
660     qemu_sem_wait(&mis->colo_incoming_sem);
661     qemu_sem_destroy(&mis->colo_incoming_sem);
662     /* Must be called after failover BH is completed */
663     if (mis->to_src_file) {
664         qemu_fclose(mis->to_src_file);
665     }
666     migration_incoming_exit_colo();
667 
668     return NULL;
669 }
670