xref: /qemu/block/replication.c (revision 7271a819)
1 /*
2  * Replication Block filter
3  *
4  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
5  * Copyright (c) 2016 Intel Corporation
6  * Copyright (c) 2016 FUJITSU LIMITED
7  *
8  * Author:
9  *   Wen Congyang <wency@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "block/nbd.h"
18 #include "block/blockjob.h"
19 #include "block/block_int.h"
20 #include "block/block_backup.h"
21 #include "sysemu/block-backend.h"
22 #include "qapi/error.h"
23 #include "replication.h"
24 
/*
 * Stages a replication filter node moves through.  The numeric order of
 * the enumerators is unchanged from the original definition.
 */
typedef enum {
    /* block replication is not started */
    BLOCK_REPLICATION_NONE,
    /* block replication is running */
    BLOCK_REPLICATION_RUNNING,
    /* failover is running in background */
    BLOCK_REPLICATION_FAILOVER,
    /* failover failed */
    BLOCK_REPLICATION_FAILOVER_FAILED,
    /* block replication is done */
    BLOCK_REPLICATION_DONE,
} ReplicationStage;
32 
33 typedef struct BDRVReplicationState {
34     ReplicationMode mode;
35     ReplicationStage stage;
36     BdrvChild *active_disk;
37     BdrvChild *hidden_disk;
38     BdrvChild *secondary_disk;
39     char *top_id;
40     ReplicationState *rs;
41     Error *blocker;
42     int orig_hidden_flags;
43     int orig_secondary_flags;
44     int error;
45 } BDRVReplicationState;
46 
47 static void replication_start(ReplicationState *rs, ReplicationMode mode,
48                               Error **errp);
49 static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
50 static void replication_get_error(ReplicationState *rs, Error **errp);
51 static void replication_stop(ReplicationState *rs, bool failover,
52                              Error **errp);
53 
54 #define REPLICATION_MODE        "mode"
55 #define REPLICATION_TOP_ID      "top-id"
56 static QemuOptsList replication_runtime_opts = {
57     .name = "replication",
58     .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
59     .desc = {
60         {
61             .name = REPLICATION_MODE,
62             .type = QEMU_OPT_STRING,
63         },
64         {
65             .name = REPLICATION_TOP_ID,
66             .type = QEMU_OPT_STRING,
67         },
68         { /* end of list */ }
69     },
70 };
71 
72 static ReplicationOps replication_ops = {
73     .start = replication_start,
74     .checkpoint = replication_do_checkpoint,
75     .get_error = replication_get_error,
76     .stop = replication_stop,
77 };
78 
79 static int replication_open(BlockDriverState *bs, QDict *options,
80                             int flags, Error **errp)
81 {
82     int ret;
83     BDRVReplicationState *s = bs->opaque;
84     Error *local_err = NULL;
85     QemuOpts *opts = NULL;
86     const char *mode;
87     const char *top_id;
88 
89     bs->file = bdrv_open_child(NULL, options, "file", bs, &child_file,
90                                false, errp);
91     if (!bs->file) {
92         return -EINVAL;
93     }
94 
95     ret = -EINVAL;
96     opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
97     qemu_opts_absorb_qdict(opts, options, &local_err);
98     if (local_err) {
99         goto fail;
100     }
101 
102     mode = qemu_opt_get(opts, REPLICATION_MODE);
103     if (!mode) {
104         error_setg(&local_err, "Missing the option mode");
105         goto fail;
106     }
107 
108     if (!strcmp(mode, "primary")) {
109         s->mode = REPLICATION_MODE_PRIMARY;
110         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
111         if (top_id) {
112             error_setg(&local_err, "The primary side does not support option top-id");
113             goto fail;
114         }
115     } else if (!strcmp(mode, "secondary")) {
116         s->mode = REPLICATION_MODE_SECONDARY;
117         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
118         s->top_id = g_strdup(top_id);
119         if (!s->top_id) {
120             error_setg(&local_err, "Missing the option top-id");
121             goto fail;
122         }
123     } else {
124         error_setg(&local_err,
125                    "The option mode's value should be primary or secondary");
126         goto fail;
127     }
128 
129     s->rs = replication_new(bs, &replication_ops);
130 
131     ret = 0;
132 
133 fail:
134     qemu_opts_del(opts);
135     error_propagate(errp, local_err);
136 
137     return ret;
138 }
139 
140 static void replication_close(BlockDriverState *bs)
141 {
142     BDRVReplicationState *s = bs->opaque;
143 
144     if (s->stage == BLOCK_REPLICATION_RUNNING) {
145         replication_stop(s->rs, false, NULL);
146     }
147     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
148         block_job_cancel_sync(s->active_disk->bs->job);
149     }
150 
151     if (s->mode == REPLICATION_MODE_SECONDARY) {
152         g_free(s->top_id);
153     }
154 
155     replication_remove(s->rs);
156 }
157 
158 static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
159                                    const BdrvChildRole *role,
160                                    BlockReopenQueue *reopen_queue,
161                                    uint64_t perm, uint64_t shared,
162                                    uint64_t *nperm, uint64_t *nshared)
163 {
164     *nperm = *nshared = BLK_PERM_CONSISTENT_READ \
165                         | BLK_PERM_WRITE \
166                         | BLK_PERM_WRITE_UNCHANGED;
167 
168     return;
169 }
170 
171 static int64_t replication_getlength(BlockDriverState *bs)
172 {
173     return bdrv_getlength(bs->file->bs);
174 }
175 
176 static int replication_get_io_status(BDRVReplicationState *s)
177 {
178     switch (s->stage) {
179     case BLOCK_REPLICATION_NONE:
180         return -EIO;
181     case BLOCK_REPLICATION_RUNNING:
182         return 0;
183     case BLOCK_REPLICATION_FAILOVER:
184         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
185     case BLOCK_REPLICATION_FAILOVER_FAILED:
186         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
187     case BLOCK_REPLICATION_DONE:
188         /*
189          * active commit job completes, and active disk and secondary_disk
190          * is swapped, so we can operate bs->file directly
191          */
192         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
193     default:
194         abort();
195     }
196 }
197 
198 static int replication_return_value(BDRVReplicationState *s, int ret)
199 {
200     if (s->mode == REPLICATION_MODE_SECONDARY) {
201         return ret;
202     }
203 
204     if (ret < 0) {
205         s->error = ret;
206         ret = 0;
207     }
208 
209     return ret;
210 }
211 
212 static coroutine_fn int replication_co_readv(BlockDriverState *bs,
213                                              int64_t sector_num,
214                                              int remaining_sectors,
215                                              QEMUIOVector *qiov)
216 {
217     BDRVReplicationState *s = bs->opaque;
218     BdrvChild *child = s->secondary_disk;
219     BlockJob *job = NULL;
220     CowRequest req;
221     int ret;
222 
223     if (s->mode == REPLICATION_MODE_PRIMARY) {
224         /* We only use it to forward primary write requests */
225         return -EIO;
226     }
227 
228     ret = replication_get_io_status(s);
229     if (ret < 0) {
230         return ret;
231     }
232 
233     if (child && child->bs) {
234         job = child->bs->job;
235     }
236 
237     if (job) {
238         uint64_t remaining_bytes = remaining_sectors * BDRV_SECTOR_SIZE;
239 
240         backup_wait_for_overlapping_requests(child->bs->job,
241                                              sector_num * BDRV_SECTOR_SIZE,
242                                              remaining_bytes);
243         backup_cow_request_begin(&req, child->bs->job,
244                                  sector_num * BDRV_SECTOR_SIZE,
245                                  remaining_bytes);
246         ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors,
247                             qiov);
248         backup_cow_request_end(&req);
249         goto out;
250     }
251 
252     ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors, qiov);
253 out:
254     return replication_return_value(s, ret);
255 }
256 
257 static coroutine_fn int replication_co_writev(BlockDriverState *bs,
258                                               int64_t sector_num,
259                                               int remaining_sectors,
260                                               QEMUIOVector *qiov)
261 {
262     BDRVReplicationState *s = bs->opaque;
263     QEMUIOVector hd_qiov;
264     uint64_t bytes_done = 0;
265     BdrvChild *top = bs->file;
266     BdrvChild *base = s->secondary_disk;
267     BdrvChild *target;
268     int ret;
269     int64_t n;
270 
271     ret = replication_get_io_status(s);
272     if (ret < 0) {
273         goto out;
274     }
275 
276     if (ret == 0) {
277         ret = bdrv_co_writev(top, sector_num,
278                              remaining_sectors, qiov);
279         return replication_return_value(s, ret);
280     }
281 
282     /*
283      * Failover failed, only write to active disk if the sectors
284      * have already been allocated in active disk/hidden disk.
285      */
286     qemu_iovec_init(&hd_qiov, qiov->niov);
287     while (remaining_sectors > 0) {
288         int64_t count;
289 
290         ret = bdrv_is_allocated_above(top->bs, base->bs,
291                                       sector_num * BDRV_SECTOR_SIZE,
292                                       remaining_sectors * BDRV_SECTOR_SIZE,
293                                       &count);
294         if (ret < 0) {
295             goto out1;
296         }
297 
298         assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
299         n = count >> BDRV_SECTOR_BITS;
300         qemu_iovec_reset(&hd_qiov);
301         qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
302 
303         target = ret ? top : base;
304         ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
305         if (ret < 0) {
306             goto out1;
307         }
308 
309         remaining_sectors -= n;
310         sector_num += n;
311         bytes_done += count;
312     }
313 
314 out1:
315     qemu_iovec_destroy(&hd_qiov);
316 out:
317     return ret;
318 }
319 
320 static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
321                                                     BlockDriverState *candidate)
322 {
323     return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
324 }
325 
326 static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
327 {
328     Error *local_err = NULL;
329     int ret;
330 
331     if (!s->secondary_disk->bs->job) {
332         error_setg(errp, "Backup job was cancelled unexpectedly");
333         return;
334     }
335 
336     backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
337     if (local_err) {
338         error_propagate(errp, local_err);
339         return;
340     }
341 
342     ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
343     if (ret < 0) {
344         error_setg(errp, "Cannot make active disk empty");
345         return;
346     }
347 
348     ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
349     if (ret < 0) {
350         error_setg(errp, "Cannot make hidden disk empty");
351         return;
352     }
353 }
354 
355 static void reopen_backing_file(BlockDriverState *bs, bool writable,
356                                 Error **errp)
357 {
358     BDRVReplicationState *s = bs->opaque;
359     BlockReopenQueue *reopen_queue = NULL;
360     int orig_hidden_flags, orig_secondary_flags;
361     int new_hidden_flags, new_secondary_flags;
362     Error *local_err = NULL;
363 
364     if (writable) {
365         orig_hidden_flags = s->orig_hidden_flags =
366                                 bdrv_get_flags(s->hidden_disk->bs);
367         new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
368                                                     ~BDRV_O_INACTIVE;
369         orig_secondary_flags = s->orig_secondary_flags =
370                                 bdrv_get_flags(s->secondary_disk->bs);
371         new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
372                                                      ~BDRV_O_INACTIVE;
373     } else {
374         orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
375                                                     ~BDRV_O_INACTIVE;
376         new_hidden_flags = s->orig_hidden_flags;
377         orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
378                                                     ~BDRV_O_INACTIVE;
379         new_secondary_flags = s->orig_secondary_flags;
380     }
381 
382     if (orig_hidden_flags != new_hidden_flags) {
383         reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
384                                          new_hidden_flags);
385     }
386 
387     if (!(orig_secondary_flags & BDRV_O_RDWR)) {
388         reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
389                                          NULL, new_secondary_flags);
390     }
391 
392     if (reopen_queue) {
393         bdrv_reopen_multiple(bdrv_get_aio_context(bs),
394                              reopen_queue, &local_err);
395         error_propagate(errp, local_err);
396     }
397 }
398 
399 static void backup_job_cleanup(BlockDriverState *bs)
400 {
401     BDRVReplicationState *s = bs->opaque;
402     BlockDriverState *top_bs;
403 
404     top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
405     if (!top_bs) {
406         return;
407     }
408     bdrv_op_unblock_all(top_bs, s->blocker);
409     error_free(s->blocker);
410     reopen_backing_file(bs, false, NULL);
411 }
412 
413 static void backup_job_completed(void *opaque, int ret)
414 {
415     BlockDriverState *bs = opaque;
416     BDRVReplicationState *s = bs->opaque;
417 
418     if (s->stage != BLOCK_REPLICATION_FAILOVER) {
419         /* The backup job is cancelled unexpectedly */
420         s->error = -EIO;
421     }
422 
423     backup_job_cleanup(bs);
424 }
425 
426 static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
427 {
428     BdrvChild *child;
429 
430     /* The bs itself is the top_bs */
431     if (top_bs == bs) {
432         return true;
433     }
434 
435     /* Iterate over top_bs's children */
436     QLIST_FOREACH(child, &top_bs->children, next) {
437         if (child->bs == bs || check_top_bs(child->bs, bs)) {
438             return true;
439         }
440     }
441 
442     return false;
443 }
444 
445 static void replication_start(ReplicationState *rs, ReplicationMode mode,
446                               Error **errp)
447 {
448     BlockDriverState *bs = rs->opaque;
449     BDRVReplicationState *s;
450     BlockDriverState *top_bs;
451     int64_t active_length, hidden_length, disk_length;
452     AioContext *aio_context;
453     Error *local_err = NULL;
454     BlockJob *job;
455 
456     aio_context = bdrv_get_aio_context(bs);
457     aio_context_acquire(aio_context);
458     s = bs->opaque;
459 
460     if (s->stage != BLOCK_REPLICATION_NONE) {
461         error_setg(errp, "Block replication is running or done");
462         aio_context_release(aio_context);
463         return;
464     }
465 
466     if (s->mode != mode) {
467         error_setg(errp, "The parameter mode's value is invalid, needs %d,"
468                    " but got %d", s->mode, mode);
469         aio_context_release(aio_context);
470         return;
471     }
472 
473     switch (s->mode) {
474     case REPLICATION_MODE_PRIMARY:
475         break;
476     case REPLICATION_MODE_SECONDARY:
477         s->active_disk = bs->file;
478         if (!s->active_disk || !s->active_disk->bs ||
479                                     !s->active_disk->bs->backing) {
480             error_setg(errp, "Active disk doesn't have backing file");
481             aio_context_release(aio_context);
482             return;
483         }
484 
485         s->hidden_disk = s->active_disk->bs->backing;
486         if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
487             error_setg(errp, "Hidden disk doesn't have backing file");
488             aio_context_release(aio_context);
489             return;
490         }
491 
492         s->secondary_disk = s->hidden_disk->bs->backing;
493         if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
494             error_setg(errp, "The secondary disk doesn't have block backend");
495             aio_context_release(aio_context);
496             return;
497         }
498 
499         /* verify the length */
500         active_length = bdrv_getlength(s->active_disk->bs);
501         hidden_length = bdrv_getlength(s->hidden_disk->bs);
502         disk_length = bdrv_getlength(s->secondary_disk->bs);
503         if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
504             active_length != hidden_length || hidden_length != disk_length) {
505             error_setg(errp, "Active disk, hidden disk, secondary disk's length"
506                        " are not the same");
507             aio_context_release(aio_context);
508             return;
509         }
510 
511         if (!s->active_disk->bs->drv->bdrv_make_empty ||
512             !s->hidden_disk->bs->drv->bdrv_make_empty) {
513             error_setg(errp,
514                        "Active disk or hidden disk doesn't support make_empty");
515             aio_context_release(aio_context);
516             return;
517         }
518 
519         /* reopen the backing file in r/w mode */
520         reopen_backing_file(bs, true, &local_err);
521         if (local_err) {
522             error_propagate(errp, local_err);
523             aio_context_release(aio_context);
524             return;
525         }
526 
527         /* start backup job now */
528         error_setg(&s->blocker,
529                    "Block device is in use by internal backup job");
530 
531         top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
532         if (!top_bs || !bdrv_is_root_node(top_bs) ||
533             !check_top_bs(top_bs, bs)) {
534             error_setg(errp, "No top_bs or it is invalid");
535             reopen_backing_file(bs, false, NULL);
536             aio_context_release(aio_context);
537             return;
538         }
539         bdrv_op_block_all(top_bs, s->blocker);
540         bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
541 
542         job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs,
543                                 0, MIRROR_SYNC_MODE_NONE, NULL, false,
544                                 BLOCKDEV_ON_ERROR_REPORT,
545                                 BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL,
546                                 backup_job_completed, bs, NULL, &local_err);
547         if (local_err) {
548             error_propagate(errp, local_err);
549             backup_job_cleanup(bs);
550             aio_context_release(aio_context);
551             return;
552         }
553         block_job_start(job);
554         break;
555     default:
556         aio_context_release(aio_context);
557         abort();
558     }
559 
560     s->stage = BLOCK_REPLICATION_RUNNING;
561 
562     if (s->mode == REPLICATION_MODE_SECONDARY) {
563         secondary_do_checkpoint(s, errp);
564     }
565 
566     s->error = 0;
567     aio_context_release(aio_context);
568 }
569 
570 static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
571 {
572     BlockDriverState *bs = rs->opaque;
573     BDRVReplicationState *s;
574     AioContext *aio_context;
575 
576     aio_context = bdrv_get_aio_context(bs);
577     aio_context_acquire(aio_context);
578     s = bs->opaque;
579 
580     if (s->mode == REPLICATION_MODE_SECONDARY) {
581         secondary_do_checkpoint(s, errp);
582     }
583     aio_context_release(aio_context);
584 }
585 
586 static void replication_get_error(ReplicationState *rs, Error **errp)
587 {
588     BlockDriverState *bs = rs->opaque;
589     BDRVReplicationState *s;
590     AioContext *aio_context;
591 
592     aio_context = bdrv_get_aio_context(bs);
593     aio_context_acquire(aio_context);
594     s = bs->opaque;
595 
596     if (s->stage != BLOCK_REPLICATION_RUNNING) {
597         error_setg(errp, "Block replication is not running");
598         aio_context_release(aio_context);
599         return;
600     }
601 
602     if (s->error) {
603         error_setg(errp, "I/O error occurred");
604         aio_context_release(aio_context);
605         return;
606     }
607     aio_context_release(aio_context);
608 }
609 
610 static void replication_done(void *opaque, int ret)
611 {
612     BlockDriverState *bs = opaque;
613     BDRVReplicationState *s = bs->opaque;
614 
615     if (ret == 0) {
616         s->stage = BLOCK_REPLICATION_DONE;
617 
618         /* refresh top bs's filename */
619         bdrv_refresh_filename(bs);
620         s->active_disk = NULL;
621         s->secondary_disk = NULL;
622         s->hidden_disk = NULL;
623         s->error = 0;
624     } else {
625         s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
626         s->error = -EIO;
627     }
628 }
629 
630 static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
631 {
632     BlockDriverState *bs = rs->opaque;
633     BDRVReplicationState *s;
634     AioContext *aio_context;
635 
636     aio_context = bdrv_get_aio_context(bs);
637     aio_context_acquire(aio_context);
638     s = bs->opaque;
639 
640     if (s->stage != BLOCK_REPLICATION_RUNNING) {
641         error_setg(errp, "Block replication is not running");
642         aio_context_release(aio_context);
643         return;
644     }
645 
646     switch (s->mode) {
647     case REPLICATION_MODE_PRIMARY:
648         s->stage = BLOCK_REPLICATION_DONE;
649         s->error = 0;
650         break;
651     case REPLICATION_MODE_SECONDARY:
652         /*
653          * This BDS will be closed, and the job should be completed
654          * before the BDS is closed, because we will access hidden
655          * disk, secondary disk in backup_job_completed().
656          */
657         if (s->secondary_disk->bs->job) {
658             block_job_cancel_sync(s->secondary_disk->bs->job);
659         }
660 
661         if (!failover) {
662             secondary_do_checkpoint(s, errp);
663             s->stage = BLOCK_REPLICATION_DONE;
664             aio_context_release(aio_context);
665             return;
666         }
667 
668         s->stage = BLOCK_REPLICATION_FAILOVER;
669         commit_active_start(NULL, s->active_disk->bs, s->secondary_disk->bs,
670                             BLOCK_JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
671                             NULL, replication_done, bs, true, errp);
672         break;
673     default:
674         aio_context_release(aio_context);
675         abort();
676     }
677     aio_context_release(aio_context);
678 }
679 
680 BlockDriver bdrv_replication = {
681     .format_name                = "replication",
682     .protocol_name              = "replication",
683     .instance_size              = sizeof(BDRVReplicationState),
684 
685     .bdrv_open                  = replication_open,
686     .bdrv_close                 = replication_close,
687     .bdrv_child_perm            = replication_child_perm,
688 
689     .bdrv_getlength             = replication_getlength,
690     .bdrv_co_readv              = replication_co_readv,
691     .bdrv_co_writev             = replication_co_writev,
692 
693     .is_filter                  = true,
694     .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
695 
696     .has_variable_length        = true,
697 };
698 
699 static void bdrv_replication_init(void)
700 {
701     bdrv_register(&bdrv_replication);
702 }
703 
704 block_init(bdrv_replication_init);
705