xref: /qemu/block/replication.c (revision ab9056ff)
1 /*
2  * Replication Block filter
3  *
4  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
5  * Copyright (c) 2016 Intel Corporation
6  * Copyright (c) 2016 FUJITSU LIMITED
7  *
8  * Author:
9  *   Wen Congyang <wency@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu/module.h"
17 #include "qemu/option.h"
18 #include "block/nbd.h"
19 #include "block/blockjob.h"
20 #include "block/block_int.h"
21 #include "block/block_backup.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qapi/qmp/qdict.h"
25 #include "replication.h"
26 
27 typedef enum {
28     BLOCK_REPLICATION_NONE,             /* block replication is not started */
29     BLOCK_REPLICATION_RUNNING,          /* block replication is running */
30     BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
31     BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
32     BLOCK_REPLICATION_DONE,             /* block replication is done */
33 } ReplicationStage;
34 
35 typedef struct BDRVReplicationState {
36     ReplicationMode mode;
37     ReplicationStage stage;
38     BdrvChild *active_disk;
39     BlockJob *commit_job;
40     BdrvChild *hidden_disk;
41     BdrvChild *secondary_disk;
42     BlockJob *backup_job;
43     char *top_id;
44     ReplicationState *rs;
45     Error *blocker;
46     bool orig_hidden_read_only;
47     bool orig_secondary_read_only;
48     int error;
49 } BDRVReplicationState;
50 
51 static void replication_start(ReplicationState *rs, ReplicationMode mode,
52                               Error **errp);
53 static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
54 static void replication_get_error(ReplicationState *rs, Error **errp);
55 static void replication_stop(ReplicationState *rs, bool failover,
56                              Error **errp);
57 
58 #define REPLICATION_MODE        "mode"
59 #define REPLICATION_TOP_ID      "top-id"
60 static QemuOptsList replication_runtime_opts = {
61     .name = "replication",
62     .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
63     .desc = {
64         {
65             .name = REPLICATION_MODE,
66             .type = QEMU_OPT_STRING,
67         },
68         {
69             .name = REPLICATION_TOP_ID,
70             .type = QEMU_OPT_STRING,
71         },
72         { /* end of list */ }
73     },
74 };
75 
76 static ReplicationOps replication_ops = {
77     .start = replication_start,
78     .checkpoint = replication_do_checkpoint,
79     .get_error = replication_get_error,
80     .stop = replication_stop,
81 };
82 
83 static int replication_open(BlockDriverState *bs, QDict *options,
84                             int flags, Error **errp)
85 {
86     int ret;
87     BDRVReplicationState *s = bs->opaque;
88     Error *local_err = NULL;
89     QemuOpts *opts = NULL;
90     const char *mode;
91     const char *top_id;
92 
93     bs->file = bdrv_open_child(NULL, options, "file", bs, &child_file,
94                                false, errp);
95     if (!bs->file) {
96         return -EINVAL;
97     }
98 
99     ret = -EINVAL;
100     opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
101     qemu_opts_absorb_qdict(opts, options, &local_err);
102     if (local_err) {
103         goto fail;
104     }
105 
106     mode = qemu_opt_get(opts, REPLICATION_MODE);
107     if (!mode) {
108         error_setg(&local_err, "Missing the option mode");
109         goto fail;
110     }
111 
112     if (!strcmp(mode, "primary")) {
113         s->mode = REPLICATION_MODE_PRIMARY;
114         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
115         if (top_id) {
116             error_setg(&local_err, "The primary side does not support option top-id");
117             goto fail;
118         }
119     } else if (!strcmp(mode, "secondary")) {
120         s->mode = REPLICATION_MODE_SECONDARY;
121         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
122         s->top_id = g_strdup(top_id);
123         if (!s->top_id) {
124             error_setg(&local_err, "Missing the option top-id");
125             goto fail;
126         }
127     } else {
128         error_setg(&local_err,
129                    "The option mode's value should be primary or secondary");
130         goto fail;
131     }
132 
133     s->rs = replication_new(bs, &replication_ops);
134 
135     ret = 0;
136 
137 fail:
138     qemu_opts_del(opts);
139     error_propagate(errp, local_err);
140 
141     return ret;
142 }
143 
144 static void replication_close(BlockDriverState *bs)
145 {
146     BDRVReplicationState *s = bs->opaque;
147 
148     if (s->stage == BLOCK_REPLICATION_RUNNING) {
149         replication_stop(s->rs, false, NULL);
150     }
151     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
152         job_cancel_sync(&s->commit_job->job);
153     }
154 
155     if (s->mode == REPLICATION_MODE_SECONDARY) {
156         g_free(s->top_id);
157     }
158 
159     replication_remove(s->rs);
160 }
161 
162 static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
163                                    const BdrvChildRole *role,
164                                    BlockReopenQueue *reopen_queue,
165                                    uint64_t perm, uint64_t shared,
166                                    uint64_t *nperm, uint64_t *nshared)
167 {
168     *nperm = BLK_PERM_CONSISTENT_READ;
169     if ((bs->open_flags & (BDRV_O_INACTIVE | BDRV_O_RDWR)) == BDRV_O_RDWR) {
170         *nperm |= BLK_PERM_WRITE;
171     }
172     *nshared = BLK_PERM_CONSISTENT_READ \
173                | BLK_PERM_WRITE \
174                | BLK_PERM_WRITE_UNCHANGED;
175     return;
176 }
177 
178 static int64_t replication_getlength(BlockDriverState *bs)
179 {
180     return bdrv_getlength(bs->file->bs);
181 }
182 
183 static int replication_get_io_status(BDRVReplicationState *s)
184 {
185     switch (s->stage) {
186     case BLOCK_REPLICATION_NONE:
187         return -EIO;
188     case BLOCK_REPLICATION_RUNNING:
189         return 0;
190     case BLOCK_REPLICATION_FAILOVER:
191         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
192     case BLOCK_REPLICATION_FAILOVER_FAILED:
193         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
194     case BLOCK_REPLICATION_DONE:
195         /*
196          * active commit job completes, and active disk and secondary_disk
197          * is swapped, so we can operate bs->file directly
198          */
199         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
200     default:
201         abort();
202     }
203 }
204 
205 static int replication_return_value(BDRVReplicationState *s, int ret)
206 {
207     if (s->mode == REPLICATION_MODE_SECONDARY) {
208         return ret;
209     }
210 
211     if (ret < 0) {
212         s->error = ret;
213         ret = 0;
214     }
215 
216     return ret;
217 }
218 
219 static coroutine_fn int replication_co_readv(BlockDriverState *bs,
220                                              int64_t sector_num,
221                                              int remaining_sectors,
222                                              QEMUIOVector *qiov)
223 {
224     BDRVReplicationState *s = bs->opaque;
225     int ret;
226 
227     if (s->mode == REPLICATION_MODE_PRIMARY) {
228         /* We only use it to forward primary write requests */
229         return -EIO;
230     }
231 
232     ret = replication_get_io_status(s);
233     if (ret < 0) {
234         return ret;
235     }
236 
237     ret = bdrv_co_preadv(bs->file, sector_num * BDRV_SECTOR_SIZE,
238                          remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
239 
240     return replication_return_value(s, ret);
241 }
242 
243 static coroutine_fn int replication_co_writev(BlockDriverState *bs,
244                                               int64_t sector_num,
245                                               int remaining_sectors,
246                                               QEMUIOVector *qiov,
247                                               int flags)
248 {
249     BDRVReplicationState *s = bs->opaque;
250     QEMUIOVector hd_qiov;
251     uint64_t bytes_done = 0;
252     BdrvChild *top = bs->file;
253     BdrvChild *base = s->secondary_disk;
254     BdrvChild *target;
255     int ret;
256     int64_t n;
257 
258     assert(!flags);
259     ret = replication_get_io_status(s);
260     if (ret < 0) {
261         goto out;
262     }
263 
264     if (ret == 0) {
265         ret = bdrv_co_pwritev(top, sector_num * BDRV_SECTOR_SIZE,
266                               remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
267         return replication_return_value(s, ret);
268     }
269 
270     /*
271      * Failover failed, only write to active disk if the sectors
272      * have already been allocated in active disk/hidden disk.
273      */
274     qemu_iovec_init(&hd_qiov, qiov->niov);
275     while (remaining_sectors > 0) {
276         int64_t count;
277 
278         ret = bdrv_is_allocated_above(top->bs, base->bs, false,
279                                       sector_num * BDRV_SECTOR_SIZE,
280                                       remaining_sectors * BDRV_SECTOR_SIZE,
281                                       &count);
282         if (ret < 0) {
283             goto out1;
284         }
285 
286         assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
287         n = count >> BDRV_SECTOR_BITS;
288         qemu_iovec_reset(&hd_qiov);
289         qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
290 
291         target = ret ? top : base;
292         ret = bdrv_co_pwritev(target, sector_num * BDRV_SECTOR_SIZE,
293                               n * BDRV_SECTOR_SIZE, &hd_qiov, 0);
294         if (ret < 0) {
295             goto out1;
296         }
297 
298         remaining_sectors -= n;
299         sector_num += n;
300         bytes_done += count;
301     }
302 
303 out1:
304     qemu_iovec_destroy(&hd_qiov);
305 out:
306     return ret;
307 }
308 
309 static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
310                                                     BlockDriverState *candidate)
311 {
312     return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
313 }
314 
315 static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
316 {
317     Error *local_err = NULL;
318     int ret;
319 
320     if (!s->backup_job) {
321         error_setg(errp, "Backup job was cancelled unexpectedly");
322         return;
323     }
324 
325     backup_do_checkpoint(s->backup_job, &local_err);
326     if (local_err) {
327         error_propagate(errp, local_err);
328         return;
329     }
330 
331     if (!s->active_disk->bs->drv) {
332         error_setg(errp, "Active disk %s is ejected",
333                    s->active_disk->bs->node_name);
334         return;
335     }
336 
337     ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
338     if (ret < 0) {
339         error_setg(errp, "Cannot make active disk empty");
340         return;
341     }
342 
343     if (!s->hidden_disk->bs->drv) {
344         error_setg(errp, "Hidden disk %s is ejected",
345                    s->hidden_disk->bs->node_name);
346         return;
347     }
348 
349     ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
350     if (ret < 0) {
351         error_setg(errp, "Cannot make hidden disk empty");
352         return;
353     }
354 }
355 
356 /* This function is supposed to be called twice:
357  * first with writable = true, then with writable = false.
358  * The first call puts s->hidden_disk and s->secondary_disk in
359  * r/w mode, and the second puts them back in their original state.
360  */
361 static void reopen_backing_file(BlockDriverState *bs, bool writable,
362                                 Error **errp)
363 {
364     BDRVReplicationState *s = bs->opaque;
365     BlockReopenQueue *reopen_queue = NULL;
366     Error *local_err = NULL;
367 
368     if (writable) {
369         s->orig_hidden_read_only = bdrv_is_read_only(s->hidden_disk->bs);
370         s->orig_secondary_read_only = bdrv_is_read_only(s->secondary_disk->bs);
371     }
372 
373     bdrv_subtree_drained_begin(s->hidden_disk->bs);
374     bdrv_subtree_drained_begin(s->secondary_disk->bs);
375 
376     if (s->orig_hidden_read_only) {
377         QDict *opts = qdict_new();
378         qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
379         reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs,
380                                          opts, true);
381     }
382 
383     if (s->orig_secondary_read_only) {
384         QDict *opts = qdict_new();
385         qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
386         reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
387                                          opts, true);
388     }
389 
390     if (reopen_queue) {
391         bdrv_reopen_multiple(reopen_queue, &local_err);
392         error_propagate(errp, local_err);
393     }
394 
395     bdrv_subtree_drained_end(s->hidden_disk->bs);
396     bdrv_subtree_drained_end(s->secondary_disk->bs);
397 }
398 
399 static void backup_job_cleanup(BlockDriverState *bs)
400 {
401     BDRVReplicationState *s = bs->opaque;
402     BlockDriverState *top_bs;
403 
404     top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
405     if (!top_bs) {
406         return;
407     }
408     bdrv_op_unblock_all(top_bs, s->blocker);
409     error_free(s->blocker);
410     reopen_backing_file(bs, false, NULL);
411 }
412 
413 static void backup_job_completed(void *opaque, int ret)
414 {
415     BlockDriverState *bs = opaque;
416     BDRVReplicationState *s = bs->opaque;
417 
418     if (s->stage != BLOCK_REPLICATION_FAILOVER) {
419         /* The backup job is cancelled unexpectedly */
420         s->error = -EIO;
421     }
422 
423     backup_job_cleanup(bs);
424 }
425 
426 static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
427 {
428     BdrvChild *child;
429 
430     /* The bs itself is the top_bs */
431     if (top_bs == bs) {
432         return true;
433     }
434 
435     /* Iterate over top_bs's children */
436     QLIST_FOREACH(child, &top_bs->children, next) {
437         if (child->bs == bs || check_top_bs(child->bs, bs)) {
438             return true;
439         }
440     }
441 
442     return false;
443 }
444 
445 static void replication_start(ReplicationState *rs, ReplicationMode mode,
446                               Error **errp)
447 {
448     BlockDriverState *bs = rs->opaque;
449     BDRVReplicationState *s;
450     BlockDriverState *top_bs;
451     int64_t active_length, hidden_length, disk_length;
452     AioContext *aio_context;
453     Error *local_err = NULL;
454 
455     aio_context = bdrv_get_aio_context(bs);
456     aio_context_acquire(aio_context);
457     s = bs->opaque;
458 
459     if (s->stage != BLOCK_REPLICATION_NONE) {
460         error_setg(errp, "Block replication is running or done");
461         aio_context_release(aio_context);
462         return;
463     }
464 
465     if (s->mode != mode) {
466         error_setg(errp, "The parameter mode's value is invalid, needs %d,"
467                    " but got %d", s->mode, mode);
468         aio_context_release(aio_context);
469         return;
470     }
471 
472     switch (s->mode) {
473     case REPLICATION_MODE_PRIMARY:
474         break;
475     case REPLICATION_MODE_SECONDARY:
476         s->active_disk = bs->file;
477         if (!s->active_disk || !s->active_disk->bs ||
478                                     !s->active_disk->bs->backing) {
479             error_setg(errp, "Active disk doesn't have backing file");
480             aio_context_release(aio_context);
481             return;
482         }
483 
484         s->hidden_disk = s->active_disk->bs->backing;
485         if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
486             error_setg(errp, "Hidden disk doesn't have backing file");
487             aio_context_release(aio_context);
488             return;
489         }
490 
491         s->secondary_disk = s->hidden_disk->bs->backing;
492         if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
493             error_setg(errp, "The secondary disk doesn't have block backend");
494             aio_context_release(aio_context);
495             return;
496         }
497 
498         /* verify the length */
499         active_length = bdrv_getlength(s->active_disk->bs);
500         hidden_length = bdrv_getlength(s->hidden_disk->bs);
501         disk_length = bdrv_getlength(s->secondary_disk->bs);
502         if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
503             active_length != hidden_length || hidden_length != disk_length) {
504             error_setg(errp, "Active disk, hidden disk, secondary disk's length"
505                        " are not the same");
506             aio_context_release(aio_context);
507             return;
508         }
509 
510         /* Must be true, or the bdrv_getlength() calls would have failed */
511         assert(s->active_disk->bs->drv && s->hidden_disk->bs->drv);
512 
513         if (!s->active_disk->bs->drv->bdrv_make_empty ||
514             !s->hidden_disk->bs->drv->bdrv_make_empty) {
515             error_setg(errp,
516                        "Active disk or hidden disk doesn't support make_empty");
517             aio_context_release(aio_context);
518             return;
519         }
520 
521         /* reopen the backing file in r/w mode */
522         reopen_backing_file(bs, true, &local_err);
523         if (local_err) {
524             error_propagate(errp, local_err);
525             aio_context_release(aio_context);
526             return;
527         }
528 
529         /* start backup job now */
530         error_setg(&s->blocker,
531                    "Block device is in use by internal backup job");
532 
533         top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
534         if (!top_bs || !bdrv_is_root_node(top_bs) ||
535             !check_top_bs(top_bs, bs)) {
536             error_setg(errp, "No top_bs or it is invalid");
537             reopen_backing_file(bs, false, NULL);
538             aio_context_release(aio_context);
539             return;
540         }
541         bdrv_op_block_all(top_bs, s->blocker);
542         bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
543 
544         s->backup_job = backup_job_create(
545                                 NULL, s->secondary_disk->bs, s->hidden_disk->bs,
546                                 0, MIRROR_SYNC_MODE_NONE, NULL, 0, false, NULL,
547                                 BLOCKDEV_ON_ERROR_REPORT,
548                                 BLOCKDEV_ON_ERROR_REPORT, JOB_INTERNAL,
549                                 backup_job_completed, bs, NULL, &local_err);
550         if (local_err) {
551             error_propagate(errp, local_err);
552             backup_job_cleanup(bs);
553             aio_context_release(aio_context);
554             return;
555         }
556         job_start(&s->backup_job->job);
557         break;
558     default:
559         aio_context_release(aio_context);
560         abort();
561     }
562 
563     s->stage = BLOCK_REPLICATION_RUNNING;
564 
565     if (s->mode == REPLICATION_MODE_SECONDARY) {
566         secondary_do_checkpoint(s, errp);
567     }
568 
569     s->error = 0;
570     aio_context_release(aio_context);
571 }
572 
573 static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
574 {
575     BlockDriverState *bs = rs->opaque;
576     BDRVReplicationState *s;
577     AioContext *aio_context;
578 
579     aio_context = bdrv_get_aio_context(bs);
580     aio_context_acquire(aio_context);
581     s = bs->opaque;
582 
583     if (s->mode == REPLICATION_MODE_SECONDARY) {
584         secondary_do_checkpoint(s, errp);
585     }
586     aio_context_release(aio_context);
587 }
588 
589 static void replication_get_error(ReplicationState *rs, Error **errp)
590 {
591     BlockDriverState *bs = rs->opaque;
592     BDRVReplicationState *s;
593     AioContext *aio_context;
594 
595     aio_context = bdrv_get_aio_context(bs);
596     aio_context_acquire(aio_context);
597     s = bs->opaque;
598 
599     if (s->stage != BLOCK_REPLICATION_RUNNING) {
600         error_setg(errp, "Block replication is not running");
601         aio_context_release(aio_context);
602         return;
603     }
604 
605     if (s->error) {
606         error_setg(errp, "I/O error occurred");
607         aio_context_release(aio_context);
608         return;
609     }
610     aio_context_release(aio_context);
611 }
612 
613 static void replication_done(void *opaque, int ret)
614 {
615     BlockDriverState *bs = opaque;
616     BDRVReplicationState *s = bs->opaque;
617 
618     if (ret == 0) {
619         s->stage = BLOCK_REPLICATION_DONE;
620 
621         s->active_disk = NULL;
622         s->secondary_disk = NULL;
623         s->hidden_disk = NULL;
624         s->error = 0;
625     } else {
626         s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
627         s->error = -EIO;
628     }
629 }
630 
631 static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
632 {
633     BlockDriverState *bs = rs->opaque;
634     BDRVReplicationState *s;
635     AioContext *aio_context;
636 
637     aio_context = bdrv_get_aio_context(bs);
638     aio_context_acquire(aio_context);
639     s = bs->opaque;
640 
641     if (s->stage != BLOCK_REPLICATION_RUNNING) {
642         error_setg(errp, "Block replication is not running");
643         aio_context_release(aio_context);
644         return;
645     }
646 
647     switch (s->mode) {
648     case REPLICATION_MODE_PRIMARY:
649         s->stage = BLOCK_REPLICATION_DONE;
650         s->error = 0;
651         break;
652     case REPLICATION_MODE_SECONDARY:
653         /*
654          * This BDS will be closed, and the job should be completed
655          * before the BDS is closed, because we will access hidden
656          * disk, secondary disk in backup_job_completed().
657          */
658         if (s->backup_job) {
659             job_cancel_sync(&s->backup_job->job);
660         }
661 
662         if (!failover) {
663             secondary_do_checkpoint(s, errp);
664             s->stage = BLOCK_REPLICATION_DONE;
665             aio_context_release(aio_context);
666             return;
667         }
668 
669         s->stage = BLOCK_REPLICATION_FAILOVER;
670         s->commit_job = commit_active_start(
671                             NULL, s->active_disk->bs, s->secondary_disk->bs,
672                             JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
673                             NULL, replication_done, bs, true, errp);
674         break;
675     default:
676         aio_context_release(aio_context);
677         abort();
678     }
679     aio_context_release(aio_context);
680 }
681 
682 static const char *const replication_strong_runtime_opts[] = {
683     REPLICATION_MODE,
684     REPLICATION_TOP_ID,
685 
686     NULL
687 };
688 
689 static BlockDriver bdrv_replication = {
690     .format_name                = "replication",
691     .instance_size              = sizeof(BDRVReplicationState),
692 
693     .bdrv_open                  = replication_open,
694     .bdrv_close                 = replication_close,
695     .bdrv_child_perm            = replication_child_perm,
696 
697     .bdrv_getlength             = replication_getlength,
698     .bdrv_co_readv              = replication_co_readv,
699     .bdrv_co_writev             = replication_co_writev,
700 
701     .is_filter                  = true,
702     .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
703 
704     .has_variable_length        = true,
705     .strong_runtime_opts        = replication_strong_runtime_opts,
706 };
707 
708 static void bdrv_replication_init(void)
709 {
710     bdrv_register(&bdrv_replication);
711 }
712 
713 block_init(bdrv_replication_init);
714