xref: /qemu/block/mirror.c (revision 372b69f5)
1 /*
2  * Image mirroring
3  *
4  * Copyright Red Hat, Inc. 2012
5  *
6  * Authors:
7  *  Paolo Bonzini  <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qemu/cutils.h"
16 #include "qemu/coroutine.h"
17 #include "qemu/range.h"
18 #include "trace.h"
19 #include "block/blockjob_int.h"
20 #include "block/block_int.h"
21 #include "block/dirty-bitmap.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qemu/ratelimit.h"
25 #include "qemu/bitmap.h"
26 #include "qemu/memalign.h"
27 
/* Background mirror operations wait for a free slot once this many are in flight */
#define MAX_IN_FLIGHT 16
#define MAX_IO_BYTES (1 << 20) /* 1 MiB */
/* Default buffer size: one MAX_IO_BYTES-sized request per in-flight slot */
#define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)
31 
/*
 * The mirroring buffer is a list of granularity-sized chunks.
 * Free chunks are organized in a list.
 *
 * The list link is stored at the start of the free chunk itself: chunks are
 * cast to MirrorBuffer when they are put on the free list and handed out as
 * plain I/O buffers when they are taken off it.
 */
typedef struct MirrorBuffer {
    QSIMPLEQ_ENTRY(MirrorBuffer) next;
} MirrorBuffer;
38 
/* Forward declaration: MirrorBlockJob keeps a list of MirrorOp objects. */
typedef struct MirrorOp MirrorOp;

/* Per-job state of a mirror block job. */
typedef struct MirrorBlockJob {
    BlockJob common;
    /* Destination that copied data, zeroes and discards are written to */
    BlockBackend *target;
    /* Node whose backing child is read as the source of the copy */
    BlockDriverState *mirror_top_bs;
    BlockDriverState *base;
    BlockDriverState *base_overlay;

    /* The name of the graph node to replace */
    char *replaces;
    /* The BDS to replace */
    BlockDriverState *to_replace;
    /* Used to block operations on the drive-mirror-replace target */
    Error *replace_blocker;
    bool is_none_mode;
    BlockMirrorBackingMode backing_mode;
    /* Whether the target image requires explicit zero-initialization */
    bool zero_target;
    /*
     * To be accessed with atomics. Written only under the BQL (required by the
     * current implementation of mirror_change()).
     */
    MirrorCopyMode copy_mode;
    BlockdevOnError on_source_error, on_target_error;
    /*
     * To be accessed with atomics.
     *
     * Set when the target is synced (dirty bitmap is clean, nothing in flight)
     * and the job is running in active mode.
     */
    bool actively_synced;
    bool should_complete;
    /* Size in bytes of one chunk; bitmaps below hold one bit per chunk */
    int64_t granularity;
    /* Size in bytes of @buf */
    size_t buf_size;
    /* Length of the source in bytes */
    int64_t bdev_length;
    /*
     * If non-NULL, one bit per chunk, set once the chunk has been written
     * to the target successfully; consulted by mirror_cow_align().
     */
    unsigned long *cow_bitmap;
    BdrvDirtyBitmap *dirty_bitmap;
    /* Iterator over @dirty_bitmap used to pick the next chunk to mirror */
    BdrvDirtyBitmapIter *dbi;
    /* Copy buffer, carved into granularity-sized chunks */
    uint8_t *buf;
    /* Chunks of @buf that are currently unused, and their number */
    QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
    int buf_free_count;

    /* Time of the last sleep taken in mirror_throttle() */
    uint64_t last_pause_ns;
    /* One bit per chunk, set while an operation on that chunk is in flight */
    unsigned long *in_flight_bitmap;
    /* Number of operations in flight, and the bytes they cover */
    unsigned in_flight;
    int64_t bytes_in_flight;
    QTAILQ_HEAD(, MirrorOp) ops_in_flight;
    /* Negative errno once an error has been reported for the job */
    int ret;
    /* Allow unmapping when writing zeroes to the target */
    bool unmap;
    int target_cluster_size;
    /* A single copy request is limited to granularity * max_iov bytes */
    int max_iov;
    /* True while mirror_dirty_init() is zeroing the target */
    bool initial_zeroing_ongoing;
    int in_active_write_counter;
    int64_t active_write_bytes_in_flight;
    /* Set once mirror_exit_common() has run, so that it runs only once */
    bool prepared;
    bool in_drain;
} MirrorBlockJob;
97 
/* Driver-private state stored in mirror_top_bs->opaque. */
typedef struct MirrorBDSOpaque {
    /* Owning job; reset to NULL by mirror_exit_common() */
    MirrorBlockJob *job;
    /* Set by mirror_exit_common() once mirror_top_bs has been drained */
    bool stop;
    bool is_commit;
} MirrorBDSOpaque;
103 
/*
 * One mirror operation.  Operations are linked into
 * MirrorBlockJob.ops_in_flight while they exist.
 */
struct MirrorOp {
    MirrorBlockJob *s;
    /* I/O vector made of chunks taken from the job's free list */
    QEMUIOVector qiov;
    /* Byte range covered by this operation */
    int64_t offset;
    uint64_t bytes;

    /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
     * mirror_co_discard() before yielding for the first time */
    int64_t *bytes_handled;

    /*
     * Placeholder created by mirror_iteration() to claim an area before the
     * real operations for it are launched; performs no I/O itself.
     */
    bool is_pseudo_op;
    bool is_active_write;
    /* Set once the operation has been accounted in MirrorBlockJob.in_flight */
    bool is_in_flight;
    /* Coroutines waiting for this operation to finish */
    CoQueue waiting_requests;
    Coroutine *co;
    /* Operation this one is currently waiting for, or NULL */
    MirrorOp *waiting_for_op;

    QTAILQ_ENTRY(MirrorOp) next;
};
123 
/* How mirror_perform() brings an area of the target in sync. */
typedef enum MirrorMethod {
    MIRROR_METHOD_COPY,     /* read from the source, write to the target */
    MIRROR_METHOD_ZERO,     /* write zeroes to the target */
    MIRROR_METHOD_DISCARD,  /* discard the area on the target */
} MirrorMethod;
129 
130 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
131                                             int error)
132 {
133     qatomic_set(&s->actively_synced, false);
134     if (read) {
135         return block_job_error_action(&s->common, s->on_source_error,
136                                       true, error);
137     } else {
138         return block_job_error_action(&s->common, s->on_target_error,
139                                       false, error);
140     }
141 }
142 
143 static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
144                                                   MirrorBlockJob *s,
145                                                   uint64_t offset,
146                                                   uint64_t bytes)
147 {
148     uint64_t self_start_chunk = offset / s->granularity;
149     uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
150     uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
151 
152     while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
153                          self_start_chunk) < self_end_chunk &&
154            s->ret >= 0)
155     {
156         MirrorOp *op;
157 
158         QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
159             uint64_t op_start_chunk = op->offset / s->granularity;
160             uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
161                                                  s->granularity) -
162                                     op_start_chunk;
163 
164             if (op == self) {
165                 continue;
166             }
167 
168             if (ranges_overlap(self_start_chunk, self_nb_chunks,
169                                op_start_chunk, op_nb_chunks))
170             {
171                 if (self) {
172                     /*
173                      * If the operation is already (indirectly) waiting for us,
174                      * or will wait for us as soon as it wakes up, then just go
175                      * on (instead of producing a deadlock in the former case).
176                      */
177                     if (op->waiting_for_op) {
178                         continue;
179                     }
180 
181                     self->waiting_for_op = op;
182                 }
183 
184                 qemu_co_queue_wait(&op->waiting_requests, NULL);
185 
186                 if (self) {
187                     self->waiting_for_op = NULL;
188                 }
189 
190                 break;
191             }
192         }
193     }
194 }
195 
196 static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
197 {
198     MirrorBlockJob *s = op->s;
199     struct iovec *iov;
200     int64_t chunk_num;
201     int i, nb_chunks;
202 
203     trace_mirror_iteration_done(s, op->offset, op->bytes, ret);
204 
205     s->in_flight--;
206     s->bytes_in_flight -= op->bytes;
207     iov = op->qiov.iov;
208     for (i = 0; i < op->qiov.niov; i++) {
209         MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
210         QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
211         s->buf_free_count++;
212     }
213 
214     chunk_num = op->offset / s->granularity;
215     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
216 
217     bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
218     QTAILQ_REMOVE(&s->ops_in_flight, op, next);
219     if (ret >= 0) {
220         if (s->cow_bitmap) {
221             bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
222         }
223         if (!s->initial_zeroing_ongoing) {
224             job_progress_update(&s->common.job, op->bytes);
225         }
226     }
227     qemu_iovec_destroy(&op->qiov);
228 
229     qemu_co_queue_restart_all(&op->waiting_requests);
230     g_free(op);
231 }
232 
233 static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
234 {
235     MirrorBlockJob *s = op->s;
236 
237     if (ret < 0) {
238         BlockErrorAction action;
239 
240         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
241         action = mirror_error_action(s, false, -ret);
242         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
243             s->ret = ret;
244         }
245     }
246 
247     mirror_iteration_done(op, ret);
248 }
249 
250 static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
251 {
252     MirrorBlockJob *s = op->s;
253 
254     if (ret < 0) {
255         BlockErrorAction action;
256 
257         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
258         action = mirror_error_action(s, true, -ret);
259         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
260             s->ret = ret;
261         }
262 
263         mirror_iteration_done(op, ret);
264         return;
265     }
266 
267     ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
268     mirror_write_complete(op, ret);
269 }
270 
271 /* Clip bytes relative to offset to not exceed end-of-file */
272 static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
273                                         int64_t offset,
274                                         int64_t bytes)
275 {
276     return MIN(bytes, s->bdev_length - offset);
277 }
278 
279 /* Round offset and/or bytes to target cluster if COW is needed, and
280  * return the offset of the adjusted tail against original. */
281 static int coroutine_fn mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
282                                          uint64_t *bytes)
283 {
284     bool need_cow;
285     int ret = 0;
286     int64_t align_offset = *offset;
287     int64_t align_bytes = *bytes;
288     int max_bytes = s->granularity * s->max_iov;
289 
290     need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
291     need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
292                           s->cow_bitmap);
293     if (need_cow) {
294         bdrv_round_to_subclusters(blk_bs(s->target), *offset, *bytes,
295                                   &align_offset, &align_bytes);
296     }
297 
298     if (align_bytes > max_bytes) {
299         align_bytes = max_bytes;
300         if (need_cow) {
301             align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
302         }
303     }
304     /* Clipping may result in align_bytes unaligned to chunk boundary, but
305      * that doesn't matter because it's already the end of source image. */
306     align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);
307 
308     ret = align_offset + align_bytes - (*offset + *bytes);
309     *offset = align_offset;
310     *bytes = align_bytes;
311     assert(ret >= 0);
312     return ret;
313 }
314 
315 static inline void coroutine_fn
316 mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
317 {
318     MirrorOp *op;
319 
320     QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
321         /*
322          * Do not wait on pseudo ops, because it may in turn wait on
323          * some other operation to start, which may in fact be the
324          * caller of this function.  Since there is only one pseudo op
325          * at any given time, we will always find some real operation
326          * to wait on.
327          * Also, do not wait on active operations, because they do not
328          * use up in-flight slots.
329          */
330         if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
331             qemu_co_queue_wait(&op->waiting_requests, NULL);
332             return;
333         }
334     }
335     abort();
336 }
337 
338 /* Perform a mirror copy operation.
339  *
340  * *op->bytes_handled is set to the number of bytes copied after and
341  * including offset, excluding any bytes copied prior to offset due
342  * to alignment.  This will be op->bytes if no alignment is necessary,
343  * or (new_end - op->offset) if the tail is rounded up or down due to
344  * alignment or buffer limit.
345  */
346 static void coroutine_fn mirror_co_read(void *opaque)
347 {
348     MirrorOp *op = opaque;
349     MirrorBlockJob *s = op->s;
350     int nb_chunks;
351     uint64_t ret;
352     uint64_t max_bytes;
353 
354     max_bytes = s->granularity * s->max_iov;
355 
356     /* We can only handle as much as buf_size at a time. */
357     op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
358     assert(op->bytes);
359     assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
360     *op->bytes_handled = op->bytes;
361 
362     if (s->cow_bitmap) {
363         *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
364     }
365     /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
366     assert(*op->bytes_handled <= UINT_MAX);
367     assert(op->bytes <= s->buf_size);
368     /* The offset is granularity-aligned because:
369      * 1) Caller passes in aligned values;
370      * 2) mirror_cow_align is used only when target cluster is larger. */
371     assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
372     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
373     assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
374     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
375 
376     while (s->buf_free_count < nb_chunks) {
377         trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
378         mirror_wait_for_free_in_flight_slot(s);
379     }
380 
381     /* Now make a QEMUIOVector taking enough granularity-sized chunks
382      * from s->buf_free.
383      */
384     qemu_iovec_init(&op->qiov, nb_chunks);
385     while (nb_chunks-- > 0) {
386         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
387         size_t remaining = op->bytes - op->qiov.size;
388 
389         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
390         s->buf_free_count--;
391         qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
392     }
393 
394     /* Copy the dirty cluster.  */
395     s->in_flight++;
396     s->bytes_in_flight += op->bytes;
397     op->is_in_flight = true;
398     trace_mirror_one_iteration(s, op->offset, op->bytes);
399 
400     WITH_GRAPH_RDLOCK_GUARD() {
401         ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
402                              &op->qiov, 0);
403     }
404     mirror_read_complete(op, ret);
405 }
406 
407 static void coroutine_fn mirror_co_zero(void *opaque)
408 {
409     MirrorOp *op = opaque;
410     int ret;
411 
412     op->s->in_flight++;
413     op->s->bytes_in_flight += op->bytes;
414     *op->bytes_handled = op->bytes;
415     op->is_in_flight = true;
416 
417     ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
418                                op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
419     mirror_write_complete(op, ret);
420 }
421 
422 static void coroutine_fn mirror_co_discard(void *opaque)
423 {
424     MirrorOp *op = opaque;
425     int ret;
426 
427     op->s->in_flight++;
428     op->s->bytes_in_flight += op->bytes;
429     *op->bytes_handled = op->bytes;
430     op->is_in_flight = true;
431 
432     ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
433     mirror_write_complete(op, ret);
434 }
435 
436 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
437                                unsigned bytes, MirrorMethod mirror_method)
438 {
439     MirrorOp *op;
440     Coroutine *co;
441     int64_t bytes_handled = -1;
442 
443     op = g_new(MirrorOp, 1);
444     *op = (MirrorOp){
445         .s              = s,
446         .offset         = offset,
447         .bytes          = bytes,
448         .bytes_handled  = &bytes_handled,
449     };
450     qemu_co_queue_init(&op->waiting_requests);
451 
452     switch (mirror_method) {
453     case MIRROR_METHOD_COPY:
454         co = qemu_coroutine_create(mirror_co_read, op);
455         break;
456     case MIRROR_METHOD_ZERO:
457         co = qemu_coroutine_create(mirror_co_zero, op);
458         break;
459     case MIRROR_METHOD_DISCARD:
460         co = qemu_coroutine_create(mirror_co_discard, op);
461         break;
462     default:
463         abort();
464     }
465     op->co = co;
466 
467     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
468     qemu_coroutine_enter(co);
469     /* At this point, ownership of op has been moved to the coroutine
470      * and the object may already be freed */
471 
472     /* Assert that this value has been set */
473     assert(bytes_handled >= 0);
474 
475     /* Same assertion as in mirror_co_read() (and for mirror_co_read()
476      * and mirror_co_discard(), bytes_handled == op->bytes, which
477      * is the @bytes parameter given to this function) */
478     assert(bytes_handled <= UINT_MAX);
479     return bytes_handled;
480 }
481 
/*
 * Mirror one run of consecutive dirty chunks.
 *
 * Takes the next dirty offset from the bitmap iterator (restarting from the
 * beginning when the iterator is exhausted), extends the run over following
 * chunks that are dirty and not in flight (limited by buf_size), clears
 * their dirty bits, marks them in flight and then launches copy, zero or
 * discard operations through mirror_perform() until the run is covered.
 *
 * The caller must make sure the dirty bitmap is not empty: the function
 * asserts that a dirty offset is found.
 */
static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
    BlockDriverState *source = s->mirror_top_bs->backing->bs;
    MirrorOp *pseudo_op;
    int64_t offset;
    /* At least the first dirty chunk is mirrored in one iteration. */
    int nb_chunks = 1;
    bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
    /* Upper bound for a single copy request */
    int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);

    bdrv_dirty_bitmap_lock(s->dirty_bitmap);
    offset = bdrv_dirty_iter_next(s->dbi);
    if (offset < 0) {
        /* Iterator exhausted: start over from the beginning of the bitmap */
        bdrv_set_dirty_iter(s->dbi, 0);
        offset = bdrv_dirty_iter_next(s->dbi);
        trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
        assert(offset >= 0);
    }
    bdrv_dirty_bitmap_unlock(s->dirty_bitmap);

    /*
     * Wait for concurrent requests to @offset.  The next loop will limit the
     * copied area based on in_flight_bitmap so we only copy an area that does
     * not overlap with concurrent in-flight requests.  Still, we would like to
     * copy something, so wait until there are at least no more requests to the
     * very beginning of the area.
     */
    mirror_wait_on_conflicts(NULL, s, offset, 1);

    job_pause_point(&s->common.job);

    /* Find the number of consecutive dirty chunks following the first dirty
     * one, and wait for in flight requests in them. */
    bdrv_dirty_bitmap_lock(s->dirty_bitmap);
    while (nb_chunks * s->granularity < s->buf_size) {
        int64_t next_dirty;
        int64_t next_offset = offset + nb_chunks * s->granularity;
        int64_t next_chunk = next_offset / s->granularity;
        if (next_offset >= s->bdev_length ||
            !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) {
            break;
        }
        if (test_bit(next_chunk, s->in_flight_bitmap)) {
            break;
        }

        next_dirty = bdrv_dirty_iter_next(s->dbi);
        if (next_dirty > next_offset || next_dirty < 0) {
            /* The bitmap iterator's cache is stale, refresh it */
            bdrv_set_dirty_iter(s->dbi, next_offset);
            next_dirty = bdrv_dirty_iter_next(s->dbi);
        }
        assert(next_dirty == next_offset);
        nb_chunks++;
    }

    /* Clear dirty bits before querying the block status, because
     * calling bdrv_block_status_above could yield - if some blocks are
     * marked dirty in this window, we need to know.
     */
    bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
                                   nb_chunks * s->granularity);
    bdrv_dirty_bitmap_unlock(s->dirty_bitmap);

    /* Before claiming an area in the in-flight bitmap, we have to
     * create a MirrorOp for it so that conflicting requests can wait
     * for it.  mirror_perform() will create the real MirrorOps later,
     * for now we just create a pseudo operation that will wake up all
     * conflicting requests once all real operations have been
     * launched. */
    pseudo_op = g_new(MirrorOp, 1);
    *pseudo_op = (MirrorOp){
        .offset         = offset,
        .bytes          = nb_chunks * s->granularity,
        .is_pseudo_op   = true,
    };
    qemu_co_queue_init(&pseudo_op->waiting_requests);
    QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);

    bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
    while (nb_chunks > 0 && offset < s->bdev_length) {
        int ret;
        int64_t io_bytes;
        int64_t io_bytes_acct;
        MirrorMethod mirror_method = MIRROR_METHOD_COPY;

        assert(!(offset % s->granularity));
        WITH_GRAPH_RDLOCK_GUARD() {
            ret = bdrv_co_block_status_above(source, NULL, offset,
                                             nb_chunks * s->granularity,
                                             &io_bytes, NULL, NULL);
        }
        if (ret < 0) {
            /* Block status unknown: fall back to a plain copy */
            io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
        } else if (ret & BDRV_BLOCK_DATA) {
            io_bytes = MIN(io_bytes, max_io_bytes);
        }

        /* Round down to whole chunks, but handle at least one chunk */
        io_bytes -= io_bytes % s->granularity;
        if (io_bytes < s->granularity) {
            io_bytes = s->granularity;
        } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
            int64_t target_offset;
            int64_t target_bytes;
            WITH_GRAPH_RDLOCK_GUARD() {
                bdrv_round_to_subclusters(blk_bs(s->target), offset, io_bytes,
                                          &target_offset, &target_bytes);
            }
            /*
             * Zero/discard is used only if the range is already aligned to
             * the target's subclusters; otherwise the data is copied.
             */
            if (target_offset == offset &&
                target_bytes == io_bytes) {
                mirror_method = ret & BDRV_BLOCK_ZERO ?
                                    MIRROR_METHOD_ZERO :
                                    MIRROR_METHOD_DISCARD;
            }
        }

        while (s->in_flight >= MAX_IN_FLIGHT) {
            trace_mirror_yield_in_flight(s, offset, s->in_flight);
            mirror_wait_for_free_in_flight_slot(s);
        }

        /* An error was recorded meanwhile: do not launch further operations */
        if (s->ret < 0) {
            ret = 0;
            goto fail;
        }

        io_bytes = mirror_clip_bytes(s, offset, io_bytes);
        io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
        /*
         * Zero/discard operations are not charged to the rate limit when
         * the target can write zeroes with unmap.
         */
        if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
            io_bytes_acct = 0;
        } else {
            io_bytes_acct = io_bytes;
        }
        assert(io_bytes);
        offset += io_bytes;
        nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
        block_job_ratelimit_processed_bytes(&s->common, io_bytes_acct);
    }

fail:
    /* Drop the pseudo operation and wake up everybody who waited for it */
    QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
    qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
    g_free(pseudo_op);
}
626 
627 static void mirror_free_init(MirrorBlockJob *s)
628 {
629     int granularity = s->granularity;
630     size_t buf_size = s->buf_size;
631     uint8_t *buf = s->buf;
632 
633     assert(s->buf_free_count == 0);
634     QSIMPLEQ_INIT(&s->buf_free);
635     while (buf_size != 0) {
636         MirrorBuffer *cur = (MirrorBuffer *)buf;
637         QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
638         s->buf_free_count++;
639         buf_size -= granularity;
640         buf += granularity;
641     }
642 }
643 
644 /* This is also used for the .pause callback. There is no matching
645  * mirror_resume() because mirror_run() will begin iterating again
646  * when the job is resumed.
647  */
648 static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s)
649 {
650     while (s->in_flight > 0) {
651         mirror_wait_for_free_in_flight_slot(s);
652     }
653 }
654 
/**
 * mirror_exit_common: handle both abort() and prepare() cases.
 * for .prepare, returns 0 on success and -errno on failure.
 * for .abort cases, denoted by abort = true, MUST return 0.
 *
 * The body runs only once per job: a second call (s->prepared already set)
 * returns 0 immediately.  It releases the dirty bitmap and the target
 * BlockBackend, optionally sets up the target's backing chain, replaces the
 * source (or s->to_replace) by the target when the job completed
 * successfully, and finally removes mirror_top_bs from the graph.
 */
static int mirror_exit_common(Job *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockJob *bjob = &s->common;
    MirrorBDSOpaque *bs_opaque;
    AioContext *replace_aio_context = NULL;
    BlockDriverState *src;
    BlockDriverState *target_bs;
    BlockDriverState *mirror_top_bs;
    Error *local_err = NULL;
    /* A negative job result means we are cleaning up after a failure */
    bool abort = job->ret < 0;
    int ret = 0;

    GLOBAL_STATE_CODE();

    if (s->prepared) {
        return 0;
    }
    s->prepared = true;

    aio_context_acquire(qemu_get_aio_context());

    mirror_top_bs = s->mirror_top_bs;
    bs_opaque = mirror_top_bs->opaque;
    src = mirror_top_bs->backing->bs;
    target_bs = blk_bs(s->target);

    if (bdrv_chain_contains(src, target_bs)) {
        bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
    }

    bdrv_release_dirty_bitmap(s->dirty_bitmap);

    /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
     * before we can call bdrv_drained_end */
    bdrv_ref(src);
    bdrv_ref(mirror_top_bs);
    bdrv_ref(target_bs);

    /*
     * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
     * inserting target_bs at s->to_replace, where we might not be able to get
     * these permissions.
     */
    blk_unref(s->target);
    s->target = NULL;

    /* We don't access the source any more. Dropping any WRITE/RESIZE is
     * required before it could become a backing file of target_bs. Not having
     * these permissions any more means that we can't allow any new requests on
     * mirror_top_bs from now on, so keep it drained. */
    bdrv_drained_begin(mirror_top_bs);
    bs_opaque->stop = true;

    bdrv_graph_rdlock_main_loop();
    bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
                             &error_abort);
    bdrv_graph_rdunlock_main_loop();

    /* On success, give the target the backing chain requested for it */
    if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
        BlockDriverState *backing = s->is_none_mode ? src : s->base;
        BlockDriverState *unfiltered_target = bdrv_skip_filters(target_bs);

        if (bdrv_cow_bs(unfiltered_target) != backing) {
            bdrv_set_backing_hd(unfiltered_target, backing, &local_err);
            if (local_err) {
                error_report_err(local_err);
                local_err = NULL;
                ret = -EPERM;
            }
        }
    } else if (!abort && s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
        assert(!bdrv_backing_chain_next(target_bs));
        ret = bdrv_open_backing_file(bdrv_skip_filters(target_bs), NULL,
                                     "backing", &local_err);
        if (ret < 0) {
            error_report_err(local_err);
            local_err = NULL;
        }
    }

    if (s->to_replace) {
        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);
    }

    if (s->should_complete && !abort) {
        BlockDriverState *to_replace = s->to_replace ?: src;
        bool ro = bdrv_is_read_only(to_replace);

        /* Give the target the same read-only mode as the node it replaces */
        if (ro != bdrv_is_read_only(target_bs)) {
            bdrv_reopen_set_read_only(target_bs, ro, NULL);
        }

        /* The mirror job has no requests in flight any more, but we need to
         * drain potential other users of the BDS before changing the graph. */
        assert(s->in_drain);
        bdrv_drained_begin(target_bs);
        /*
         * Cannot use check_to_replace_node() here, because that would
         * check for an op blocker on @to_replace, and we have our own
         * there.
         *
         * TODO Pull out the writer lock from bdrv_replace_node() to here
         */
        bdrv_graph_rdlock_main_loop();
        if (bdrv_recurse_can_replace(src, to_replace)) {
            bdrv_replace_node(to_replace, target_bs, &local_err);
        } else {
            error_setg(&local_err, "Can no longer replace '%s' by '%s', "
                       "because it can no longer be guaranteed that doing so "
                       "would not lead to an abrupt change of visible data",
                       to_replace->node_name, target_bs->node_name);
        }
        bdrv_graph_rdunlock_main_loop();
        bdrv_drained_end(target_bs);
        if (local_err) {
            error_report_err(local_err);
            ret = -EPERM;
        }
    }
    /* Drop our op blocker and reference on the node that was to be replaced */
    if (s->to_replace) {
        bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
        error_free(s->replace_blocker);
        bdrv_unref(s->to_replace);
    }
    if (replace_aio_context) {
        aio_context_release(replace_aio_context);
    }
    g_free(s->replaces);
    bdrv_unref(target_bs);

    /*
     * Remove the mirror filter driver from the graph. Before this, get rid of
     * the blockers on the intermediate nodes so that the resulting state is
     * valid.
     */
    block_job_remove_all_bdrv(bjob);
    bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);

    bs_opaque->job = NULL;

    bdrv_drained_end(src);
    bdrv_drained_end(mirror_top_bs);
    s->in_drain = false;
    /* Release the references taken at the top of this function */
    bdrv_unref(mirror_top_bs);
    bdrv_unref(src);

    aio_context_release(qemu_get_aio_context());

    return ret;
}
812 
813 static int mirror_prepare(Job *job)
814 {
815     return mirror_exit_common(job);
816 }
817 
818 static void mirror_abort(Job *job)
819 {
820     int ret = mirror_exit_common(job);
821     assert(ret == 0);
822 }
823 
824 static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
825 {
826     int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
827 
828     if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
829         s->last_pause_ns = now;
830         job_sleep_ns(&s->common.job, 0);
831     } else {
832         job_pause_point(&s->common.job);
833     }
834 }
835 
/*
 * Initialize the dirty bitmap for the bulk copy phase.
 *
 * If the target needs explicit zero-initialization, either the whole image
 * is marked dirty (when the target cannot write zeroes with unmap) or the
 * target is zeroed first.  Afterwards every range that is allocated in the
 * source above s->base_overlay is marked dirty.
 *
 * Returns 0 on success or when the job was cancelled, and a negative errno
 * if querying the allocation status failed.
 */
static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
{
    int64_t offset;
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    int ret;
    int64_t count;

    if (s->zero_target) {
        if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
            /* Cannot zero the target up front: copy everything instead */
            bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
            return 0;
        }

        s->initial_zeroing_ongoing = true;
        for (offset = 0; offset < s->bdev_length; ) {
            int bytes = MIN(s->bdev_length - offset,
                            QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

            mirror_throttle(s);

            if (job_is_cancelled(&s->common.job)) {
                s->initial_zeroing_ongoing = false;
                return 0;
            }

            if (s->in_flight >= MAX_IN_FLIGHT) {
                trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
                                   s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                /* Retry the same offset; it has not been advanced yet */
                continue;
            }

            mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
            offset += bytes;
        }

        mirror_wait_for_all_io(s);
        s->initial_zeroing_ongoing = false;
    }

    /* First part, loop on the sectors and initialize the dirty bitmap.  */
    for (offset = 0; offset < s->bdev_length; ) {
        /* Just to make sure we are not exceeding int limit. */
        int bytes = MIN(s->bdev_length - offset,
                        QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

        mirror_throttle(s);

        if (job_is_cancelled(&s->common.job)) {
            return 0;
        }

        WITH_GRAPH_RDLOCK_GUARD() {
            ret = bdrv_co_is_allocated_above(bs, s->base_overlay, true, offset,
                                             bytes, &count);
        }
        if (ret < 0) {
            return ret;
        }

        /* count must be non-zero, otherwise the loop would not advance */
        assert(count);
        if (ret > 0) {
            bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
        }
        offset += count;
    }
    return 0;
}
905 
906 /* Called when going out of the streaming phase to flush the bulk of the
907  * data to the medium, or just before completing.
908  */
909 static int coroutine_fn mirror_flush(MirrorBlockJob *s)
910 {
911     int ret = blk_co_flush(s->target);
912     if (ret < 0) {
913         if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
914             s->ret = ret;
915         }
916     }
917     return ret;
918 }
919 
/*
 * Main coroutine of the mirror job; installed as the .run callback of both
 * mirror_job_driver and commit_active_job_driver.
 *
 * Outline:
 *  1. Query source and target lengths.  For an active commit (the base is the
 *     target) a too-small target is grown; otherwise both sizes must match.
 *  2. An empty source goes straight to READY and waits for completion or
 *     cancellation.
 *  3. Allocate the in-flight bitmap, optionally the COW bitmap, and the copy
 *     buffer; initialize the dirty bitmap unless s->is_none_mode is set.
 *  4. Publish the job to the mirror_top filter node and run the main loop,
 *     which copies dirty data, transitions to READY once source and target
 *     are in sync, and exits once completion or cancellation was requested
 *     and nothing is dirty while the source is drained.
 *
 * On every exit path the function waits for in-flight operations and frees
 * the buffers and bitmaps allocated here.  Unless the loop was left through
 * the "in sync" break (which already holds a drained section), a drained
 * section on the source is begun before returning.
 * NOTE(review): the matching bdrv_drained_end() is not in this function;
 * presumably a later job callback ends it -- confirm.
 *
 * Returns a negative errno value on failure and a non-negative value
 * otherwise.  @errp is only set for the size-mismatch error.
 */
static int coroutine_fn mirror_run(Job *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    MirrorBDSOpaque *mirror_top_opaque = s->mirror_top_bs->opaque;
    BlockDriverState *target_bs = blk_bs(s->target);
    bool need_drain = true;
    BlockDeviceIoStatus iostatus;
    int64_t length;
    int64_t target_length;
    BlockDriverInfo bdi;
    char backing_filename[2]; /* we only need 2 characters because we are only
                                 checking for an empty string */
    int ret = 0;

    if (job_is_cancelled(&s->common.job)) {
        goto immediate_exit;
    }

    bdrv_graph_co_rdlock();
    s->bdev_length = bdrv_co_getlength(bs);
    bdrv_graph_co_rdunlock();

    if (s->bdev_length < 0) {
        ret = s->bdev_length;
        goto immediate_exit;
    }

    target_length = blk_co_getlength(s->target);
    if (target_length < 0) {
        ret = target_length;
        goto immediate_exit;
    }

    /* Active commit must resize the base image if its size differs from the
     * active layer. */
    if (s->base == blk_bs(s->target)) {
        /* Only growing is handled here; a larger target is left as is. */
        if (s->bdev_length > target_length) {
            ret = blk_co_truncate(s->target, s->bdev_length, false,
                                  PREALLOC_MODE_OFF, 0, NULL);
            if (ret < 0) {
                goto immediate_exit;
            }
        }
    } else if (s->bdev_length != target_length) {
        error_setg(errp, "Source and target image have different sizes");
        ret = -EINVAL;
        goto immediate_exit;
    }

    if (s->bdev_length == 0) {
        /* Transition to the READY state and wait for complete. */
        job_transition_to_ready(&s->common.job);
        qatomic_set(&s->actively_synced, true);
        while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
            job_yield(&s->common.job);
        }
        goto immediate_exit;
    }

    /* Number of granularity-sized chunks covering the source. */
    length = DIV_ROUND_UP(s->bdev_length, s->granularity);
    s->in_flight_bitmap = bitmap_new(length);

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(target_bs, backing_filename,
                              sizeof(backing_filename));
    bdrv_graph_co_rdlock();
    if (!bdrv_co_get_info(target_bs, &bdi) && bdi.cluster_size) {
        s->target_cluster_size = bdi.cluster_size;
    } else {
        /* No usable cluster size reported: fall back to one sector. */
        s->target_cluster_size = BDRV_SECTOR_SIZE;
    }
    bdrv_graph_co_rdunlock();
    if (backing_filename[0] && !bdrv_backing_chain_next(target_bs) &&
        s->granularity < s->target_cluster_size) {
        /* The buffer must be able to hold at least one target cluster. */
        s->buf_size = MAX(s->buf_size, s->target_cluster_size);
        s->cow_bitmap = bitmap_new(length);
    }
    s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);

    s->buf = qemu_try_blockalign(bs, s->buf_size);
    if (s->buf == NULL) {
        ret = -ENOMEM;
        goto immediate_exit;
    }

    mirror_free_init(s);

    s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    if (!s->is_none_mode) {
        ret = mirror_dirty_init(s);
        /* mirror_dirty_init() returns 0 on cancellation, so check both. */
        if (ret < 0 || job_is_cancelled(&s->common.job)) {
            goto immediate_exit;
        }
    }

    /*
     * Only now the job is fully initialised and mirror_top_bs should start
     * accessing it.
     */
    mirror_top_opaque->job = s;

    assert(!s->dbi);
    s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
    for (;;) {
        int64_t cnt, delta;
        bool should_complete;

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        job_pause_point(&s->common.job);

        if (job_is_cancelled(&s->common.job)) {
            ret = 0;
            goto immediate_exit;
        }

        cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
         * the number of bytes currently being processed; together those are
         * the current remaining operation length */
        job_progress_set_remaining(&s->common.job,
                                   s->bytes_in_flight + cnt +
                                   s->active_write_bytes_in_flight);

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that bdrv_drain_all() returns.
         * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
         * an error, or when the source is clean, whichever comes first. */
        delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
        WITH_JOB_LOCK_GUARD() {
            iostatus = s->common.iostatus;
        }
        if (delta < BLOCK_JOB_SLICE_TIME &&
            iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                (cnt == 0 && s->in_flight > 0)) {
                /* No slot, no buffer, or only in-flight work left: wait. */
                trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                continue;
            } else if (cnt != 0) {
                mirror_iteration(s);
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            if (!job_is_ready(&s->common.job)) {
                if (mirror_flush(s) < 0) {
                    /* Go check s->ret.  */
                    continue;
                }
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                job_transition_to_ready(&s->common.job);
            }
            if (qatomic_read(&s->copy_mode) != MIRROR_COPY_MODE_BACKGROUND) {
                qatomic_set(&s->actively_synced, true);
            }

            should_complete = s->should_complete ||
                job_cancel_requested(&s->common.job);
            /* Re-read: the dirty count may have changed in the meantime. */
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * the job runs, so pause it now.  Before deciding
             * whether to switch to target check one last time if I/O has
             * come in the meanwhile, and if not flush the data to disk.
             */
            trace_mirror_before_drain(s, cnt);

            /* Set before draining so mirror_drained_poll() sees it. */
            s->in_drain = true;
            bdrv_drained_begin(bs);

            /* Must be zero because we are drained */
            assert(s->in_active_write_counter == 0);

            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
            if (cnt > 0 || mirror_flush(s) < 0) {
                /* New dirty data or a flush error: undo the drain, retry. */
                bdrv_drained_end(bs);
                s->in_drain = false;
                continue;
            }

            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            /* The drained section begun above is kept across the exit. */
            need_drain = false;
            break;
        }

        if (job_is_ready(&s->common.job) && !should_complete) {
            if (s->in_flight == 0 && cnt == 0) {
                trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
                                          BLOCK_JOB_SLICE_TIME);
                job_sleep_ns(&s->common.job, BLOCK_JOB_SLICE_TIME);
            }
        } else {
            block_job_ratelimit_sleep(&s->common);
        }
        s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || job_is_cancelled(&s->common.job));
        assert(need_drain);
        mirror_wait_for_all_io(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    g_free(s->in_flight_bitmap);
    bdrv_dirty_iter_free(s->dbi);

    if (need_drain) {
        /* Not yet drained by the loop above: begin the drained section now. */
        s->in_drain = true;
        bdrv_drained_begin(bs);
    }

    return ret;
}
1165 
1166 static void mirror_complete(Job *job, Error **errp)
1167 {
1168     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1169 
1170     if (!job_is_ready(job)) {
1171         error_setg(errp, "The active block job '%s' cannot be completed",
1172                    job->id);
1173         return;
1174     }
1175 
1176     /* block all operations on to_replace bs */
1177     if (s->replaces) {
1178         AioContext *replace_aio_context;
1179 
1180         s->to_replace = bdrv_find_node(s->replaces);
1181         if (!s->to_replace) {
1182             error_setg(errp, "Node name '%s' not found", s->replaces);
1183             return;
1184         }
1185 
1186         replace_aio_context = bdrv_get_aio_context(s->to_replace);
1187         aio_context_acquire(replace_aio_context);
1188 
1189         /* TODO Translate this into child freeze system. */
1190         error_setg(&s->replace_blocker,
1191                    "block device is in use by block-job-complete");
1192         bdrv_op_block_all(s->to_replace, s->replace_blocker);
1193         bdrv_ref(s->to_replace);
1194 
1195         aio_context_release(replace_aio_context);
1196     }
1197 
1198     s->should_complete = true;
1199 
1200     /* If the job is paused, it will be re-entered when it is resumed */
1201     WITH_JOB_LOCK_GUARD() {
1202         if (!job->paused) {
1203             job_enter_cond_locked(job, NULL);
1204         }
1205     }
1206 }
1207 
1208 static void coroutine_fn mirror_pause(Job *job)
1209 {
1210     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1211 
1212     mirror_wait_for_all_io(s);
1213 }
1214 
1215 static bool mirror_drained_poll(BlockJob *job)
1216 {
1217     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1218 
1219     /* If the job isn't paused nor cancelled, we can't be sure that it won't
1220      * issue more requests. We make an exception if we've reached this point
1221      * from one of our own drain sections, to avoid a deadlock waiting for
1222      * ourselves.
1223      */
1224     WITH_JOB_LOCK_GUARD() {
1225         if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
1226             && !s->in_drain) {
1227             return true;
1228         }
1229     }
1230 
1231     return !!s->in_flight;
1232 }
1233 
1234 static bool mirror_cancel(Job *job, bool force)
1235 {
1236     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1237     BlockDriverState *target = blk_bs(s->target);
1238 
1239     /*
1240      * Before the job is READY, we treat any cancellation like a
1241      * force-cancellation.
1242      */
1243     force = force || !job_is_ready(job);
1244 
1245     if (force) {
1246         bdrv_cancel_in_flight(target);
1247     }
1248     return force;
1249 }
1250 
1251 static bool commit_active_cancel(Job *job, bool force)
1252 {
1253     /* Same as above in mirror_cancel() */
1254     return force || !job_is_ready(job);
1255 }
1256 
1257 static void mirror_change(BlockJob *job, BlockJobChangeOptions *opts,
1258                           Error **errp)
1259 {
1260     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1261     BlockJobChangeOptionsMirror *change_opts = &opts->u.mirror;
1262     MirrorCopyMode current;
1263 
1264     /*
1265      * The implementation relies on the fact that copy_mode is only written
1266      * under the BQL. Otherwise, further synchronization would be required.
1267      */
1268 
1269     GLOBAL_STATE_CODE();
1270 
1271     if (qatomic_read(&s->copy_mode) == change_opts->copy_mode) {
1272         return;
1273     }
1274 
1275     if (change_opts->copy_mode != MIRROR_COPY_MODE_WRITE_BLOCKING) {
1276         error_setg(errp, "Change to copy mode '%s' is not implemented",
1277                    MirrorCopyMode_str(change_opts->copy_mode));
1278         return;
1279     }
1280 
1281     current = qatomic_cmpxchg(&s->copy_mode, MIRROR_COPY_MODE_BACKGROUND,
1282                               change_opts->copy_mode);
1283     if (current != MIRROR_COPY_MODE_BACKGROUND) {
1284         error_setg(errp, "Expected current copy mode '%s', got '%s'",
1285                    MirrorCopyMode_str(MIRROR_COPY_MODE_BACKGROUND),
1286                    MirrorCopyMode_str(current));
1287     }
1288 }
1289 
1290 static void mirror_query(BlockJob *job, BlockJobInfo *info)
1291 {
1292     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1293 
1294     info->u.mirror = (BlockJobInfoMirror) {
1295         .actively_synced = qatomic_read(&s->actively_synced),
1296     };
1297 }
1298 
1299 static const BlockJobDriver mirror_job_driver = {
1300     .job_driver = {
1301         .instance_size          = sizeof(MirrorBlockJob),
1302         .job_type               = JOB_TYPE_MIRROR,
1303         .free                   = block_job_free,
1304         .user_resume            = block_job_user_resume,
1305         .run                    = mirror_run,
1306         .prepare                = mirror_prepare,
1307         .abort                  = mirror_abort,
1308         .pause                  = mirror_pause,
1309         .complete               = mirror_complete,
1310         .cancel                 = mirror_cancel,
1311     },
1312     .drained_poll           = mirror_drained_poll,
1313     .change                 = mirror_change,
1314     .query                  = mirror_query,
1315 };
1316 
1317 static const BlockJobDriver commit_active_job_driver = {
1318     .job_driver = {
1319         .instance_size          = sizeof(MirrorBlockJob),
1320         .job_type               = JOB_TYPE_COMMIT,
1321         .free                   = block_job_free,
1322         .user_resume            = block_job_user_resume,
1323         .run                    = mirror_run,
1324         .prepare                = mirror_prepare,
1325         .abort                  = mirror_abort,
1326         .pause                  = mirror_pause,
1327         .complete               = mirror_complete,
1328         .cancel                 = commit_active_cancel,
1329     },
1330     .drained_poll           = mirror_drained_poll,
1331 };
1332 
1333 static void coroutine_fn
1334 do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
1335                      uint64_t offset, uint64_t bytes,
1336                      QEMUIOVector *qiov, int flags)
1337 {
1338     int ret;
1339     size_t qiov_offset = 0;
1340     int64_t bitmap_offset, bitmap_end;
1341 
1342     if (!QEMU_IS_ALIGNED(offset, job->granularity) &&
1343         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset))
1344     {
1345             /*
1346              * Dirty unaligned padding: ignore it.
1347              *
1348              * Reasoning:
1349              * 1. If we copy it, we can't reset corresponding bit in
1350              *    dirty_bitmap as there may be some "dirty" bytes still not
1351              *    copied.
1352              * 2. It's already dirty, so skipping it we don't diverge mirror
1353              *    progress.
1354              *
1355              * Note, that because of this, guest write may have no contribution
1356              * into mirror converge, but that's not bad, as we have background
1357              * process of mirroring. If under some bad circumstances (high guest
1358              * IO load) background process starve, we will not converge anyway,
1359              * even if each write will contribute, as guest is not guaranteed to
1360              * rewrite the whole disk.
1361              */
1362             qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset;
1363             if (bytes <= qiov_offset) {
1364                 /* nothing to do after shrink */
1365                 return;
1366             }
1367             offset += qiov_offset;
1368             bytes -= qiov_offset;
1369     }
1370 
1371     if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) &&
1372         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1))
1373     {
1374         uint64_t tail = (offset + bytes) % job->granularity;
1375 
1376         if (bytes <= tail) {
1377             /* nothing to do after shrink */
1378             return;
1379         }
1380         bytes -= tail;
1381     }
1382 
1383     /*
1384      * Tails are either clean or shrunk, so for bitmap resetting
1385      * we safely align the range down.
1386      */
1387     bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity);
1388     bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity);
1389     if (bitmap_offset < bitmap_end) {
1390         bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1391                                 bitmap_end - bitmap_offset);
1392     }
1393 
1394     job_progress_increase_remaining(&job->common.job, bytes);
1395     job->active_write_bytes_in_flight += bytes;
1396 
1397     switch (method) {
1398     case MIRROR_METHOD_COPY:
1399         ret = blk_co_pwritev_part(job->target, offset, bytes,
1400                                   qiov, qiov_offset, flags);
1401         break;
1402 
1403     case MIRROR_METHOD_ZERO:
1404         assert(!qiov);
1405         ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags);
1406         break;
1407 
1408     case MIRROR_METHOD_DISCARD:
1409         assert(!qiov);
1410         ret = blk_co_pdiscard(job->target, offset, bytes);
1411         break;
1412 
1413     default:
1414         abort();
1415     }
1416 
1417     job->active_write_bytes_in_flight -= bytes;
1418     if (ret >= 0) {
1419         job_progress_update(&job->common.job, bytes);
1420     } else {
1421         BlockErrorAction action;
1422 
1423         /*
1424          * We failed, so we should mark dirty the whole area, aligned up.
1425          * Note that we don't care about shrunk tails if any: they were dirty
1426          * at function start, and they must be still dirty, as we've locked
1427          * the region for in-flight op.
1428          */
1429         bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity);
1430         bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity);
1431         bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1432                               bitmap_end - bitmap_offset);
1433         qatomic_set(&job->actively_synced, false);
1434 
1435         action = mirror_error_action(job, false, -ret);
1436         if (action == BLOCK_ERROR_ACTION_REPORT) {
1437             if (!job->ret) {
1438                 job->ret = ret;
1439             }
1440         }
1441     }
1442 }
1443 
1444 static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
1445                                                    uint64_t offset,
1446                                                    uint64_t bytes)
1447 {
1448     MirrorOp *op;
1449     uint64_t start_chunk = offset / s->granularity;
1450     uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
1451 
1452     op = g_new(MirrorOp, 1);
1453     *op = (MirrorOp){
1454         .s                  = s,
1455         .offset             = offset,
1456         .bytes              = bytes,
1457         .is_active_write    = true,
1458         .is_in_flight       = true,
1459         .co                 = qemu_coroutine_self(),
1460     };
1461     qemu_co_queue_init(&op->waiting_requests);
1462     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
1463 
1464     s->in_active_write_counter++;
1465 
1466     /*
1467      * Wait for concurrent requests affecting the area.  If there are already
1468      * running requests that are copying off now-to-be stale data in the area,
1469      * we must wait for them to finish before we begin writing fresh data to the
1470      * target so that the write operations appear in the correct order.
1471      * Note that background requests (see mirror_iteration()) in contrast only
1472      * wait for conflicting requests at the start of the dirty area, and then
1473      * (based on the in_flight_bitmap) truncate the area to copy so it will not
1474      * conflict with any requests beyond that.  For active writes, however, we
1475      * cannot truncate that area.  The request from our parent must be blocked
1476      * until the area is copied in full.  Therefore, we must wait for the whole
1477      * area to become free of concurrent requests.
1478      */
1479     mirror_wait_on_conflicts(op, s, offset, bytes);
1480 
1481     bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1482 
1483     return op;
1484 }
1485 
1486 static void coroutine_fn GRAPH_RDLOCK active_write_settle(MirrorOp *op)
1487 {
1488     uint64_t start_chunk = op->offset / op->s->granularity;
1489     uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
1490                                       op->s->granularity);
1491 
1492     if (!--op->s->in_active_write_counter &&
1493         qatomic_read(&op->s->actively_synced)) {
1494         BdrvChild *source = op->s->mirror_top_bs->backing;
1495 
1496         if (QLIST_FIRST(&source->bs->parents) == source &&
1497             QLIST_NEXT(source, next_parent) == NULL)
1498         {
1499             /* Assert that we are back in sync once all active write
1500              * operations are settled.
1501              * Note that we can only assert this if the mirror node
1502              * is the source node's only parent. */
1503             assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
1504         }
1505     }
1506     bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1507     QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
1508     qemu_co_queue_restart_all(&op->waiting_requests);
1509     g_free(op);
1510 }
1511 
1512 static int coroutine_fn GRAPH_RDLOCK
1513 bdrv_mirror_top_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1514                        QEMUIOVector *qiov, BdrvRequestFlags flags)
1515 {
1516     return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
1517 }
1518 
1519 static bool should_copy_to_target(MirrorBDSOpaque *s)
1520 {
1521     return s->job && s->job->ret >= 0 &&
1522         !job_is_cancelled(&s->job->common.job) &&
1523         qatomic_read(&s->job->copy_mode) == MIRROR_COPY_MODE_WRITE_BLOCKING;
1524 }
1525 
1526 static int coroutine_fn GRAPH_RDLOCK
1527 bdrv_mirror_top_do_write(BlockDriverState *bs, MirrorMethod method,
1528                          bool copy_to_target, uint64_t offset, uint64_t bytes,
1529                          QEMUIOVector *qiov, int flags)
1530 {
1531     MirrorOp *op = NULL;
1532     MirrorBDSOpaque *s = bs->opaque;
1533     int ret = 0;
1534 
1535     if (copy_to_target) {
1536         op = active_write_prepare(s->job, offset, bytes);
1537     }
1538 
1539     switch (method) {
1540     case MIRROR_METHOD_COPY:
1541         ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
1542         break;
1543 
1544     case MIRROR_METHOD_ZERO:
1545         ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
1546         break;
1547 
1548     case MIRROR_METHOD_DISCARD:
1549         ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
1550         break;
1551 
1552     default:
1553         abort();
1554     }
1555 
1556     if (!copy_to_target && s->job && s->job->dirty_bitmap) {
1557         qatomic_set(&s->job->actively_synced, false);
1558         bdrv_set_dirty_bitmap(s->job->dirty_bitmap, offset, bytes);
1559     }
1560 
1561     if (ret < 0) {
1562         goto out;
1563     }
1564 
1565     if (copy_to_target) {
1566         do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
1567     }
1568 
1569 out:
1570     if (copy_to_target) {
1571         active_write_settle(op);
1572     }
1573     return ret;
1574 }
1575 
1576 static int coroutine_fn GRAPH_RDLOCK
1577 bdrv_mirror_top_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1578                         QEMUIOVector *qiov, BdrvRequestFlags flags)
1579 {
1580     QEMUIOVector bounce_qiov;
1581     void *bounce_buf;
1582     int ret = 0;
1583     bool copy_to_target = should_copy_to_target(bs->opaque);
1584 
1585     if (copy_to_target) {
1586         /* The guest might concurrently modify the data to write; but
1587          * the data on source and destination must match, so we have
1588          * to use a bounce buffer if we are going to write to the
1589          * target now. */
1590         bounce_buf = qemu_blockalign(bs, bytes);
1591         iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
1592 
1593         qemu_iovec_init(&bounce_qiov, 1);
1594         qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
1595         qiov = &bounce_qiov;
1596 
1597         flags &= ~BDRV_REQ_REGISTERED_BUF;
1598     }
1599 
1600     ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, copy_to_target,
1601                                    offset, bytes, qiov, flags);
1602 
1603     if (copy_to_target) {
1604         qemu_iovec_destroy(&bounce_qiov);
1605         qemu_vfree(bounce_buf);
1606     }
1607 
1608     return ret;
1609 }
1610 
1611 static int coroutine_fn GRAPH_RDLOCK bdrv_mirror_top_flush(BlockDriverState *bs)
1612 {
1613     if (bs->backing == NULL) {
1614         /* we can be here after failed bdrv_append in mirror_start_job */
1615         return 0;
1616     }
1617     return bdrv_co_flush(bs->backing->bs);
1618 }
1619 
1620 static int coroutine_fn GRAPH_RDLOCK
1621 bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1622                               int64_t bytes, BdrvRequestFlags flags)
1623 {
1624     bool copy_to_target = should_copy_to_target(bs->opaque);
1625     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, copy_to_target,
1626                                     offset, bytes, NULL, flags);
1627 }
1628 
1629 static int coroutine_fn GRAPH_RDLOCK
1630 bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1631 {
1632     bool copy_to_target = should_copy_to_target(bs->opaque);
1633     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, copy_to_target,
1634                                     offset, bytes, NULL, 0);
1635 }
1636 
1637 static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs)
1638 {
1639     if (bs->backing == NULL) {
1640         /* we can be here after failed bdrv_attach_child in
1641          * bdrv_set_backing_hd */
1642         return;
1643     }
1644     pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
1645             bs->backing->bs->filename);
1646 }
1647 
1648 static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
1649                                        BdrvChildRole role,
1650                                        BlockReopenQueue *reopen_queue,
1651                                        uint64_t perm, uint64_t shared,
1652                                        uint64_t *nperm, uint64_t *nshared)
1653 {
1654     MirrorBDSOpaque *s = bs->opaque;
1655 
1656     if (s->stop) {
1657         /*
1658          * If the job is to be stopped, we do not need to forward
1659          * anything to the real image.
1660          */
1661         *nperm = 0;
1662         *nshared = BLK_PERM_ALL;
1663         return;
1664     }
1665 
1666     bdrv_default_perms(bs, c, role, reopen_queue,
1667                        perm, shared, nperm, nshared);
1668 
1669     if (s->is_commit) {
1670         /*
1671          * For commit jobs, we cannot take CONSISTENT_READ, because
1672          * that permission is unshared for everything above the base
1673          * node (except for filters on the base node).
1674          * We also have to force-share the WRITE permission, or
1675          * otherwise we would block ourselves at the base node (if
1676          * writes are blocked for a node, they are also blocked for
1677          * its backing file).
1678          * (We could also share RESIZE, because it may be needed for
1679          * the target if its size is less than the top node's; but
1680          * bdrv_default_perms_for_cow() automatically shares RESIZE
1681          * for backing nodes if WRITE is shared, so there is no need
1682          * to do it here.)
1683          */
1684         *nperm &= ~BLK_PERM_CONSISTENT_READ;
1685         *nshared |= BLK_PERM_WRITE;
1686     }
1687 }
1688 
1689 /* Dummy node that provides consistent read to its users without requiring it
1690  * from its backing file and that allows writes on the backing file chain. */
1691 static BlockDriver bdrv_mirror_top = {
1692     .format_name                = "mirror_top",
1693     .bdrv_co_preadv             = bdrv_mirror_top_preadv,
1694     .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
1695     .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
1696     .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
1697     .bdrv_co_flush              = bdrv_mirror_top_flush,
1698     .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
1699     .bdrv_child_perm            = bdrv_mirror_top_child_perm,
1700 
1701     .is_filter                  = true,
1702     .filtered_child_is_backing  = true,
1703 };
1704 
1705 static BlockJob *mirror_start_job(
1706                              const char *job_id, BlockDriverState *bs,
1707                              int creation_flags, BlockDriverState *target,
1708                              const char *replaces, int64_t speed,
1709                              uint32_t granularity, int64_t buf_size,
1710                              BlockMirrorBackingMode backing_mode,
1711                              bool zero_target,
1712                              BlockdevOnError on_source_error,
1713                              BlockdevOnError on_target_error,
1714                              bool unmap,
1715                              BlockCompletionFunc *cb,
1716                              void *opaque,
1717                              const BlockJobDriver *driver,
1718                              bool is_none_mode, BlockDriverState *base,
1719                              bool auto_complete, const char *filter_node_name,
1720                              bool is_mirror, MirrorCopyMode copy_mode,
1721                              Error **errp)
1722 {
1723     MirrorBlockJob *s;
1724     MirrorBDSOpaque *bs_opaque;
1725     BlockDriverState *mirror_top_bs;
1726     bool target_is_backing;
1727     uint64_t target_perms, target_shared_perms;
1728     int ret;
1729 
1730     GLOBAL_STATE_CODE();
1731 
1732     if (granularity == 0) {
1733         granularity = bdrv_get_default_bitmap_granularity(target);
1734     }
1735 
1736     assert(is_power_of_2(granularity));
1737 
1738     if (buf_size < 0) {
1739         error_setg(errp, "Invalid parameter 'buf-size'");
1740         return NULL;
1741     }
1742 
1743     if (buf_size == 0) {
1744         buf_size = DEFAULT_MIRROR_BUF_SIZE;
1745     }
1746 
1747     if (bdrv_skip_filters(bs) == bdrv_skip_filters(target)) {
1748         error_setg(errp, "Can't mirror node into itself");
1749         return NULL;
1750     }
1751 
1752     target_is_backing = bdrv_chain_contains(bs, target);
1753 
1754     /* In the case of active commit, add dummy driver to provide consistent
1755      * reads on the top, while disabling it in the intermediate nodes, and make
1756      * the backing chain writable. */
1757     mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
1758                                          BDRV_O_RDWR, errp);
1759     if (mirror_top_bs == NULL) {
1760         return NULL;
1761     }
1762     if (!filter_node_name) {
1763         mirror_top_bs->implicit = true;
1764     }
1765 
1766     /* So that we can always drop this node */
1767     mirror_top_bs->never_freeze = true;
1768 
1769     mirror_top_bs->total_sectors = bs->total_sectors;
1770     mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1771     mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED |
1772                                           BDRV_REQ_NO_FALLBACK;
1773     bs_opaque = g_new0(MirrorBDSOpaque, 1);
1774     mirror_top_bs->opaque = bs_opaque;
1775 
1776     bs_opaque->is_commit = target_is_backing;
1777 
1778     bdrv_drained_begin(bs);
1779     ret = bdrv_append(mirror_top_bs, bs, errp);
1780     bdrv_drained_end(bs);
1781 
1782     if (ret < 0) {
1783         bdrv_unref(mirror_top_bs);
1784         return NULL;
1785     }
1786 
1787     /* Make sure that the source is not resized while the job is running */
1788     s = block_job_create(job_id, driver, NULL, mirror_top_bs,
1789                          BLK_PERM_CONSISTENT_READ,
1790                          BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1791                          BLK_PERM_WRITE, speed,
1792                          creation_flags, cb, opaque, errp);
1793     if (!s) {
1794         goto fail;
1795     }
1796 
1797     /* The block job now has a reference to this node */
1798     bdrv_unref(mirror_top_bs);
1799 
1800     s->mirror_top_bs = mirror_top_bs;
1801 
1802     /* No resize for the target either; while the mirror is still running, a
1803      * consistent read isn't necessarily possible. We could possibly allow
1804      * writes and graph modifications, though it would likely defeat the
1805      * purpose of a mirror, so leave them blocked for now.
1806      *
1807      * In the case of active commit, things look a bit different, though,
1808      * because the target is an already populated backing file in active use.
1809      * We can allow anything except resize there.*/
1810 
1811     target_perms = BLK_PERM_WRITE;
1812     target_shared_perms = BLK_PERM_WRITE_UNCHANGED;
1813 
1814     if (target_is_backing) {
1815         int64_t bs_size, target_size;
1816         bs_size = bdrv_getlength(bs);
1817         if (bs_size < 0) {
1818             error_setg_errno(errp, -bs_size,
1819                              "Could not inquire top image size");
1820             goto fail;
1821         }
1822 
1823         target_size = bdrv_getlength(target);
1824         if (target_size < 0) {
1825             error_setg_errno(errp, -target_size,
1826                              "Could not inquire base image size");
1827             goto fail;
1828         }
1829 
1830         if (target_size < bs_size) {
1831             target_perms |= BLK_PERM_RESIZE;
1832         }
1833 
1834         target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
1835     } else if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
1836         /*
1837          * We may want to allow this in the future, but it would
1838          * require taking some extra care.
1839          */
1840         error_setg(errp, "Cannot mirror to a filter on top of a node in the "
1841                    "source's backing chain");
1842         goto fail;
1843     }
1844 
1845     s->target = blk_new(s->common.job.aio_context,
1846                         target_perms, target_shared_perms);
1847     ret = blk_insert_bs(s->target, target, errp);
1848     if (ret < 0) {
1849         goto fail;
1850     }
1851     if (is_mirror) {
1852         /* XXX: Mirror target could be a NBD server of target QEMU in the case
1853          * of non-shared block migration. To allow migration completion, we
1854          * have to allow "inactivate" of the target BB.  When that happens, we
1855          * know the job is drained, and the vcpus are stopped, so no write
1856          * operation will be performed. Block layer already has assertions to
1857          * ensure that. */
1858         blk_set_force_allow_inactivate(s->target);
1859     }
1860     blk_set_allow_aio_context_change(s->target, true);
1861     blk_set_disable_request_queuing(s->target, true);
1862 
1863     s->replaces = g_strdup(replaces);
1864     s->on_source_error = on_source_error;
1865     s->on_target_error = on_target_error;
1866     s->is_none_mode = is_none_mode;
1867     s->backing_mode = backing_mode;
1868     s->zero_target = zero_target;
1869     qatomic_set(&s->copy_mode, copy_mode);
1870     s->base = base;
1871     s->base_overlay = bdrv_find_overlay(bs, base);
1872     s->granularity = granularity;
1873     s->buf_size = ROUND_UP(buf_size, granularity);
1874     s->unmap = unmap;
1875     if (auto_complete) {
1876         s->should_complete = true;
1877     }
1878 
1879     s->dirty_bitmap = bdrv_create_dirty_bitmap(s->mirror_top_bs, granularity,
1880                                                NULL, errp);
1881     if (!s->dirty_bitmap) {
1882         goto fail;
1883     }
1884 
1885     /*
1886      * The dirty bitmap is set by bdrv_mirror_top_do_write() when not in active
1887      * mode.
1888      */
1889     bdrv_disable_dirty_bitmap(s->dirty_bitmap);
1890 
1891     bdrv_graph_wrlock(bs);
1892     ret = block_job_add_bdrv(&s->common, "source", bs, 0,
1893                              BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE |
1894                              BLK_PERM_CONSISTENT_READ,
1895                              errp);
1896     if (ret < 0) {
1897         bdrv_graph_wrunlock();
1898         goto fail;
1899     }
1900 
1901     /* Required permissions are already taken with blk_new() */
1902     block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
1903                        &error_abort);
1904 
1905     /* In commit_active_start() all intermediate nodes disappear, so
1906      * any jobs in them must be blocked */
1907     if (target_is_backing) {
1908         BlockDriverState *iter, *filtered_target;
1909         uint64_t iter_shared_perms;
1910 
1911         /*
1912          * The topmost node with
1913          * bdrv_skip_filters(filtered_target) == bdrv_skip_filters(target)
1914          */
1915         filtered_target = bdrv_cow_bs(bdrv_find_overlay(bs, target));
1916 
1917         assert(bdrv_skip_filters(filtered_target) ==
1918                bdrv_skip_filters(target));
1919 
1920         /*
1921          * XXX BLK_PERM_WRITE needs to be allowed so we don't block
1922          * ourselves at s->base (if writes are blocked for a node, they are
1923          * also blocked for its backing file). The other options would be a
1924          * second filter driver above s->base (== target).
1925          */
1926         iter_shared_perms = BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE;
1927 
1928         for (iter = bdrv_filter_or_cow_bs(bs); iter != target;
1929              iter = bdrv_filter_or_cow_bs(iter))
1930         {
1931             if (iter == filtered_target) {
1932                 /*
1933                  * From here on, all nodes are filters on the base.
1934                  * This allows us to share BLK_PERM_CONSISTENT_READ.
1935                  */
1936                 iter_shared_perms |= BLK_PERM_CONSISTENT_READ;
1937             }
1938 
1939             ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
1940                                      iter_shared_perms, errp);
1941             if (ret < 0) {
1942                 bdrv_graph_wrunlock();
1943                 goto fail;
1944             }
1945         }
1946 
1947         if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) {
1948             bdrv_graph_wrunlock();
1949             goto fail;
1950         }
1951     }
1952     bdrv_graph_wrunlock();
1953 
1954     QTAILQ_INIT(&s->ops_in_flight);
1955 
1956     trace_mirror_start(bs, s, opaque);
1957     job_start(&s->common.job);
1958 
1959     return &s->common;
1960 
1961 fail:
1962     if (s) {
1963         /* Make sure this BDS does not go away until we have completed the graph
1964          * changes below */
1965         bdrv_ref(mirror_top_bs);
1966 
1967         g_free(s->replaces);
1968         blk_unref(s->target);
1969         bs_opaque->job = NULL;
1970         if (s->dirty_bitmap) {
1971             bdrv_release_dirty_bitmap(s->dirty_bitmap);
1972         }
1973         job_early_fail(&s->common.job);
1974     }
1975 
1976     bs_opaque->stop = true;
1977     bdrv_graph_rdlock_main_loop();
1978     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
1979                              &error_abort);
1980     bdrv_graph_rdunlock_main_loop();
1981     bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
1982 
1983     bdrv_unref(mirror_top_bs);
1984 
1985     return NULL;
1986 }
1987 
1988 void mirror_start(const char *job_id, BlockDriverState *bs,
1989                   BlockDriverState *target, const char *replaces,
1990                   int creation_flags, int64_t speed,
1991                   uint32_t granularity, int64_t buf_size,
1992                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
1993                   bool zero_target,
1994                   BlockdevOnError on_source_error,
1995                   BlockdevOnError on_target_error,
1996                   bool unmap, const char *filter_node_name,
1997                   MirrorCopyMode copy_mode, Error **errp)
1998 {
1999     bool is_none_mode;
2000     BlockDriverState *base;
2001 
2002     GLOBAL_STATE_CODE();
2003 
2004     if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
2005         (mode == MIRROR_SYNC_MODE_BITMAP)) {
2006         error_setg(errp, "Sync mode '%s' not supported",
2007                    MirrorSyncMode_str(mode));
2008         return;
2009     }
2010     is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
2011     base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
2012     mirror_start_job(job_id, bs, creation_flags, target, replaces,
2013                      speed, granularity, buf_size, backing_mode, zero_target,
2014                      on_source_error, on_target_error, unmap, NULL, NULL,
2015                      &mirror_job_driver, is_none_mode, base, false,
2016                      filter_node_name, true, copy_mode, errp);
2017 }
2018 
2019 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
2020                               BlockDriverState *base, int creation_flags,
2021                               int64_t speed, BlockdevOnError on_error,
2022                               const char *filter_node_name,
2023                               BlockCompletionFunc *cb, void *opaque,
2024                               bool auto_complete, Error **errp)
2025 {
2026     bool base_read_only;
2027     BlockJob *job;
2028 
2029     GLOBAL_STATE_CODE();
2030 
2031     base_read_only = bdrv_is_read_only(base);
2032 
2033     if (base_read_only) {
2034         if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
2035             return NULL;
2036         }
2037     }
2038 
2039     job = mirror_start_job(
2040                      job_id, bs, creation_flags, base, NULL, speed, 0, 0,
2041                      MIRROR_LEAVE_BACKING_CHAIN, false,
2042                      on_error, on_error, true, cb, opaque,
2043                      &commit_active_job_driver, false, base, auto_complete,
2044                      filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
2045                      errp);
2046     if (!job) {
2047         goto error_restore_flags;
2048     }
2049 
2050     return job;
2051 
2052 error_restore_flags:
2053     /* ignore error and errp for bdrv_reopen, because we want to propagate
2054      * the original error */
2055     if (base_read_only) {
2056         bdrv_reopen_set_read_only(base, true, NULL);
2057     }
2058     return NULL;
2059 }
2060