xref: /qemu/block/replication.c (revision 39164c13)
1 /*
2  * Replication Block filter
3  *
4  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
5  * Copyright (c) 2016 Intel Corporation
6  * Copyright (c) 2016 FUJITSU LIMITED
7  *
8  * Author:
9  *   Wen Congyang <wency@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "block/nbd.h"
18 #include "block/blockjob.h"
19 #include "block/block_int.h"
20 #include "block/block_backup.h"
21 #include "sysemu/block-backend.h"
22 #include "qapi/error.h"
23 #include "replication.h"
24 
25 typedef struct BDRVReplicationState {
26     ReplicationMode mode;
27     int replication_state;
28     BdrvChild *active_disk;
29     BdrvChild *hidden_disk;
30     BdrvChild *secondary_disk;
31     char *top_id;
32     ReplicationState *rs;
33     Error *blocker;
34     int orig_hidden_flags;
35     int orig_secondary_flags;
36     int error;
37 } BDRVReplicationState;
38 
/*
 * Values stored in BDRVReplicationState.replication_state.
 *
 * NOTE(review): replication_open() never assigns replication_state, so the
 * initial state presumably relies on BLOCK_REPLICATION_NONE being 0 in a
 * zero-filled opaque area -- confirm before renumbering.
 */
enum {
    /* block replication is not started */
    BLOCK_REPLICATION_NONE = 0,
    /* block replication is running */
    BLOCK_REPLICATION_RUNNING = 1,
    /* failover is running in background */
    BLOCK_REPLICATION_FAILOVER = 2,
    /* failover failed */
    BLOCK_REPLICATION_FAILOVER_FAILED = 3,
    /* block replication is done */
    BLOCK_REPLICATION_DONE = 4,
};
46 
/*
 * Forward declarations of the ReplicationOps callbacks.  They are needed
 * because replication_ops and replication_close() refer to them before
 * their definitions further down in this file.
 */
static void replication_start(ReplicationState *rs, ReplicationMode mode,
                              Error **errp);
static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
static void replication_get_error(ReplicationState *rs, Error **errp);
static void replication_stop(ReplicationState *rs, bool failover,
                             Error **errp);
53 
/* Names of the runtime options understood by replication_open() */
#define REPLICATION_MODE        "mode"
#define REPLICATION_TOP_ID      "top-id"
/*
 * Runtime option list absorbed from the options QDict in replication_open().
 */
static QemuOptsList replication_runtime_opts = {
    .name = "replication",
    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
    .desc = {
        {
            /* mandatory; must be "primary" or "secondary" */
            .name = REPLICATION_MODE,
            .type = QEMU_OPT_STRING,
        },
        {
            /* mandatory in secondary mode, rejected in primary mode */
            .name = REPLICATION_TOP_ID,
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
71 
/*
 * Callback table handed to replication_new() in replication_open().
 */
static ReplicationOps replication_ops = {
    .start = replication_start,
    .checkpoint = replication_do_checkpoint,
    .get_error = replication_get_error,
    .stop = replication_stop,
};
78 
79 static int replication_open(BlockDriverState *bs, QDict *options,
80                             int flags, Error **errp)
81 {
82     int ret;
83     BDRVReplicationState *s = bs->opaque;
84     Error *local_err = NULL;
85     QemuOpts *opts = NULL;
86     const char *mode;
87     const char *top_id;
88 
89     bs->file = bdrv_open_child(NULL, options, "file", bs, &child_file,
90                                false, errp);
91     if (!bs->file) {
92         return -EINVAL;
93     }
94 
95     ret = -EINVAL;
96     opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
97     qemu_opts_absorb_qdict(opts, options, &local_err);
98     if (local_err) {
99         goto fail;
100     }
101 
102     mode = qemu_opt_get(opts, REPLICATION_MODE);
103     if (!mode) {
104         error_setg(&local_err, "Missing the option mode");
105         goto fail;
106     }
107 
108     if (!strcmp(mode, "primary")) {
109         s->mode = REPLICATION_MODE_PRIMARY;
110         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
111         if (top_id) {
112             error_setg(&local_err, "The primary side does not support option top-id");
113             goto fail;
114         }
115     } else if (!strcmp(mode, "secondary")) {
116         s->mode = REPLICATION_MODE_SECONDARY;
117         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
118         s->top_id = g_strdup(top_id);
119         if (!s->top_id) {
120             error_setg(&local_err, "Missing the option top-id");
121             goto fail;
122         }
123     } else {
124         error_setg(&local_err,
125                    "The option mode's value should be primary or secondary");
126         goto fail;
127     }
128 
129     s->rs = replication_new(bs, &replication_ops);
130 
131     ret = 0;
132 
133 fail:
134     qemu_opts_del(opts);
135     error_propagate(errp, local_err);
136 
137     return ret;
138 }
139 
140 static void replication_close(BlockDriverState *bs)
141 {
142     BDRVReplicationState *s = bs->opaque;
143 
144     if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
145         replication_stop(s->rs, false, NULL);
146     }
147     if (s->replication_state == BLOCK_REPLICATION_FAILOVER) {
148         block_job_cancel_sync(s->active_disk->bs->job);
149     }
150 
151     if (s->mode == REPLICATION_MODE_SECONDARY) {
152         g_free(s->top_id);
153     }
154 
155     replication_remove(s->rs);
156 }
157 
158 static int64_t replication_getlength(BlockDriverState *bs)
159 {
160     return bdrv_getlength(bs->file->bs);
161 }
162 
163 static int replication_get_io_status(BDRVReplicationState *s)
164 {
165     switch (s->replication_state) {
166     case BLOCK_REPLICATION_NONE:
167         return -EIO;
168     case BLOCK_REPLICATION_RUNNING:
169         return 0;
170     case BLOCK_REPLICATION_FAILOVER:
171         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
172     case BLOCK_REPLICATION_FAILOVER_FAILED:
173         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
174     case BLOCK_REPLICATION_DONE:
175         /*
176          * active commit job completes, and active disk and secondary_disk
177          * is swapped, so we can operate bs->file directly
178          */
179         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
180     default:
181         abort();
182     }
183 }
184 
185 static int replication_return_value(BDRVReplicationState *s, int ret)
186 {
187     if (s->mode == REPLICATION_MODE_SECONDARY) {
188         return ret;
189     }
190 
191     if (ret < 0) {
192         s->error = ret;
193         ret = 0;
194     }
195 
196     return ret;
197 }
198 
199 static coroutine_fn int replication_co_readv(BlockDriverState *bs,
200                                              int64_t sector_num,
201                                              int remaining_sectors,
202                                              QEMUIOVector *qiov)
203 {
204     BDRVReplicationState *s = bs->opaque;
205     BdrvChild *child = s->secondary_disk;
206     BlockJob *job = NULL;
207     CowRequest req;
208     int ret;
209 
210     if (s->mode == REPLICATION_MODE_PRIMARY) {
211         /* We only use it to forward primary write requests */
212         return -EIO;
213     }
214 
215     ret = replication_get_io_status(s);
216     if (ret < 0) {
217         return ret;
218     }
219 
220     if (child && child->bs) {
221         job = child->bs->job;
222     }
223 
224     if (job) {
225         backup_wait_for_overlapping_requests(child->bs->job, sector_num,
226                                              remaining_sectors);
227         backup_cow_request_begin(&req, child->bs->job, sector_num,
228                                  remaining_sectors);
229         ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors,
230                             qiov);
231         backup_cow_request_end(&req);
232         goto out;
233     }
234 
235     ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors, qiov);
236 out:
237     return replication_return_value(s, ret);
238 }
239 
240 static coroutine_fn int replication_co_writev(BlockDriverState *bs,
241                                               int64_t sector_num,
242                                               int remaining_sectors,
243                                               QEMUIOVector *qiov)
244 {
245     BDRVReplicationState *s = bs->opaque;
246     QEMUIOVector hd_qiov;
247     uint64_t bytes_done = 0;
248     BdrvChild *top = bs->file;
249     BdrvChild *base = s->secondary_disk;
250     BdrvChild *target;
251     int ret, n;
252 
253     ret = replication_get_io_status(s);
254     if (ret < 0) {
255         goto out;
256     }
257 
258     if (ret == 0) {
259         ret = bdrv_co_writev(top, sector_num,
260                              remaining_sectors, qiov);
261         return replication_return_value(s, ret);
262     }
263 
264     /*
265      * Failover failed, only write to active disk if the sectors
266      * have already been allocated in active disk/hidden disk.
267      */
268     qemu_iovec_init(&hd_qiov, qiov->niov);
269     while (remaining_sectors > 0) {
270         ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
271                                       remaining_sectors, &n);
272         if (ret < 0) {
273             goto out1;
274         }
275 
276         qemu_iovec_reset(&hd_qiov);
277         qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
278 
279         target = ret ? top : base;
280         ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
281         if (ret < 0) {
282             goto out1;
283         }
284 
285         remaining_sectors -= n;
286         sector_num += n;
287         bytes_done += n * BDRV_SECTOR_SIZE;
288     }
289 
290 out1:
291     qemu_iovec_destroy(&hd_qiov);
292 out:
293     return ret;
294 }
295 
296 static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
297                                                     BlockDriverState *candidate)
298 {
299     return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
300 }
301 
302 static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
303 {
304     Error *local_err = NULL;
305     int ret;
306 
307     if (!s->secondary_disk->bs->job) {
308         error_setg(errp, "Backup job was cancelled unexpectedly");
309         return;
310     }
311 
312     backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
313     if (local_err) {
314         error_propagate(errp, local_err);
315         return;
316     }
317 
318     ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
319     if (ret < 0) {
320         error_setg(errp, "Cannot make active disk empty");
321         return;
322     }
323 
324     ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
325     if (ret < 0) {
326         error_setg(errp, "Cannot make hidden disk empty");
327         return;
328     }
329 }
330 
331 static void reopen_backing_file(BlockDriverState *bs, bool writable,
332                                 Error **errp)
333 {
334     BDRVReplicationState *s = bs->opaque;
335     BlockReopenQueue *reopen_queue = NULL;
336     int orig_hidden_flags, orig_secondary_flags;
337     int new_hidden_flags, new_secondary_flags;
338     Error *local_err = NULL;
339 
340     if (writable) {
341         orig_hidden_flags = s->orig_hidden_flags =
342                                 bdrv_get_flags(s->hidden_disk->bs);
343         new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
344                                                     ~BDRV_O_INACTIVE;
345         orig_secondary_flags = s->orig_secondary_flags =
346                                 bdrv_get_flags(s->secondary_disk->bs);
347         new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
348                                                      ~BDRV_O_INACTIVE;
349     } else {
350         orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
351                                                     ~BDRV_O_INACTIVE;
352         new_hidden_flags = s->orig_hidden_flags;
353         orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
354                                                     ~BDRV_O_INACTIVE;
355         new_secondary_flags = s->orig_secondary_flags;
356     }
357 
358     if (orig_hidden_flags != new_hidden_flags) {
359         reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
360                                          new_hidden_flags);
361     }
362 
363     if (!(orig_secondary_flags & BDRV_O_RDWR)) {
364         reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
365                                          NULL, new_secondary_flags);
366     }
367 
368     if (reopen_queue) {
369         bdrv_reopen_multiple(bdrv_get_aio_context(bs),
370                              reopen_queue, &local_err);
371         error_propagate(errp, local_err);
372     }
373 }
374 
375 static void backup_job_cleanup(BlockDriverState *bs)
376 {
377     BDRVReplicationState *s = bs->opaque;
378     BlockDriverState *top_bs;
379 
380     top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
381     if (!top_bs) {
382         return;
383     }
384     bdrv_op_unblock_all(top_bs, s->blocker);
385     error_free(s->blocker);
386     reopen_backing_file(bs, false, NULL);
387 }
388 
389 static void backup_job_completed(void *opaque, int ret)
390 {
391     BlockDriverState *bs = opaque;
392     BDRVReplicationState *s = bs->opaque;
393 
394     if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
395         /* The backup job is cancelled unexpectedly */
396         s->error = -EIO;
397     }
398 
399     backup_job_cleanup(bs);
400 }
401 
402 static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
403 {
404     BdrvChild *child;
405 
406     /* The bs itself is the top_bs */
407     if (top_bs == bs) {
408         return true;
409     }
410 
411     /* Iterate over top_bs's children */
412     QLIST_FOREACH(child, &top_bs->children, next) {
413         if (child->bs == bs || check_top_bs(child->bs, bs)) {
414             return true;
415         }
416     }
417 
418     return false;
419 }
420 
421 static void replication_start(ReplicationState *rs, ReplicationMode mode,
422                               Error **errp)
423 {
424     BlockDriverState *bs = rs->opaque;
425     BDRVReplicationState *s;
426     BlockDriverState *top_bs;
427     int64_t active_length, hidden_length, disk_length;
428     AioContext *aio_context;
429     Error *local_err = NULL;
430     BlockJob *job;
431 
432     aio_context = bdrv_get_aio_context(bs);
433     aio_context_acquire(aio_context);
434     s = bs->opaque;
435 
436     if (s->replication_state != BLOCK_REPLICATION_NONE) {
437         error_setg(errp, "Block replication is running or done");
438         aio_context_release(aio_context);
439         return;
440     }
441 
442     if (s->mode != mode) {
443         error_setg(errp, "The parameter mode's value is invalid, needs %d,"
444                    " but got %d", s->mode, mode);
445         aio_context_release(aio_context);
446         return;
447     }
448 
449     switch (s->mode) {
450     case REPLICATION_MODE_PRIMARY:
451         break;
452     case REPLICATION_MODE_SECONDARY:
453         s->active_disk = bs->file;
454         if (!s->active_disk || !s->active_disk->bs ||
455                                     !s->active_disk->bs->backing) {
456             error_setg(errp, "Active disk doesn't have backing file");
457             aio_context_release(aio_context);
458             return;
459         }
460 
461         s->hidden_disk = s->active_disk->bs->backing;
462         if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
463             error_setg(errp, "Hidden disk doesn't have backing file");
464             aio_context_release(aio_context);
465             return;
466         }
467 
468         s->secondary_disk = s->hidden_disk->bs->backing;
469         if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
470             error_setg(errp, "The secondary disk doesn't have block backend");
471             aio_context_release(aio_context);
472             return;
473         }
474 
475         /* verify the length */
476         active_length = bdrv_getlength(s->active_disk->bs);
477         hidden_length = bdrv_getlength(s->hidden_disk->bs);
478         disk_length = bdrv_getlength(s->secondary_disk->bs);
479         if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
480             active_length != hidden_length || hidden_length != disk_length) {
481             error_setg(errp, "Active disk, hidden disk, secondary disk's length"
482                        " are not the same");
483             aio_context_release(aio_context);
484             return;
485         }
486 
487         if (!s->active_disk->bs->drv->bdrv_make_empty ||
488             !s->hidden_disk->bs->drv->bdrv_make_empty) {
489             error_setg(errp,
490                        "Active disk or hidden disk doesn't support make_empty");
491             aio_context_release(aio_context);
492             return;
493         }
494 
495         /* reopen the backing file in r/w mode */
496         reopen_backing_file(bs, true, &local_err);
497         if (local_err) {
498             error_propagate(errp, local_err);
499             aio_context_release(aio_context);
500             return;
501         }
502 
503         /* start backup job now */
504         error_setg(&s->blocker,
505                    "Block device is in use by internal backup job");
506 
507         top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
508         if (!top_bs || !bdrv_is_root_node(top_bs) ||
509             !check_top_bs(top_bs, bs)) {
510             error_setg(errp, "No top_bs or it is invalid");
511             reopen_backing_file(bs, false, NULL);
512             aio_context_release(aio_context);
513             return;
514         }
515         bdrv_op_block_all(top_bs, s->blocker);
516         bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
517 
518         job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs,
519                                 0, MIRROR_SYNC_MODE_NONE, NULL, false,
520                                 BLOCKDEV_ON_ERROR_REPORT,
521                                 BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL,
522                                 backup_job_completed, bs, NULL, &local_err);
523         if (local_err) {
524             error_propagate(errp, local_err);
525             backup_job_cleanup(bs);
526             aio_context_release(aio_context);
527             return;
528         }
529         block_job_start(job);
530         break;
531     default:
532         aio_context_release(aio_context);
533         abort();
534     }
535 
536     s->replication_state = BLOCK_REPLICATION_RUNNING;
537 
538     if (s->mode == REPLICATION_MODE_SECONDARY) {
539         secondary_do_checkpoint(s, errp);
540     }
541 
542     s->error = 0;
543     aio_context_release(aio_context);
544 }
545 
546 static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
547 {
548     BlockDriverState *bs = rs->opaque;
549     BDRVReplicationState *s;
550     AioContext *aio_context;
551 
552     aio_context = bdrv_get_aio_context(bs);
553     aio_context_acquire(aio_context);
554     s = bs->opaque;
555 
556     if (s->mode == REPLICATION_MODE_SECONDARY) {
557         secondary_do_checkpoint(s, errp);
558     }
559     aio_context_release(aio_context);
560 }
561 
562 static void replication_get_error(ReplicationState *rs, Error **errp)
563 {
564     BlockDriverState *bs = rs->opaque;
565     BDRVReplicationState *s;
566     AioContext *aio_context;
567 
568     aio_context = bdrv_get_aio_context(bs);
569     aio_context_acquire(aio_context);
570     s = bs->opaque;
571 
572     if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
573         error_setg(errp, "Block replication is not running");
574         aio_context_release(aio_context);
575         return;
576     }
577 
578     if (s->error) {
579         error_setg(errp, "I/O error occurred");
580         aio_context_release(aio_context);
581         return;
582     }
583     aio_context_release(aio_context);
584 }
585 
586 static void replication_done(void *opaque, int ret)
587 {
588     BlockDriverState *bs = opaque;
589     BDRVReplicationState *s = bs->opaque;
590 
591     if (ret == 0) {
592         s->replication_state = BLOCK_REPLICATION_DONE;
593 
594         /* refresh top bs's filename */
595         bdrv_refresh_filename(bs);
596         s->active_disk = NULL;
597         s->secondary_disk = NULL;
598         s->hidden_disk = NULL;
599         s->error = 0;
600     } else {
601         s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
602         s->error = -EIO;
603     }
604 }
605 
606 static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
607 {
608     BlockDriverState *bs = rs->opaque;
609     BDRVReplicationState *s;
610     AioContext *aio_context;
611 
612     aio_context = bdrv_get_aio_context(bs);
613     aio_context_acquire(aio_context);
614     s = bs->opaque;
615 
616     if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
617         error_setg(errp, "Block replication is not running");
618         aio_context_release(aio_context);
619         return;
620     }
621 
622     switch (s->mode) {
623     case REPLICATION_MODE_PRIMARY:
624         s->replication_state = BLOCK_REPLICATION_DONE;
625         s->error = 0;
626         break;
627     case REPLICATION_MODE_SECONDARY:
628         /*
629          * This BDS will be closed, and the job should be completed
630          * before the BDS is closed, because we will access hidden
631          * disk, secondary disk in backup_job_completed().
632          */
633         if (s->secondary_disk->bs->job) {
634             block_job_cancel_sync(s->secondary_disk->bs->job);
635         }
636 
637         if (!failover) {
638             secondary_do_checkpoint(s, errp);
639             s->replication_state = BLOCK_REPLICATION_DONE;
640             aio_context_release(aio_context);
641             return;
642         }
643 
644         s->replication_state = BLOCK_REPLICATION_FAILOVER;
645         commit_active_start(NULL, s->active_disk->bs, s->secondary_disk->bs,
646                             BLOCK_JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
647                             replication_done, bs, errp, true);
648         break;
649     default:
650         aio_context_release(aio_context);
651         abort();
652     }
653     aio_context_release(aio_context);
654 }
655 
/*
 * Driver description of the "replication" filter; registered with the block
 * layer by bdrv_replication_init().
 */
BlockDriver bdrv_replication = {
    .format_name                = "replication",
    .protocol_name              = "replication",
    .instance_size              = sizeof(BDRVReplicationState),

    .bdrv_open                  = replication_open,
    .bdrv_close                 = replication_close,

    /* length and I/O are served through the "file" child */
    .bdrv_getlength             = replication_getlength,
    .bdrv_co_readv              = replication_co_readv,
    .bdrv_co_writev             = replication_co_writev,

    .is_filter                  = true,
    .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,

    .has_variable_length        = true,
};
673 
/* Register the replication driver with the block layer. */
static void bdrv_replication_init(void)
{
    bdrv_register(&bdrv_replication);
}

/* Hook the registration function into the block-module init list. */
block_init(bdrv_replication_init);
680