xref: /qemu/migration/colo.c (revision 33848cee)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 
23 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
24 
25 bool colo_supported(void)
26 {
27     return true;
28 }
29 
30 bool migration_in_colo_state(void)
31 {
32     MigrationState *s = migrate_get_current();
33 
34     return (s->state == MIGRATION_STATUS_COLO);
35 }
36 
37 bool migration_incoming_in_colo_state(void)
38 {
39     MigrationIncomingState *mis = migration_incoming_get_current();
40 
41     return mis && (mis->state == MIGRATION_STATUS_COLO);
42 }
43 
44 static bool colo_runstate_is_stopped(void)
45 {
46     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
47 }
48 
49 static void secondary_vm_do_failover(void)
50 {
51     int old_state;
52     MigrationIncomingState *mis = migration_incoming_get_current();
53 
54     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
55                       MIGRATION_STATUS_COMPLETED);
56 
57     if (!autostart) {
58         error_report("\"-S\" qemu option will be ignored in secondary side");
59         /* recover runstate to normal migration finish state */
60         autostart = true;
61     }
62 
63     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
64                                    FAILOVER_STATUS_COMPLETED);
65     if (old_state != FAILOVER_STATUS_ACTIVE) {
66         error_report("Incorrect state (%s) while doing failover for "
67                      "secondary VM", FailoverStatus_lookup[old_state]);
68         return;
69     }
70     /* For Secondary VM, jump to incoming co */
71     if (mis->migration_incoming_co) {
72         qemu_coroutine_enter(mis->migration_incoming_co);
73     }
74 }
75 
76 static void primary_vm_do_failover(void)
77 {
78     MigrationState *s = migrate_get_current();
79     int old_state;
80 
81     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
82                       MIGRATION_STATUS_COMPLETED);
83 
84     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
85                                    FAILOVER_STATUS_COMPLETED);
86     if (old_state != FAILOVER_STATUS_ACTIVE) {
87         error_report("Incorrect state (%s) while doing failover for Primary VM",
88                      FailoverStatus_lookup[old_state]);
89         return;
90     }
91 }
92 
93 void colo_do_failover(MigrationState *s)
94 {
95     /* Make sure VM stopped while failover happened. */
96     if (!colo_runstate_is_stopped()) {
97         vm_stop_force_state(RUN_STATE_COLO);
98     }
99 
100     if (get_colo_mode() == COLO_MODE_PRIMARY) {
101         primary_vm_do_failover();
102     } else {
103         secondary_vm_do_failover();
104     }
105 }
106 
107 static void colo_send_message(QEMUFile *f, COLOMessage msg,
108                               Error **errp)
109 {
110     int ret;
111 
112     if (msg >= COLO_MESSAGE__MAX) {
113         error_setg(errp, "%s: Invalid message", __func__);
114         return;
115     }
116     qemu_put_be32(f, msg);
117     qemu_fflush(f);
118 
119     ret = qemu_file_get_error(f);
120     if (ret < 0) {
121         error_setg_errno(errp, -ret, "Can't send COLO message");
122     }
123     trace_colo_send_message(COLOMessage_lookup[msg]);
124 }
125 
126 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
127                                     uint64_t value, Error **errp)
128 {
129     Error *local_err = NULL;
130     int ret;
131 
132     colo_send_message(f, msg, &local_err);
133     if (local_err) {
134         error_propagate(errp, local_err);
135         return;
136     }
137     qemu_put_be64(f, value);
138     qemu_fflush(f);
139 
140     ret = qemu_file_get_error(f);
141     if (ret < 0) {
142         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
143                          COLOMessage_lookup[msg]);
144     }
145 }
146 
147 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
148 {
149     COLOMessage msg;
150     int ret;
151 
152     msg = qemu_get_be32(f);
153     ret = qemu_file_get_error(f);
154     if (ret < 0) {
155         error_setg_errno(errp, -ret, "Can't receive COLO message");
156         return msg;
157     }
158     if (msg >= COLO_MESSAGE__MAX) {
159         error_setg(errp, "%s: Invalid message", __func__);
160         return msg;
161     }
162     trace_colo_receive_message(COLOMessage_lookup[msg]);
163     return msg;
164 }
165 
166 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
167                                        Error **errp)
168 {
169     COLOMessage msg;
170     Error *local_err = NULL;
171 
172     msg = colo_receive_message(f, &local_err);
173     if (local_err) {
174         error_propagate(errp, local_err);
175         return;
176     }
177     if (msg != expect_msg) {
178         error_setg(errp, "Unexpected COLO message %d, expected %d",
179                           msg, expect_msg);
180     }
181 }
182 
183 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
184                                            Error **errp)
185 {
186     Error *local_err = NULL;
187     uint64_t value;
188     int ret;
189 
190     colo_receive_check_message(f, expect_msg, &local_err);
191     if (local_err) {
192         error_propagate(errp, local_err);
193         return 0;
194     }
195 
196     value = qemu_get_be64(f);
197     ret = qemu_file_get_error(f);
198     if (ret < 0) {
199         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
200                          COLOMessage_lookup[expect_msg]);
201     }
202     return value;
203 }
204 
205 static int colo_do_checkpoint_transaction(MigrationState *s,
206                                           QIOChannelBuffer *bioc,
207                                           QEMUFile *fb)
208 {
209     Error *local_err = NULL;
210     int ret = -1;
211 
212     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
213                       &local_err);
214     if (local_err) {
215         goto out;
216     }
217 
218     colo_receive_check_message(s->rp_state.from_dst_file,
219                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
220     if (local_err) {
221         goto out;
222     }
223     /* Reset channel-buffer directly */
224     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
225     bioc->usage = 0;
226 
227     qemu_mutex_lock_iothread();
228     if (failover_get_state() != FAILOVER_STATUS_NONE) {
229         qemu_mutex_unlock_iothread();
230         goto out;
231     }
232     vm_stop_force_state(RUN_STATE_COLO);
233     qemu_mutex_unlock_iothread();
234     trace_colo_vm_state_change("run", "stop");
235     /*
236      * Failover request bh could be called after vm_stop_force_state(),
237      * So we need check failover_request_is_active() again.
238      */
239     if (failover_get_state() != FAILOVER_STATUS_NONE) {
240         goto out;
241     }
242 
243     /* Disable block migration */
244     s->params.blk = 0;
245     s->params.shared = 0;
246     qemu_savevm_state_header(fb);
247     qemu_savevm_state_begin(fb, &s->params);
248     qemu_mutex_lock_iothread();
249     qemu_savevm_state_complete_precopy(fb, false);
250     qemu_mutex_unlock_iothread();
251 
252     qemu_fflush(fb);
253 
254     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
255     if (local_err) {
256         goto out;
257     }
258     /*
259      * We need the size of the VMstate data in Secondary side,
260      * With which we can decide how much data should be read.
261      */
262     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
263                             bioc->usage, &local_err);
264     if (local_err) {
265         goto out;
266     }
267 
268     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
269     qemu_fflush(s->to_dst_file);
270     ret = qemu_file_get_error(s->to_dst_file);
271     if (ret < 0) {
272         goto out;
273     }
274 
275     colo_receive_check_message(s->rp_state.from_dst_file,
276                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
277     if (local_err) {
278         goto out;
279     }
280 
281     colo_receive_check_message(s->rp_state.from_dst_file,
282                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
283     if (local_err) {
284         goto out;
285     }
286 
287     ret = 0;
288 
289     qemu_mutex_lock_iothread();
290     vm_start();
291     qemu_mutex_unlock_iothread();
292     trace_colo_vm_state_change("stop", "run");
293 
294 out:
295     if (local_err) {
296         error_report_err(local_err);
297     }
298     return ret;
299 }
300 
301 static void colo_process_checkpoint(MigrationState *s)
302 {
303     QIOChannelBuffer *bioc;
304     QEMUFile *fb = NULL;
305     int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
306     Error *local_err = NULL;
307     int ret;
308 
309     failover_init_state();
310 
311     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
312     if (!s->rp_state.from_dst_file) {
313         error_report("Open QEMUFile from_dst_file failed");
314         goto out;
315     }
316 
317     /*
318      * Wait for Secondary finish loading VM states and enter COLO
319      * restore.
320      */
321     colo_receive_check_message(s->rp_state.from_dst_file,
322                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
323     if (local_err) {
324         goto out;
325     }
326     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
327     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
328     object_unref(OBJECT(bioc));
329 
330     qemu_mutex_lock_iothread();
331     vm_start();
332     qemu_mutex_unlock_iothread();
333     trace_colo_vm_state_change("stop", "run");
334 
335     while (s->state == MIGRATION_STATUS_COLO) {
336         if (failover_get_state() != FAILOVER_STATUS_NONE) {
337             error_report("failover request");
338             goto out;
339         }
340 
341         current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
342         if (current_time - checkpoint_time <
343             s->parameters.x_checkpoint_delay) {
344             int64_t delay_ms;
345 
346             delay_ms = s->parameters.x_checkpoint_delay -
347                        (current_time - checkpoint_time);
348             g_usleep(delay_ms * 1000);
349         }
350         ret = colo_do_checkpoint_transaction(s, bioc, fb);
351         if (ret < 0) {
352             goto out;
353         }
354         checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
355     }
356 
357 out:
358     /* Throw the unreported error message after exited from loop */
359     if (local_err) {
360         error_report_err(local_err);
361     }
362 
363     if (fb) {
364         qemu_fclose(fb);
365     }
366 
367     if (s->rp_state.from_dst_file) {
368         qemu_fclose(s->rp_state.from_dst_file);
369     }
370 }
371 
372 void migrate_start_colo_process(MigrationState *s)
373 {
374     qemu_mutex_unlock_iothread();
375     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
376                       MIGRATION_STATUS_COLO);
377     colo_process_checkpoint(s);
378     qemu_mutex_lock_iothread();
379 }
380 
381 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
382                                      Error **errp)
383 {
384     COLOMessage msg;
385     Error *local_err = NULL;
386 
387     msg = colo_receive_message(f, &local_err);
388     if (local_err) {
389         error_propagate(errp, local_err);
390         return;
391     }
392 
393     switch (msg) {
394     case COLO_MESSAGE_CHECKPOINT_REQUEST:
395         *checkpoint_request = 1;
396         break;
397     default:
398         *checkpoint_request = 0;
399         error_setg(errp, "Got unknown COLO message: %d", msg);
400         break;
401     }
402 }
403 
404 void *colo_process_incoming_thread(void *opaque)
405 {
406     MigrationIncomingState *mis = opaque;
407     QEMUFile *fb = NULL;
408     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
409     uint64_t total_size;
410     uint64_t value;
411     Error *local_err = NULL;
412 
413     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
414                       MIGRATION_STATUS_COLO);
415 
416     failover_init_state();
417 
418     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
419     if (!mis->to_src_file) {
420         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
421         goto out;
422     }
423     /*
424      * Note: the communication between Primary side and Secondary side
425      * should be sequential, we set the fd to unblocked in migration incoming
426      * coroutine, and here we are in the COLO incoming thread, so it is ok to
427      * set the fd back to blocked.
428      */
429     qemu_file_set_blocking(mis->from_src_file, true);
430 
431     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
432     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
433     object_unref(OBJECT(bioc));
434 
435     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
436                       &local_err);
437     if (local_err) {
438         goto out;
439     }
440 
441     while (mis->state == MIGRATION_STATUS_COLO) {
442         int request = 0;
443 
444         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
445         if (local_err) {
446             goto out;
447         }
448         assert(request);
449         if (failover_get_state() != FAILOVER_STATUS_NONE) {
450             error_report("failover request");
451             goto out;
452         }
453 
454         /* FIXME: This is unnecessary for periodic checkpoint mode */
455         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
456                      &local_err);
457         if (local_err) {
458             goto out;
459         }
460 
461         colo_receive_check_message(mis->from_src_file,
462                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
463         if (local_err) {
464             goto out;
465         }
466 
467         value = colo_receive_message_value(mis->from_src_file,
468                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
469         if (local_err) {
470             goto out;
471         }
472 
473         /*
474          * Read VM device state data into channel buffer,
475          * It's better to re-use the memory allocated.
476          * Here we need to handle the channel buffer directly.
477          */
478         if (value > bioc->capacity) {
479             bioc->capacity = value;
480             bioc->data = g_realloc(bioc->data, bioc->capacity);
481         }
482         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
483         if (total_size != value) {
484             error_report("Got %" PRIu64 " VMState data, less than expected"
485                         " %" PRIu64, total_size, value);
486             goto out;
487         }
488         bioc->usage = total_size;
489         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
490 
491         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
492                      &local_err);
493         if (local_err) {
494             goto out;
495         }
496 
497         qemu_mutex_lock_iothread();
498         qemu_system_reset(VMRESET_SILENT);
499         if (qemu_loadvm_state(fb) < 0) {
500             error_report("COLO: loadvm failed");
501             qemu_mutex_unlock_iothread();
502             goto out;
503         }
504         qemu_mutex_unlock_iothread();
505 
506         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
507                      &local_err);
508         if (local_err) {
509             goto out;
510         }
511     }
512 
513 out:
514     /* Throw the unreported error message after exited from loop */
515     if (local_err) {
516         error_report_err(local_err);
517     }
518 
519     if (fb) {
520         qemu_fclose(fb);
521     }
522 
523     if (mis->to_src_file) {
524         qemu_fclose(mis->to_src_file);
525     }
526     migration_incoming_exit_colo();
527 
528     return NULL;
529 }
530