xref: /qemu/block/replication.c (revision 4ce1e15f)
1 /*
2  * Replication Block filter
3  *
4  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
5  * Copyright (c) 2016 Intel Corporation
6  * Copyright (c) 2016 FUJITSU LIMITED
7  *
8  * Author:
9  *   Wen Congyang <wency@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu/module.h"
17 #include "qemu/option.h"
18 #include "block/nbd.h"
19 #include "block/blockjob.h"
20 #include "block/block_int.h"
21 #include "block/block_backup.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qapi/qmp/qdict.h"
25 #include "replication.h"
26 
27 typedef enum {
28     BLOCK_REPLICATION_NONE,             /* block replication is not started */
29     BLOCK_REPLICATION_RUNNING,          /* block replication is running */
30     BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
31     BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
32     BLOCK_REPLICATION_DONE,             /* block replication is done */
33 } ReplicationStage;
34 
35 typedef struct BDRVReplicationState {
36     ReplicationMode mode;
37     ReplicationStage stage;
38     BdrvChild *active_disk;
39     BlockJob *commit_job;
40     BdrvChild *hidden_disk;
41     BdrvChild *secondary_disk;
42     BlockJob *backup_job;
43     char *top_id;
44     ReplicationState *rs;
45     Error *blocker;
46     bool orig_hidden_read_only;
47     bool orig_secondary_read_only;
48     int error;
49 } BDRVReplicationState;
50 
51 static void replication_start(ReplicationState *rs, ReplicationMode mode,
52                               Error **errp);
53 static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
54 static void replication_get_error(ReplicationState *rs, Error **errp);
55 static void replication_stop(ReplicationState *rs, bool failover,
56                              Error **errp);
57 
58 #define REPLICATION_MODE        "mode"
59 #define REPLICATION_TOP_ID      "top-id"
60 static QemuOptsList replication_runtime_opts = {
61     .name = "replication",
62     .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
63     .desc = {
64         {
65             .name = REPLICATION_MODE,
66             .type = QEMU_OPT_STRING,
67         },
68         {
69             .name = REPLICATION_TOP_ID,
70             .type = QEMU_OPT_STRING,
71         },
72         { /* end of list */ }
73     },
74 };
75 
76 static ReplicationOps replication_ops = {
77     .start = replication_start,
78     .checkpoint = replication_do_checkpoint,
79     .get_error = replication_get_error,
80     .stop = replication_stop,
81 };
82 
83 static int replication_open(BlockDriverState *bs, QDict *options,
84                             int flags, Error **errp)
85 {
86     int ret;
87     BDRVReplicationState *s = bs->opaque;
88     Error *local_err = NULL;
89     QemuOpts *opts = NULL;
90     const char *mode;
91     const char *top_id;
92 
93     bs->file = bdrv_open_child(NULL, options, "file", bs, &child_file,
94                                false, errp);
95     if (!bs->file) {
96         return -EINVAL;
97     }
98 
99     ret = -EINVAL;
100     opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
101     qemu_opts_absorb_qdict(opts, options, &local_err);
102     if (local_err) {
103         goto fail;
104     }
105 
106     mode = qemu_opt_get(opts, REPLICATION_MODE);
107     if (!mode) {
108         error_setg(&local_err, "Missing the option mode");
109         goto fail;
110     }
111 
112     if (!strcmp(mode, "primary")) {
113         s->mode = REPLICATION_MODE_PRIMARY;
114         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
115         if (top_id) {
116             error_setg(&local_err, "The primary side does not support option top-id");
117             goto fail;
118         }
119     } else if (!strcmp(mode, "secondary")) {
120         s->mode = REPLICATION_MODE_SECONDARY;
121         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
122         s->top_id = g_strdup(top_id);
123         if (!s->top_id) {
124             error_setg(&local_err, "Missing the option top-id");
125             goto fail;
126         }
127     } else {
128         error_setg(&local_err,
129                    "The option mode's value should be primary or secondary");
130         goto fail;
131     }
132 
133     s->rs = replication_new(bs, &replication_ops);
134 
135     ret = 0;
136 
137 fail:
138     qemu_opts_del(opts);
139     error_propagate(errp, local_err);
140 
141     return ret;
142 }
143 
144 static void replication_close(BlockDriverState *bs)
145 {
146     BDRVReplicationState *s = bs->opaque;
147 
148     if (s->stage == BLOCK_REPLICATION_RUNNING) {
149         replication_stop(s->rs, false, NULL);
150     }
151     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
152         job_cancel_sync(&s->commit_job->job);
153     }
154 
155     if (s->mode == REPLICATION_MODE_SECONDARY) {
156         g_free(s->top_id);
157     }
158 
159     replication_remove(s->rs);
160 }
161 
162 static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
163                                    const BdrvChildRole *role,
164                                    BlockReopenQueue *reopen_queue,
165                                    uint64_t perm, uint64_t shared,
166                                    uint64_t *nperm, uint64_t *nshared)
167 {
168     *nperm = BLK_PERM_CONSISTENT_READ;
169     if ((bs->open_flags & (BDRV_O_INACTIVE | BDRV_O_RDWR)) == BDRV_O_RDWR) {
170         *nperm |= BLK_PERM_WRITE;
171     }
172     *nshared = BLK_PERM_CONSISTENT_READ \
173                | BLK_PERM_WRITE \
174                | BLK_PERM_WRITE_UNCHANGED;
175     return;
176 }
177 
178 static int64_t replication_getlength(BlockDriverState *bs)
179 {
180     return bdrv_getlength(bs->file->bs);
181 }
182 
183 static int replication_get_io_status(BDRVReplicationState *s)
184 {
185     switch (s->stage) {
186     case BLOCK_REPLICATION_NONE:
187         return -EIO;
188     case BLOCK_REPLICATION_RUNNING:
189         return 0;
190     case BLOCK_REPLICATION_FAILOVER:
191         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
192     case BLOCK_REPLICATION_FAILOVER_FAILED:
193         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
194     case BLOCK_REPLICATION_DONE:
195         /*
196          * active commit job completes, and active disk and secondary_disk
197          * is swapped, so we can operate bs->file directly
198          */
199         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
200     default:
201         abort();
202     }
203 }
204 
205 static int replication_return_value(BDRVReplicationState *s, int ret)
206 {
207     if (s->mode == REPLICATION_MODE_SECONDARY) {
208         return ret;
209     }
210 
211     if (ret < 0) {
212         s->error = ret;
213         ret = 0;
214     }
215 
216     return ret;
217 }
218 
219 static coroutine_fn int replication_co_readv(BlockDriverState *bs,
220                                              int64_t sector_num,
221                                              int remaining_sectors,
222                                              QEMUIOVector *qiov)
223 {
224     BDRVReplicationState *s = bs->opaque;
225     int ret;
226 
227     if (s->mode == REPLICATION_MODE_PRIMARY) {
228         /* We only use it to forward primary write requests */
229         return -EIO;
230     }
231 
232     ret = replication_get_io_status(s);
233     if (ret < 0) {
234         return ret;
235     }
236 
237     ret = bdrv_co_preadv(bs->file, sector_num * BDRV_SECTOR_SIZE,
238                          remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
239 
240     return replication_return_value(s, ret);
241 }
242 
243 static coroutine_fn int replication_co_writev(BlockDriverState *bs,
244                                               int64_t sector_num,
245                                               int remaining_sectors,
246                                               QEMUIOVector *qiov,
247                                               int flags)
248 {
249     BDRVReplicationState *s = bs->opaque;
250     QEMUIOVector hd_qiov;
251     uint64_t bytes_done = 0;
252     BdrvChild *top = bs->file;
253     BdrvChild *base = s->secondary_disk;
254     BdrvChild *target;
255     int ret;
256     int64_t n;
257 
258     assert(!flags);
259     ret = replication_get_io_status(s);
260     if (ret < 0) {
261         goto out;
262     }
263 
264     if (ret == 0) {
265         ret = bdrv_co_pwritev(top, sector_num * BDRV_SECTOR_SIZE,
266                               remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
267         return replication_return_value(s, ret);
268     }
269 
270     /*
271      * Failover failed, only write to active disk if the sectors
272      * have already been allocated in active disk/hidden disk.
273      */
274     qemu_iovec_init(&hd_qiov, qiov->niov);
275     while (remaining_sectors > 0) {
276         int64_t count;
277 
278         ret = bdrv_is_allocated_above(top->bs, base->bs, false,
279                                       sector_num * BDRV_SECTOR_SIZE,
280                                       remaining_sectors * BDRV_SECTOR_SIZE,
281                                       &count);
282         if (ret < 0) {
283             goto out1;
284         }
285 
286         assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
287         n = count >> BDRV_SECTOR_BITS;
288         qemu_iovec_reset(&hd_qiov);
289         qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
290 
291         target = ret ? top : base;
292         ret = bdrv_co_pwritev(target, sector_num * BDRV_SECTOR_SIZE,
293                               n * BDRV_SECTOR_SIZE, &hd_qiov, 0);
294         if (ret < 0) {
295             goto out1;
296         }
297 
298         remaining_sectors -= n;
299         sector_num += n;
300         bytes_done += count;
301     }
302 
303 out1:
304     qemu_iovec_destroy(&hd_qiov);
305 out:
306     return ret;
307 }
308 
309 static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
310 {
311     Error *local_err = NULL;
312     int ret;
313 
314     if (!s->backup_job) {
315         error_setg(errp, "Backup job was cancelled unexpectedly");
316         return;
317     }
318 
319     backup_do_checkpoint(s->backup_job, &local_err);
320     if (local_err) {
321         error_propagate(errp, local_err);
322         return;
323     }
324 
325     if (!s->active_disk->bs->drv) {
326         error_setg(errp, "Active disk %s is ejected",
327                    s->active_disk->bs->node_name);
328         return;
329     }
330 
331     ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
332     if (ret < 0) {
333         error_setg(errp, "Cannot make active disk empty");
334         return;
335     }
336 
337     if (!s->hidden_disk->bs->drv) {
338         error_setg(errp, "Hidden disk %s is ejected",
339                    s->hidden_disk->bs->node_name);
340         return;
341     }
342 
343     ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
344     if (ret < 0) {
345         error_setg(errp, "Cannot make hidden disk empty");
346         return;
347     }
348 }
349 
350 /* This function is supposed to be called twice:
351  * first with writable = true, then with writable = false.
352  * The first call puts s->hidden_disk and s->secondary_disk in
353  * r/w mode, and the second puts them back in their original state.
354  */
355 static void reopen_backing_file(BlockDriverState *bs, bool writable,
356                                 Error **errp)
357 {
358     BDRVReplicationState *s = bs->opaque;
359     BlockReopenQueue *reopen_queue = NULL;
360     Error *local_err = NULL;
361 
362     if (writable) {
363         s->orig_hidden_read_only = bdrv_is_read_only(s->hidden_disk->bs);
364         s->orig_secondary_read_only = bdrv_is_read_only(s->secondary_disk->bs);
365     }
366 
367     bdrv_subtree_drained_begin(s->hidden_disk->bs);
368     bdrv_subtree_drained_begin(s->secondary_disk->bs);
369 
370     if (s->orig_hidden_read_only) {
371         QDict *opts = qdict_new();
372         qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
373         reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs,
374                                          opts, true);
375     }
376 
377     if (s->orig_secondary_read_only) {
378         QDict *opts = qdict_new();
379         qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
380         reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
381                                          opts, true);
382     }
383 
384     if (reopen_queue) {
385         bdrv_reopen_multiple(reopen_queue, &local_err);
386         error_propagate(errp, local_err);
387     }
388 
389     bdrv_subtree_drained_end(s->hidden_disk->bs);
390     bdrv_subtree_drained_end(s->secondary_disk->bs);
391 }
392 
393 static void backup_job_cleanup(BlockDriverState *bs)
394 {
395     BDRVReplicationState *s = bs->opaque;
396     BlockDriverState *top_bs;
397 
398     top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
399     if (!top_bs) {
400         return;
401     }
402     bdrv_op_unblock_all(top_bs, s->blocker);
403     error_free(s->blocker);
404     reopen_backing_file(bs, false, NULL);
405 }
406 
407 static void backup_job_completed(void *opaque, int ret)
408 {
409     BlockDriverState *bs = opaque;
410     BDRVReplicationState *s = bs->opaque;
411 
412     if (s->stage != BLOCK_REPLICATION_FAILOVER) {
413         /* The backup job is cancelled unexpectedly */
414         s->error = -EIO;
415     }
416 
417     backup_job_cleanup(bs);
418 }
419 
420 static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
421 {
422     BdrvChild *child;
423 
424     /* The bs itself is the top_bs */
425     if (top_bs == bs) {
426         return true;
427     }
428 
429     /* Iterate over top_bs's children */
430     QLIST_FOREACH(child, &top_bs->children, next) {
431         if (child->bs == bs || check_top_bs(child->bs, bs)) {
432             return true;
433         }
434     }
435 
436     return false;
437 }
438 
439 static void replication_start(ReplicationState *rs, ReplicationMode mode,
440                               Error **errp)
441 {
442     BlockDriverState *bs = rs->opaque;
443     BDRVReplicationState *s;
444     BlockDriverState *top_bs;
445     int64_t active_length, hidden_length, disk_length;
446     AioContext *aio_context;
447     Error *local_err = NULL;
448 
449     aio_context = bdrv_get_aio_context(bs);
450     aio_context_acquire(aio_context);
451     s = bs->opaque;
452 
453     if (s->stage == BLOCK_REPLICATION_DONE ||
454         s->stage == BLOCK_REPLICATION_FAILOVER) {
455         /*
456          * This case happens when a secondary is promoted to primary.
457          * Ignore the request because the secondary side of replication
458          * doesn't have to do anything anymore.
459          */
460         aio_context_release(aio_context);
461         return;
462     }
463 
464     if (s->stage != BLOCK_REPLICATION_NONE) {
465         error_setg(errp, "Block replication is running or done");
466         aio_context_release(aio_context);
467         return;
468     }
469 
470     if (s->mode != mode) {
471         error_setg(errp, "The parameter mode's value is invalid, needs %d,"
472                    " but got %d", s->mode, mode);
473         aio_context_release(aio_context);
474         return;
475     }
476 
477     switch (s->mode) {
478     case REPLICATION_MODE_PRIMARY:
479         break;
480     case REPLICATION_MODE_SECONDARY:
481         s->active_disk = bs->file;
482         if (!s->active_disk || !s->active_disk->bs ||
483                                     !s->active_disk->bs->backing) {
484             error_setg(errp, "Active disk doesn't have backing file");
485             aio_context_release(aio_context);
486             return;
487         }
488 
489         s->hidden_disk = s->active_disk->bs->backing;
490         if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
491             error_setg(errp, "Hidden disk doesn't have backing file");
492             aio_context_release(aio_context);
493             return;
494         }
495 
496         s->secondary_disk = s->hidden_disk->bs->backing;
497         if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
498             error_setg(errp, "The secondary disk doesn't have block backend");
499             aio_context_release(aio_context);
500             return;
501         }
502 
503         /* verify the length */
504         active_length = bdrv_getlength(s->active_disk->bs);
505         hidden_length = bdrv_getlength(s->hidden_disk->bs);
506         disk_length = bdrv_getlength(s->secondary_disk->bs);
507         if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
508             active_length != hidden_length || hidden_length != disk_length) {
509             error_setg(errp, "Active disk, hidden disk, secondary disk's length"
510                        " are not the same");
511             aio_context_release(aio_context);
512             return;
513         }
514 
515         /* Must be true, or the bdrv_getlength() calls would have failed */
516         assert(s->active_disk->bs->drv && s->hidden_disk->bs->drv);
517 
518         if (!s->active_disk->bs->drv->bdrv_make_empty ||
519             !s->hidden_disk->bs->drv->bdrv_make_empty) {
520             error_setg(errp,
521                        "Active disk or hidden disk doesn't support make_empty");
522             aio_context_release(aio_context);
523             return;
524         }
525 
526         /* reopen the backing file in r/w mode */
527         reopen_backing_file(bs, true, &local_err);
528         if (local_err) {
529             error_propagate(errp, local_err);
530             aio_context_release(aio_context);
531             return;
532         }
533 
534         /* start backup job now */
535         error_setg(&s->blocker,
536                    "Block device is in use by internal backup job");
537 
538         top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
539         if (!top_bs || !bdrv_is_root_node(top_bs) ||
540             !check_top_bs(top_bs, bs)) {
541             error_setg(errp, "No top_bs or it is invalid");
542             reopen_backing_file(bs, false, NULL);
543             aio_context_release(aio_context);
544             return;
545         }
546         bdrv_op_block_all(top_bs, s->blocker);
547         bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
548 
549         s->backup_job = backup_job_create(
550                                 NULL, s->secondary_disk->bs, s->hidden_disk->bs,
551                                 0, MIRROR_SYNC_MODE_NONE, NULL, 0, false, NULL,
552                                 BLOCKDEV_ON_ERROR_REPORT,
553                                 BLOCKDEV_ON_ERROR_REPORT, JOB_INTERNAL,
554                                 backup_job_completed, bs, NULL, &local_err);
555         if (local_err) {
556             error_propagate(errp, local_err);
557             backup_job_cleanup(bs);
558             aio_context_release(aio_context);
559             return;
560         }
561         job_start(&s->backup_job->job);
562         break;
563     default:
564         aio_context_release(aio_context);
565         abort();
566     }
567 
568     s->stage = BLOCK_REPLICATION_RUNNING;
569 
570     if (s->mode == REPLICATION_MODE_SECONDARY) {
571         secondary_do_checkpoint(s, errp);
572     }
573 
574     s->error = 0;
575     aio_context_release(aio_context);
576 }
577 
578 static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
579 {
580     BlockDriverState *bs = rs->opaque;
581     BDRVReplicationState *s;
582     AioContext *aio_context;
583 
584     aio_context = bdrv_get_aio_context(bs);
585     aio_context_acquire(aio_context);
586     s = bs->opaque;
587 
588     if (s->stage == BLOCK_REPLICATION_DONE ||
589         s->stage == BLOCK_REPLICATION_FAILOVER) {
590         /*
591          * This case happens when a secondary was promoted to primary.
592          * Ignore the request because the secondary side of replication
593          * doesn't have to do anything anymore.
594          */
595         aio_context_release(aio_context);
596         return;
597     }
598 
599     if (s->mode == REPLICATION_MODE_SECONDARY) {
600         secondary_do_checkpoint(s, errp);
601     }
602     aio_context_release(aio_context);
603 }
604 
605 static void replication_get_error(ReplicationState *rs, Error **errp)
606 {
607     BlockDriverState *bs = rs->opaque;
608     BDRVReplicationState *s;
609     AioContext *aio_context;
610 
611     aio_context = bdrv_get_aio_context(bs);
612     aio_context_acquire(aio_context);
613     s = bs->opaque;
614 
615     if (s->stage == BLOCK_REPLICATION_NONE) {
616         error_setg(errp, "Block replication is not running");
617         aio_context_release(aio_context);
618         return;
619     }
620 
621     if (s->error) {
622         error_setg(errp, "I/O error occurred");
623         aio_context_release(aio_context);
624         return;
625     }
626     aio_context_release(aio_context);
627 }
628 
629 static void replication_done(void *opaque, int ret)
630 {
631     BlockDriverState *bs = opaque;
632     BDRVReplicationState *s = bs->opaque;
633 
634     if (ret == 0) {
635         s->stage = BLOCK_REPLICATION_DONE;
636 
637         s->active_disk = NULL;
638         s->secondary_disk = NULL;
639         s->hidden_disk = NULL;
640         s->error = 0;
641     } else {
642         s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
643         s->error = -EIO;
644     }
645 }
646 
647 static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
648 {
649     BlockDriverState *bs = rs->opaque;
650     BDRVReplicationState *s;
651     AioContext *aio_context;
652 
653     aio_context = bdrv_get_aio_context(bs);
654     aio_context_acquire(aio_context);
655     s = bs->opaque;
656 
657     if (s->stage == BLOCK_REPLICATION_DONE ||
658         s->stage == BLOCK_REPLICATION_FAILOVER) {
659         /*
660          * This case happens when a secondary was promoted to primary.
661          * Ignore the request because the secondary side of replication
662          * doesn't have to do anything anymore.
663          */
664         aio_context_release(aio_context);
665         return;
666     }
667 
668     if (s->stage != BLOCK_REPLICATION_RUNNING) {
669         error_setg(errp, "Block replication is not running");
670         aio_context_release(aio_context);
671         return;
672     }
673 
674     switch (s->mode) {
675     case REPLICATION_MODE_PRIMARY:
676         s->stage = BLOCK_REPLICATION_DONE;
677         s->error = 0;
678         break;
679     case REPLICATION_MODE_SECONDARY:
680         /*
681          * This BDS will be closed, and the job should be completed
682          * before the BDS is closed, because we will access hidden
683          * disk, secondary disk in backup_job_completed().
684          */
685         if (s->backup_job) {
686             job_cancel_sync(&s->backup_job->job);
687         }
688 
689         if (!failover) {
690             secondary_do_checkpoint(s, errp);
691             s->stage = BLOCK_REPLICATION_DONE;
692             aio_context_release(aio_context);
693             return;
694         }
695 
696         s->stage = BLOCK_REPLICATION_FAILOVER;
697         s->commit_job = commit_active_start(
698                             NULL, s->active_disk->bs, s->secondary_disk->bs,
699                             JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
700                             NULL, replication_done, bs, true, errp);
701         break;
702     default:
703         aio_context_release(aio_context);
704         abort();
705     }
706     aio_context_release(aio_context);
707 }
708 
709 static const char *const replication_strong_runtime_opts[] = {
710     REPLICATION_MODE,
711     REPLICATION_TOP_ID,
712 
713     NULL
714 };
715 
716 static BlockDriver bdrv_replication = {
717     .format_name                = "replication",
718     .instance_size              = sizeof(BDRVReplicationState),
719 
720     .bdrv_open                  = replication_open,
721     .bdrv_close                 = replication_close,
722     .bdrv_child_perm            = replication_child_perm,
723 
724     .bdrv_getlength             = replication_getlength,
725     .bdrv_co_readv              = replication_co_readv,
726     .bdrv_co_writev             = replication_co_writev,
727 
728     .is_filter                  = true,
729 
730     .has_variable_length        = true,
731     .strong_runtime_opts        = replication_strong_runtime_opts,
732 };
733 
734 static void bdrv_replication_init(void)
735 {
736     bdrv_register(&bdrv_replication);
737 }
738 
739 block_init(bdrv_replication_init);
740