xref: /qemu/block/mirror.c (revision 9c707525)
1 /*
2  * Image mirroring
3  *
4  * Copyright Red Hat, Inc. 2012
5  *
6  * Authors:
7  *  Paolo Bonzini  <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qemu/cutils.h"
16 #include "qemu/coroutine.h"
17 #include "qemu/range.h"
18 #include "trace.h"
19 #include "block/blockjob_int.h"
20 #include "block/block_int.h"
21 #include "block/dirty-bitmap.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qemu/ratelimit.h"
25 #include "qemu/bitmap.h"
26 #include "qemu/memalign.h"
27 
/*
 * mirror_iteration() and mirror_dirty_init() wait for a running operation to
 * finish before starting another one once s->in_flight reaches this value.
 */
28 #define MAX_IN_FLIGHT 16
/*
 * The per-request size cap computed in mirror_iteration() is never smaller
 * than this many bytes.
 */
29 #define MAX_IO_BYTES (1 << 20) /* 1 MiB */
/* MAX_IN_FLIGHT requests of MAX_IO_BYTES each, i.e. 16 MiB */
30 #define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)
31 
32 /* The mirroring buffer is a list of granularity-sized chunks.
33  * Free chunks are organized in a list.
    *
    * The list link is stored inside the free chunk itself: mirror_free_init()
    * and mirror_iteration_done() cast chunk addresses to MirrorBuffer * before
    * queueing them, so the link is only meaningful while a chunk is free.
34  */
35 typedef struct MirrorBuffer {
36     QSIMPLEQ_ENTRY(MirrorBuffer) next;
37 } MirrorBuffer;
38 
39 typedef struct MirrorOp MirrorOp;
40
/* Per-job state of a mirror block job. */
41 typedef struct MirrorBlockJob {
42     BlockJob common;
       /* Backend that copied data, zeroes and discards are written to */
43     BlockBackend *target;
       /* Node whose backing child the background reads are issued on */
44     BlockDriverState *mirror_top_bs;
45     BlockDriverState *base;
46     BlockDriverState *base_overlay;
47
48     /* The name of the graph node to replace */
49     char *replaces;
50     /* The BDS to replace */
51     BlockDriverState *to_replace;
52     /* Used to block operations on the drive-mirror-replace target */
53     Error *replace_blocker;
54     bool is_none_mode;
55     BlockMirrorBackingMode backing_mode;
56     /* Whether the target image requires explicit zero-initialization */
57     bool zero_target;
58     /*
59      * To be accessed with atomics. Written only under the BQL (required by the
60      * current implementation of mirror_change()).
61      */
62     MirrorCopyMode copy_mode;
63     BlockdevOnError on_source_error, on_target_error;
64     /*
65      * To be accessed with atomics.
66      *
67      * Set when the target is synced (dirty bitmap is clean, nothing in flight)
68      * and the job is running in active mode.
69      */
70     bool actively_synced;
71     bool should_complete;
       /* Size in bytes of one chunk; bitmaps below hold one bit per chunk */
72     int64_t granularity;
       /* Size in bytes of the buffer at @buf */
73     size_t buf_size;
       /* Length of the source in bytes; requests are clipped against it */
74     int64_t bdev_length;
       /*
        * When non-NULL, a chunk's bit is set once an operation covering it
        * completed successfully; mirror_cow_align() aligns requests whose
        * first or last chunk is still clear.
        */
75     unsigned long *cow_bitmap;
76     BdrvDirtyBitmap *dirty_bitmap;
       /* Iterator over @dirty_bitmap used by mirror_iteration() */
77     BdrvDirtyBitmapIter *dbi;
78     uint8_t *buf;
       /* Free granularity-sized chunks of @buf and how many there are */
79     QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
80     int buf_free_count;
81
       /* Timestamp of the last job_sleep_ns() done by mirror_throttle() */
82     uint64_t last_pause_ns;
       /* Chunks claimed by mirror_iteration() and not yet completed */
83     unsigned long *in_flight_bitmap;
       /* Number of started operations and the bytes they cover */
84     unsigned in_flight;
85     int64_t bytes_in_flight;
       /* All live MirrorOps, including the pseudo op of mirror_iteration() */
86     QTAILQ_HEAD(, MirrorOp) ops_in_flight;
       /* Negative errno once an error with the "report" action was recorded */
87     int ret;
88     bool unmap;
89     int target_cluster_size;
90     int max_iov;
       /* While set, completed operations do not advance the job progress */
91     bool initial_zeroing_ongoing;
92     int in_active_write_counter;
93     int64_t active_write_bytes_in_flight;
       /* Makes mirror_exit_common() a no-op on its second invocation */
94     bool prepared;
95     bool in_drain;
96 } MirrorBlockJob;
97 
/*
 * Driver-private data of the mirror_top_bs node (reached through
 * mirror_top_bs->opaque).
 */
98 typedef struct MirrorBDSOpaque {
       /* Owning job; reset to NULL by mirror_exit_common() */
99     MirrorBlockJob *job;
        /* Set by mirror_exit_common() once the node has been drained */
100     bool stop;
        /* NOTE(review): not used in this part of the file - presumably set
         * when the filter is created for a commit job; confirm at the setter */
101     bool is_commit;
102 } MirrorBDSOpaque;
103 
/*
 * One operation tracked in MirrorBlockJob.ops_in_flight: either a real
 * copy/zero/discard request started by mirror_perform(), or the pseudo
 * operation that mirror_iteration() uses as a placeholder.
 */
104 struct MirrorOp {
105     MirrorBlockJob *s;
        /* Buffer chunks taken from s->buf_free (filled in by mirror_co_read()) */
106     QEMUIOVector qiov;
        /* Byte range covered by the operation */
107     int64_t offset;
108     uint64_t bytes;
109
110     /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
111      * mirror_co_discard() before yielding for the first time */
112     int64_t *bytes_handled;
113
        /* Placeholder created by mirror_iteration(); never waited on for slots */
114     bool is_pseudo_op;
115     bool is_active_write;
        /* Set once the operation has been counted in s->in_flight */
116     bool is_in_flight;
        /* Coroutines waiting for this operation; restarted when it finishes */
117     CoQueue waiting_requests;
118     Coroutine *co;
        /* Operation this one is blocked on in mirror_wait_on_conflicts() */
119     MirrorOp *waiting_for_op;
120
121     QTAILQ_ENTRY(MirrorOp) next;
122 };
123 
/* Selects which coroutine mirror_perform() starts for a range. */
124 typedef enum MirrorMethod {
125     MIRROR_METHOD_COPY,     /* mirror_co_read(): read source, write target */
126     MIRROR_METHOD_ZERO,     /* mirror_co_zero(): write zeroes to target */
127     MIRROR_METHOD_DISCARD,  /* mirror_co_discard(): discard on target */
128 } MirrorMethod;
129 
130 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
131                                             int error)
132 {
133     qatomic_set(&s->actively_synced, false);
134     if (read) {
135         return block_job_error_action(&s->common, s->on_source_error,
136                                       true, error);
137     } else {
138         return block_job_error_action(&s->common, s->on_target_error,
139                                       false, error);
140     }
141 }
142 
/*
 * Block the calling coroutine until no chunk touched by
 * [offset, offset + bytes) is marked in s->in_flight_bitmap any more, or
 * until the job has recorded an error (s->ret < 0).
 *
 * @self: the caller's own MirrorOp, or NULL (mirror_iteration() passes NULL).
 *        It is skipped while scanning s->ops_in_flight, and its
 *        waiting_for_op field is maintained for the deadlock avoidance below.
 */
143 static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
144                                                   MirrorBlockJob *s,
145                                                   uint64_t offset,
146                                                   uint64_t bytes)
147 {
        /* Work in units of chunks; the end chunk is exclusive */
148     uint64_t self_start_chunk = offset / s->granularity;
149     uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
150     uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
151
152     while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
153                          self_start_chunk) < self_end_chunk &&
154            s->ret >= 0)
155     {
156         MirrorOp *op;
157
            /* Find one overlapping operation and sleep on it, then re-check */
158         QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
159             uint64_t op_start_chunk = op->offset / s->granularity;
160             uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
161                                                  s->granularity) -
162                                     op_start_chunk;
163
164             if (op == self) {
165                 continue;
166             }
167
168             if (ranges_overlap(self_start_chunk, self_nb_chunks,
169                                op_start_chunk, op_nb_chunks))
170             {
171                 if (self) {
172                     /*
173                      * If the operation is already (indirectly) waiting for us,
174                      * or will wait for us as soon as it wakes up, then just go
175                      * on (instead of producing a deadlock in the former case).
176                      */
177                     if (op->waiting_for_op) {
178                         continue;
179                     }
180
181                     self->waiting_for_op = op;
182                 }
183
                    /* Woken by qemu_co_queue_restart_all() when @op finishes */
184                 qemu_co_queue_wait(&op->waiting_requests, NULL);
185
186                 if (self) {
187                     self->waiting_for_op = NULL;
188                 }
189
                    /* The list may have changed while we slept: rescan */
190                 break;
191             }
192         }
193     }
194 }
195 
/*
 * Tear down a finished operation: undo its in-flight accounting, give its
 * buffer chunks back to s->buf_free, clear its bits in s->in_flight_bitmap,
 * unlink it from s->ops_in_flight, wake every coroutine waiting on it and
 * free it.
 *
 * @ret: result of the operation; only when it is >= 0 are the COW bitmap
 *       (if any) and the job progress updated.
 *
 * @op must not be used by the caller afterwards.
 */
196 static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
197 {
198     MirrorBlockJob *s = op->s;
199     struct iovec *iov;
200     int64_t chunk_num;
201     int i, nb_chunks;
202
203     trace_mirror_iteration_done(s, op->offset, op->bytes, ret);
204
205     s->in_flight--;
206     s->bytes_in_flight -= op->bytes;
        /* Every iovec element is one chunk taken from s->buf_free */
207     iov = op->qiov.iov;
208     for (i = 0; i < op->qiov.niov; i++) {
209         MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
210         QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
211         s->buf_free_count++;
212     }
213
214     chunk_num = op->offset / s->granularity;
215     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
216
217     bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
218     QTAILQ_REMOVE(&s->ops_in_flight, op, next);
219     if (ret >= 0) {
220         if (s->cow_bitmap) {
221             bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
222         }
            /* The initial zeroing pass is not counted as job progress */
223         if (!s->initial_zeroing_ongoing) {
224             job_progress_update(&s->common.job, op->bytes);
225         }
226     }
227     qemu_iovec_destroy(&op->qiov);
228
229     qemu_co_queue_restart_all(&op->waiting_requests);
230     g_free(op);
231 }
232 
/*
 * Completion handler for the target-side part of an operation (write,
 * write-zeroes or discard).
 *
 * On failure (@ret < 0) the range is marked dirty again so that it gets
 * retried, and the error is stored in s->ret if the error policy says
 * "report" and no earlier error was stored.  In every case the operation is
 * then finished with mirror_iteration_done(), which frees @op.
 */
233 static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
234 {
235     MirrorBlockJob *s = op->s;
236
237     if (ret < 0) {
238         BlockErrorAction action;
239
240         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
            /* mirror_error_action() takes a positive errno */
241         action = mirror_error_action(s, false, -ret);
242         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
243             s->ret = ret;
244         }
245     }
246
247     mirror_iteration_done(op, ret);
248 }
249 
/*
 * Completion handler for the source read of a copy operation.
 *
 * On failure (@ret < 0) the range is marked dirty again, the source error
 * policy is consulted, and the operation is finished without touching the
 * target.  On success the data in op->qiov is written to the target and the
 * result is handed to mirror_write_complete().  Either way @op is freed
 * before this function returns.
 */
250 static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
251 {
252     MirrorBlockJob *s = op->s;
253
254     if (ret < 0) {
255         BlockErrorAction action;
256
257         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
258         action = mirror_error_action(s, true, -ret);
            /* Only the first reported error is kept in s->ret */
259         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
260             s->ret = ret;
261         }
262
263         mirror_iteration_done(op, ret);
264         return;
265     }
266
267     ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
268     mirror_write_complete(op, ret);
269 }
270 
271 /* Clip bytes relative to offset to not exceed end-of-file */
272 static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
273                                         int64_t offset,
274                                         int64_t bytes)
275 {
276     return MIN(bytes, s->bdev_length - offset);
277 }
278 
279 /* Round offset and/or bytes to target cluster if COW is needed, and
280  * return the offset of the adjusted tail against original. */
281 static int coroutine_fn mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
282                                          uint64_t *bytes)
283 {
284     bool need_cow;
285     int ret = 0;
286     int64_t align_offset = *offset;
287     int64_t align_bytes = *bytes;
288     int max_bytes = s->granularity * s->max_iov;
289 
290     need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
291     need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
292                           s->cow_bitmap);
293     if (need_cow) {
294         bdrv_round_to_subclusters(blk_bs(s->target), *offset, *bytes,
295                                   &align_offset, &align_bytes);
296     }
297 
298     if (align_bytes > max_bytes) {
299         align_bytes = max_bytes;
300         if (need_cow) {
301             align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
302         }
303     }
304     /* Clipping may result in align_bytes unaligned to chunk boundary, but
305      * that doesn't matter because it's already the end of source image. */
306     align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);
307 
308     ret = align_offset + align_bytes - (*offset + *bytes);
309     *offset = align_offset;
310     *bytes = align_bytes;
311     assert(ret >= 0);
312     return ret;
313 }
314 
315 static inline void coroutine_fn
316 mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
317 {
318     MirrorOp *op;
319 
320     QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
321         /*
322          * Do not wait on pseudo ops, because it may in turn wait on
323          * some other operation to start, which may in fact be the
324          * caller of this function.  Since there is only one pseudo op
325          * at any given time, we will always find some real operation
326          * to wait on.
327          * Also, do not wait on active operations, because they do not
328          * use up in-flight slots.
329          */
330         if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
331             qemu_co_queue_wait(&op->waiting_requests, NULL);
332             return;
333         }
334     }
335     abort();
336 }
337 
338 /* Perform a mirror copy operation.
339  *
340  * *op->bytes_handled is set to the number of bytes copied after and
341  * including offset, excluding any bytes copied prior to offset due
342  * to alignment.  This will be op->bytes if no alignment is necessary,
343  * or (new_end - op->offset) if the tail is rounded up or down due to
344  * alignment or buffer limit.
     *
     * Coroutine entry point started by mirror_perform(); @opaque is the
     * MirrorOp.  The operation is finished (and @op freed) through
     * mirror_read_complete() before this function returns.
345  */
346 static void coroutine_fn mirror_co_read(void *opaque)
347 {
348     MirrorOp *op = opaque;
349     MirrorBlockJob *s = op->s;
350     int nb_chunks;
        /* NOTE(review): holds the int result of bdrv_co_preadv() and is passed
         * on as an int; a plain int would avoid the round trip - confirm */
351     uint64_t ret;
352     uint64_t max_bytes;
353
354     max_bytes = s->granularity * s->max_iov;
355
356     /* We can only handle as much as buf_size at a time. */
357     op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
358     assert(op->bytes);
359     assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
360     *op->bytes_handled = op->bytes;
361
362     if (s->cow_bitmap) {
363         *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
364     }
365     /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
366     assert(*op->bytes_handled <= UINT_MAX);
367     assert(op->bytes <= s->buf_size);
368     /* The offset is granularity-aligned because:
369      * 1) Caller passes in aligned values;
370      * 2) mirror_cow_align is used only when target cluster is larger. */
371     assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
372     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
373     assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
374     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
375
        /* Wait for other operations to return enough buffer chunks */
376     while (s->buf_free_count < nb_chunks) {
377         trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
378         mirror_wait_for_free_in_flight_slot(s);
379     }
380
381     /* Now make a QEMUIOVector taking enough granularity-sized chunks
382      * from s->buf_free.
383      */
384     qemu_iovec_init(&op->qiov, nb_chunks);
385     while (nb_chunks-- > 0) {
386         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
387         size_t remaining = op->bytes - op->qiov.size;
388
389         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
390         s->buf_free_count--;
            /* The last chunk may be only partially used */
391         qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
392     }
393
394     /* Copy the dirty cluster.  */
395     s->in_flight++;
396     s->bytes_in_flight += op->bytes;
397     op->is_in_flight = true;
398     trace_mirror_one_iteration(s, op->offset, op->bytes);
399
400     WITH_GRAPH_RDLOCK_GUARD() {
401         ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
402                              &op->qiov, 0);
403     }
404     mirror_read_complete(op, ret);
405 }
406 
/*
 * Coroutine entry point started by mirror_perform() for MIRROR_METHOD_ZERO;
 * @opaque is the MirrorOp.
 *
 * Accounts the operation as in flight, reports the whole range as handled
 * through *op->bytes_handled, writes zeroes to the target (allowing unmap
 * when s->unmap is set) and finishes through mirror_write_complete(), which
 * frees @op.
 */
407 static void coroutine_fn mirror_co_zero(void *opaque)
408 {
409     MirrorOp *op = opaque;
410     int ret;
411
412     op->s->in_flight++;
413     op->s->bytes_in_flight += op->bytes;
414     *op->bytes_handled = op->bytes;
415     op->is_in_flight = true;
416
417     ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
418                                op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
419     mirror_write_complete(op, ret);
420 }
421 
/*
 * Coroutine entry point started by mirror_perform() for
 * MIRROR_METHOD_DISCARD; @opaque is the MirrorOp.
 *
 * Accounts the operation as in flight, reports the whole range as handled
 * through *op->bytes_handled, discards the range on the target and finishes
 * through mirror_write_complete(), which frees @op.
 */
422 static void coroutine_fn mirror_co_discard(void *opaque)
423 {
424     MirrorOp *op = opaque;
425     int ret;
426
427     op->s->in_flight++;
428     op->s->bytes_in_flight += op->bytes;
429     *op->bytes_handled = op->bytes;
430     op->is_in_flight = true;
431
432     ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
433     mirror_write_complete(op, ret);
434 }
435 
/*
 * Create a MirrorOp for [offset, offset + bytes), queue it in
 * s->ops_in_flight and start the coroutine matching @mirror_method.
 *
 * Returns the number of bytes, counted from @offset, that the started
 * operation takes care of.  The coroutine stores that value through
 * op->bytes_handled before it yields for the first time, which is why it is
 * available as soon as qemu_coroutine_enter() returns.
 */
436 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
437                                unsigned bytes, MirrorMethod mirror_method)
438 {
439     MirrorOp *op;
440     Coroutine *co;
        /* -1 is a sentinel so the assertion below can detect an unset value */
441     int64_t bytes_handled = -1;
442
443     op = g_new(MirrorOp, 1);
444     *op = (MirrorOp){
445         .s              = s,
446         .offset         = offset,
447         .bytes          = bytes,
448         .bytes_handled  = &bytes_handled,
449     };
450     qemu_co_queue_init(&op->waiting_requests);
451
452     switch (mirror_method) {
453     case MIRROR_METHOD_COPY:
454         co = qemu_coroutine_create(mirror_co_read, op);
455         break;
456     case MIRROR_METHOD_ZERO:
457         co = qemu_coroutine_create(mirror_co_zero, op);
458         break;
459     case MIRROR_METHOD_DISCARD:
460         co = qemu_coroutine_create(mirror_co_discard, op);
461         break;
462     default:
463         abort();
464     }
465     op->co = co;
466
467     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
468     qemu_coroutine_enter(co);
469     /* At this point, ownership of op has been moved to the coroutine
470      * and the object may already be freed */
471
472     /* Assert that this value has been set */
473     assert(bytes_handled >= 0);
474
475     /* Same assertion as in mirror_co_read() (and for mirror_co_zero()
476      * and mirror_co_discard(), bytes_handled == op->bytes, which
477      * is the @bytes parameter given to this function) */
478     assert(bytes_handled <= UINT_MAX);
479     return bytes_handled;
480 }
481 
/*
 * Run one pass of the background copy.
 *
 * Takes the next dirty offset from the dirty bitmap iterator (restarting
 * from offset 0 when the iterator is exhausted), extends the range over the
 * directly following chunks that are dirty and not in flight (bounded by
 * s->buf_size), clears their dirty bits, claims them in s->in_flight_bitmap
 * under a pseudo operation and then starts copy, zero or discard operations
 * through mirror_perform() until the range is covered or s->ret reports an
 * error.
 *
 * NOTE(review): asserts that a dirty offset exists after the iterator
 * restart, so callers presumably only invoke this while the dirty bitmap is
 * non-empty - confirm against the caller.
 */
482 static void coroutine_fn GRAPH_UNLOCKED mirror_iteration(MirrorBlockJob *s)
483 {
484     BlockDriverState *source;
485     MirrorOp *pseudo_op;
486     int64_t offset;
487     /* At least the first dirty chunk is mirrored in one iteration. */
488     int nb_chunks = 1;
489     bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
        /* Size cap for a single copy request; at least MAX_IO_BYTES */
490     int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);
491
492     bdrv_graph_co_rdlock();
493     source = s->mirror_top_bs->backing->bs;
494     bdrv_graph_co_rdunlock();
495
496     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
497     offset = bdrv_dirty_iter_next(s->dbi);
498     if (offset < 0) {
            /* Iterator exhausted: start over from the beginning */
499         bdrv_set_dirty_iter(s->dbi, 0);
500         offset = bdrv_dirty_iter_next(s->dbi);
501         trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
502         assert(offset >= 0);
503     }
504     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
505
506     /*
507      * Wait for concurrent requests to @offset.  The next loop will limit the
508      * copied area based on in_flight_bitmap so we only copy an area that does
509      * not overlap with concurrent in-flight requests.  Still, we would like to
510      * copy something, so wait until there are at least no more requests to the
511      * very beginning of the area.
512      */
513     mirror_wait_on_conflicts(NULL, s, offset, 1);
514
515     job_pause_point(&s->common.job);
516
517     /* Find the number of consecutive dirty chunks following the first dirty
518      * one, and wait for in flight requests in them. */
519     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
520     while (nb_chunks * s->granularity < s->buf_size) {
521         int64_t next_dirty;
522         int64_t next_offset = offset + nb_chunks * s->granularity;
523         int64_t next_chunk = next_offset / s->granularity;
524         if (next_offset >= s->bdev_length ||
525             !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) {
526             break;
527         }
528         if (test_bit(next_chunk, s->in_flight_bitmap)) {
529             break;
530         }
531
532         next_dirty = bdrv_dirty_iter_next(s->dbi);
533         if (next_dirty > next_offset || next_dirty < 0) {
534             /* The bitmap iterator's cache is stale, refresh it */
535             bdrv_set_dirty_iter(s->dbi, next_offset);
536             next_dirty = bdrv_dirty_iter_next(s->dbi);
537         }
538         assert(next_dirty == next_offset);
539         nb_chunks++;
540     }
541
542     /* Clear dirty bits before querying the block status, because
543      * calling bdrv_block_status_above could yield - if some blocks are
544      * marked dirty in this window, we need to know.
545      */
546     bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
547                                    nb_chunks * s->granularity);
548     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
549
550     /* Before claiming an area in the in-flight bitmap, we have to
551      * create a MirrorOp for it so that conflicting requests can wait
552      * for it.  mirror_perform() will create the real MirrorOps later,
553      * for now we just create a pseudo operation that will wake up all
554      * conflicting requests once all real operations have been
555      * launched. */
556     pseudo_op = g_new(MirrorOp, 1);
557     *pseudo_op = (MirrorOp){
558         .offset         = offset,
559         .bytes          = nb_chunks * s->granularity,
560         .is_pseudo_op   = true,
561     };
562     qemu_co_queue_init(&pseudo_op->waiting_requests);
563     QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);
564
565     bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
566     while (nb_chunks > 0 && offset < s->bdev_length) {
567         int ret;
568         int64_t io_bytes;
            /* Bytes charged to the rate limit for this request */
569         int64_t io_bytes_acct;
570         MirrorMethod mirror_method = MIRROR_METHOD_COPY;
571
572         assert(!(offset % s->granularity));
573         WITH_GRAPH_RDLOCK_GUARD() {
574             ret = bdrv_co_block_status_above(source, NULL, offset,
575                                              nb_chunks * s->granularity,
576                                              &io_bytes, NULL, NULL);
577         }
578         if (ret < 0) {
                /* Status unknown: fall back to a size-capped plain copy */
579             io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
580         } else if (ret & BDRV_BLOCK_DATA) {
581             io_bytes = MIN(io_bytes, max_io_bytes);
582         }
583
            /* Round down to whole chunks, but handle at least one chunk */
584         io_bytes -= io_bytes % s->granularity;
585         if (io_bytes < s->granularity) {
586             io_bytes = s->granularity;
587         } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
588             int64_t target_offset;
589             int64_t target_bytes;
590             WITH_GRAPH_RDLOCK_GUARD() {
591                 bdrv_round_to_subclusters(blk_bs(s->target), offset, io_bytes,
592                                           &target_offset, &target_bytes);
593             }
                /* Zero/discard only when the range needs no rounding on the
                 * target; otherwise keep MIRROR_METHOD_COPY */
594             if (target_offset == offset &&
595                 target_bytes == io_bytes) {
596                 mirror_method = ret & BDRV_BLOCK_ZERO ?
597                                     MIRROR_METHOD_ZERO :
598                                     MIRROR_METHOD_DISCARD;
599             }
600         }
601
602         while (s->in_flight >= MAX_IN_FLIGHT) {
603             trace_mirror_yield_in_flight(s, offset, s->in_flight);
604             mirror_wait_for_free_in_flight_slot(s);
605         }
606
            /* An error was recorded meanwhile: stop launching operations */
607         if (s->ret < 0) {
608             ret = 0;
609             goto fail;
610         }
611
612         io_bytes = mirror_clip_bytes(s, offset, io_bytes);
613         io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
614         if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
615             io_bytes_acct = 0;
616         } else {
617             io_bytes_acct = io_bytes;
618         }
619         assert(io_bytes);
620         offset += io_bytes;
621         nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
622         block_job_ratelimit_processed_bytes(&s->common, io_bytes_acct);
623     }
624
625 fail:
        /* Drop the placeholder and wake everything that waited on it */
626     QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
627     qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
628     g_free(pseudo_op);
629 }
630 
631 static void mirror_free_init(MirrorBlockJob *s)
632 {
633     int granularity = s->granularity;
634     size_t buf_size = s->buf_size;
635     uint8_t *buf = s->buf;
636 
637     assert(s->buf_free_count == 0);
638     QSIMPLEQ_INIT(&s->buf_free);
639     while (buf_size != 0) {
640         MirrorBuffer *cur = (MirrorBuffer *)buf;
641         QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
642         s->buf_free_count++;
643         buf_size -= granularity;
644         buf += granularity;
645     }
646 }
647 
648 /* This is also used for the .pause callback. There is no matching
649  * mirror_resume() because mirror_run() will begin iterating again
650  * when the job is resumed.
     *
     * Yields until s->in_flight has dropped to zero, waiting for one real
     * background operation at a time.
651  */
652 static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s)
653 {
654     while (s->in_flight > 0) {
655         mirror_wait_for_free_in_flight_slot(s);
656     }
657 }
658 
659 /**
660  * mirror_exit_common: handle both abort() and prepare() cases.
661  * for .prepare, returns 0 on success and -errno on failure.
662  * for .abort cases, denoted by abort = true, MUST return 0.
     *
     * The local flag "abort" is derived from job->ret < 0.  Every assignment
     * to the return value sits in a branch guarded by !abort, which is what
     * keeps the result 0 for the abort case.
     *
     * The function releases the dirty bitmap, drops the target BlockBackend,
     * optionally attaches a backing file to the target, replaces
     * s->to_replace (or the source) by the target when the job should
     * complete, and finally removes mirror_top_bs from the graph.  Only the
     * first call does this work; later calls return 0 immediately
     * (s->prepared).
663  */
664 static int mirror_exit_common(Job *job)
665 {
666     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
667     BlockJob *bjob = &s->common;
668     MirrorBDSOpaque *bs_opaque;
669     BlockDriverState *src;
670     BlockDriverState *target_bs;
671     BlockDriverState *mirror_top_bs;
672     Error *local_err = NULL;
673     bool abort = job->ret < 0;
674     int ret = 0;
675
676     GLOBAL_STATE_CODE();
677
678     if (s->prepared) {
679         return 0;
680     }
681     s->prepared = true;
682
683     bdrv_graph_rdlock_main_loop();
684
685     mirror_top_bs = s->mirror_top_bs;
686     bs_opaque = mirror_top_bs->opaque;
687     src = mirror_top_bs->backing->bs;
688     target_bs = blk_bs(s->target);
689
690     if (bdrv_chain_contains(src, target_bs)) {
691         bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
692     }
693
694     bdrv_release_dirty_bitmap(s->dirty_bitmap);
695
696     /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
697      * before we can call bdrv_drained_end */
698     bdrv_ref(src);
699     bdrv_ref(mirror_top_bs);
700     bdrv_ref(target_bs);
701
702     bdrv_graph_rdunlock_main_loop();
703
704     /*
705      * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
706      * inserting target_bs at s->to_replace, where we might not be able to get
707      * these permissions.
708      */
709     blk_unref(s->target);
710     s->target = NULL;
711
712     /* We don't access the source any more. Dropping any WRITE/RESIZE is
713      * required before it could become a backing file of target_bs. Not having
714      * these permissions any more means that we can't allow any new requests on
715      * mirror_top_bs from now on, so keep it drained. */
716     bdrv_drained_begin(mirror_top_bs);
717     bdrv_drained_begin(target_bs);
718     bs_opaque->stop = true;
719
720     bdrv_graph_rdlock_main_loop();
721     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
722                              &error_abort);
723
724     if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
725         BlockDriverState *backing = s->is_none_mode ? src : s->base;
726         BlockDriverState *unfiltered_target = bdrv_skip_filters(target_bs);
727
728         if (bdrv_cow_bs(unfiltered_target) != backing) {
729             bdrv_set_backing_hd(unfiltered_target, backing, &local_err);
730             if (local_err) {
731                 error_report_err(local_err);
732                 local_err = NULL;
733                 ret = -EPERM;
734             }
735         }
736     } else if (!abort && s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
737         assert(!bdrv_backing_chain_next(target_bs));
738         ret = bdrv_open_backing_file(bdrv_skip_filters(target_bs), NULL,
739                                      "backing", &local_err);
740         if (ret < 0) {
741             error_report_err(local_err);
742             local_err = NULL;
743         }
744     }
745     bdrv_graph_rdunlock_main_loop();
746
747     if (s->should_complete && !abort) {
            /* Without an explicit node to replace, the source is replaced */
748         BlockDriverState *to_replace = s->to_replace ?: src;
749         bool ro = bdrv_is_read_only(to_replace);
750
            /* Give the target the read-only state of the node it replaces */
751         if (ro != bdrv_is_read_only(target_bs)) {
752             bdrv_reopen_set_read_only(target_bs, ro, NULL);
753         }
754
755         /* The mirror job has no requests in flight any more, but we need to
756          * drain potential other users of the BDS before changing the graph. */
757         assert(s->in_drain);
758         bdrv_drained_begin(to_replace);
759         /*
760          * Cannot use check_to_replace_node() here, because that would
761          * check for an op blocker on @to_replace, and we have our own
762          * there.
763          */
764         bdrv_graph_wrlock();
765         if (bdrv_recurse_can_replace(src, to_replace)) {
766             bdrv_replace_node(to_replace, target_bs, &local_err);
767         } else {
768             error_setg(&local_err, "Can no longer replace '%s' by '%s', "
769                        "because it can no longer be guaranteed that doing so "
770                        "would not lead to an abrupt change of visible data",
771                        to_replace->node_name, target_bs->node_name);
772         }
773         bdrv_graph_wrunlock();
774         bdrv_drained_end(to_replace);
775         if (local_err) {
776             error_report_err(local_err);
777             ret = -EPERM;
778         }
779     }
780     if (s->to_replace) {
781         bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
782         error_free(s->replace_blocker);
783         bdrv_unref(s->to_replace);
784     }
785     g_free(s->replaces);
786
787     /*
788      * Remove the mirror filter driver from the graph. Before this, get rid of
789      * the blockers on the intermediate nodes so that the resulting state is
790      * valid.
791      */
792     block_job_remove_all_bdrv(bjob);
793     bdrv_graph_wrlock();
794     bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
795     bdrv_graph_wrunlock();
796
797     bdrv_drained_end(target_bs);
798     bdrv_unref(target_bs);
799
800     bs_opaque->job = NULL;
801
        /* NOTE(review): the matching bdrv_drained_begin(src) is not in this
         * function - presumably done where s->in_drain gets set; confirm */
802     bdrv_drained_end(src);
803     bdrv_drained_end(mirror_top_bs);
804     s->in_drain = false;
805     bdrv_unref(mirror_top_bs);
806     bdrv_unref(src);
807
808     return ret;
809 }
810 
/*
 * Thin wrapper that forwards to mirror_exit_common() and returns its
 * result (0 or a negative errno).
 */
811 static int mirror_prepare(Job *job)
812 {
813     return mirror_exit_common(job);
814 }
815 
/*
 * Thin wrapper that runs mirror_exit_common() and asserts that it returned
 * 0, as that function's contract requires for the abort case.
 */
816 static void mirror_abort(Job *job)
817 {
818     int ret = mirror_exit_common(job);
819     assert(ret == 0);
820 }
821 
822 static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
823 {
824     int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
825 
826     if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
827         s->last_pause_ns = now;
828         job_sleep_ns(&s->common.job, 0);
829     } else {
830         job_pause_point(&s->common.job);
831     }
832 }
833 
/*
 * Prepare the dirty bitmap before the copy loop starts.
 *
 * If s->zero_target is set and the target cannot write zeroes with unmap,
 * the whole device is simply marked dirty and the function returns.  If it
 * can, zero operations are issued over the whole device length first and
 * waited for.  After that, every range that bdrv_co_is_allocated_above()
 * reports as allocated (result > 0) is marked dirty.
 *
 * Returns 0 on success and also when the job was cancelled midway, or the
 * negative value returned by bdrv_co_is_allocated_above() on failure.
 */
834 static int coroutine_fn GRAPH_UNLOCKED mirror_dirty_init(MirrorBlockJob *s)
835 {
836     int64_t offset;
837     BlockDriverState *bs;
838     BlockDriverState *target_bs = blk_bs(s->target);
839     int ret;
840     int64_t count;
841
842     bdrv_graph_co_rdlock();
843     bs = s->mirror_top_bs->backing->bs;
844     bdrv_graph_co_rdunlock();
845
846     if (s->zero_target) {
847         if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
848             bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
849             return 0;
850         }
851
            /* Keeps mirror_iteration_done() from counting this as progress */
852         s->initial_zeroing_ongoing = true;
853         for (offset = 0; offset < s->bdev_length; ) {
854             int bytes = MIN(s->bdev_length - offset,
855                             QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
856
857             mirror_throttle(s);
858
859             if (job_is_cancelled(&s->common.job)) {
860                 s->initial_zeroing_ongoing = false;
861                 return 0;
862             }
863
864             if (s->in_flight >= MAX_IN_FLIGHT) {
865                 trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
866                                    s->in_flight);
867                 mirror_wait_for_free_in_flight_slot(s);
                    /* Retry the same offset once a slot has been freed */
868                 continue;
869             }
870
871             mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
872             offset += bytes;
873         }
874
875         mirror_wait_for_all_io(s);
876         s->initial_zeroing_ongoing = false;
877     }
878
879     /* First part, loop on the sectors and initialize the dirty bitmap.  */
880     for (offset = 0; offset < s->bdev_length; ) {
881         /* Just to make sure we are not exceeding int limit. */
882         int bytes = MIN(s->bdev_length - offset,
883                         QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
884
885         mirror_throttle(s);
886
887         if (job_is_cancelled(&s->common.job)) {
888             return 0;
889         }
890
891         WITH_GRAPH_RDLOCK_GUARD() {
892             ret = bdrv_co_is_allocated_above(bs, s->base_overlay, true, offset,
893                                              bytes, &count);
894         }
895         if (ret < 0) {
896             return ret;
897         }
898
            /* A zero count would make this loop spin forever */
899         assert(count);
900         if (ret > 0) {
901             bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
902         }
903         offset += count;
904     }
905     return 0;
906 }
907 
908 /* Called when going out of the streaming phase to flush the bulk of the
909  * data to the medium, or just before completing.
910  */
911 static int coroutine_fn mirror_flush(MirrorBlockJob *s)
912 {
913     int ret = blk_co_flush(s->target);
914     if (ret < 0) {
915         if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
916             s->ret = ret;
917         }
918     }
919     return ret;
920 }
921 
922 static int coroutine_fn mirror_run(Job *job, Error **errp)
923 {
924     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
925     BlockDriverState *bs;
926     MirrorBDSOpaque *mirror_top_opaque = s->mirror_top_bs->opaque;
927     BlockDriverState *target_bs = blk_bs(s->target);
928     bool need_drain = true;
929     BlockDeviceIoStatus iostatus;
930     int64_t length;
931     int64_t target_length;
932     BlockDriverInfo bdi;
933     char backing_filename[2]; /* we only need 2 characters because we are only
934                                  checking for a NULL string */
935     int ret = 0;
936 
937     bdrv_graph_co_rdlock();
938     bs = bdrv_filter_bs(s->mirror_top_bs);
939     bdrv_graph_co_rdunlock();
940 
941     if (job_is_cancelled(&s->common.job)) {
942         goto immediate_exit;
943     }
944 
945     bdrv_graph_co_rdlock();
946     s->bdev_length = bdrv_co_getlength(bs);
947     bdrv_graph_co_rdunlock();
948 
949     if (s->bdev_length < 0) {
950         ret = s->bdev_length;
951         goto immediate_exit;
952     }
953 
954     target_length = blk_co_getlength(s->target);
955     if (target_length < 0) {
956         ret = target_length;
957         goto immediate_exit;
958     }
959 
960     /* Active commit must resize the base image if its size differs from the
961      * active layer. */
962     if (s->base == blk_bs(s->target)) {
963         if (s->bdev_length > target_length) {
964             ret = blk_co_truncate(s->target, s->bdev_length, false,
965                                   PREALLOC_MODE_OFF, 0, NULL);
966             if (ret < 0) {
967                 goto immediate_exit;
968             }
969         }
970     } else if (s->bdev_length != target_length) {
971         error_setg(errp, "Source and target image have different sizes");
972         ret = -EINVAL;
973         goto immediate_exit;
974     }
975 
976     if (s->bdev_length == 0) {
977         /* Transition to the READY state and wait for complete. */
978         job_transition_to_ready(&s->common.job);
979         qatomic_set(&s->actively_synced, true);
980         while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
981             job_yield(&s->common.job);
982         }
983         goto immediate_exit;
984     }
985 
986     length = DIV_ROUND_UP(s->bdev_length, s->granularity);
987     s->in_flight_bitmap = bitmap_new(length);
988 
989     /* If we have no backing file yet in the destination, we cannot let
990      * the destination do COW.  Instead, we copy sectors around the
991      * dirty data if needed.  We need a bitmap to do that.
992      */
993     bdrv_get_backing_filename(target_bs, backing_filename,
994                               sizeof(backing_filename));
995     bdrv_graph_co_rdlock();
996     if (!bdrv_co_get_info(target_bs, &bdi) && bdi.cluster_size) {
997         s->target_cluster_size = bdi.cluster_size;
998     } else {
999         s->target_cluster_size = BDRV_SECTOR_SIZE;
1000     }
1001     if (backing_filename[0] && !bdrv_backing_chain_next(target_bs) &&
1002         s->granularity < s->target_cluster_size) {
1003         s->buf_size = MAX(s->buf_size, s->target_cluster_size);
1004         s->cow_bitmap = bitmap_new(length);
1005     }
1006     s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);
1007     bdrv_graph_co_rdunlock();
1008 
1009     s->buf = qemu_try_blockalign(bs, s->buf_size);
1010     if (s->buf == NULL) {
1011         ret = -ENOMEM;
1012         goto immediate_exit;
1013     }
1014 
1015     mirror_free_init(s);
1016 
1017     s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1018     if (!s->is_none_mode) {
1019         ret = mirror_dirty_init(s);
1020         if (ret < 0 || job_is_cancelled(&s->common.job)) {
1021             goto immediate_exit;
1022         }
1023     }
1024 
1025     /*
1026      * Only now the job is fully initialised and mirror_top_bs should start
1027      * accessing it.
1028      */
1029     mirror_top_opaque->job = s;
1030 
1031     assert(!s->dbi);
1032     s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
1033     for (;;) {
1034         int64_t cnt, delta;
1035         bool should_complete;
1036 
1037         if (s->ret < 0) {
1038             ret = s->ret;
1039             goto immediate_exit;
1040         }
1041 
1042         job_pause_point(&s->common.job);
1043 
1044         if (job_is_cancelled(&s->common.job)) {
1045             ret = 0;
1046             goto immediate_exit;
1047         }
1048 
1049         cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1050         /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
1051          * the number of bytes currently being processed; together those are
1052          * the current remaining operation length */
1053         job_progress_set_remaining(&s->common.job,
1054                                    s->bytes_in_flight + cnt +
1055                                    s->active_write_bytes_in_flight);
1056 
1057         /* Note that even when no rate limit is applied we need to yield
1058          * periodically with no pending I/O so that bdrv_drain_all() returns.
1059          * We do so every BLKOCK_JOB_SLICE_TIME nanoseconds, or when there is
1060          * an error, or when the source is clean, whichever comes first. */
1061         delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
1062         WITH_JOB_LOCK_GUARD() {
1063             iostatus = s->common.iostatus;
1064         }
1065         if (delta < BLOCK_JOB_SLICE_TIME &&
1066             iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1067             if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
1068                 (cnt == 0 && s->in_flight > 0)) {
1069                 trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
1070                 mirror_wait_for_free_in_flight_slot(s);
1071                 continue;
1072             } else if (cnt != 0) {
1073                 mirror_iteration(s);
1074             }
1075         }
1076 
1077         should_complete = false;
1078         if (s->in_flight == 0 && cnt == 0) {
1079             trace_mirror_before_flush(s);
1080             if (!job_is_ready(&s->common.job)) {
1081                 if (mirror_flush(s) < 0) {
1082                     /* Go check s->ret.  */
1083                     continue;
1084                 }
1085                 /* We're out of the streaming phase.  From now on, if the job
1086                  * is cancelled we will actually complete all pending I/O and
1087                  * report completion.  This way, block-job-cancel will leave
1088                  * the target in a consistent state.
1089                  */
1090                 job_transition_to_ready(&s->common.job);
1091             }
1092             if (qatomic_read(&s->copy_mode) != MIRROR_COPY_MODE_BACKGROUND) {
1093                 qatomic_set(&s->actively_synced, true);
1094             }
1095 
1096             should_complete = s->should_complete ||
1097                 job_cancel_requested(&s->common.job);
1098             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1099         }
1100 
1101         if (cnt == 0 && should_complete) {
1102             /* The dirty bitmap is not updated while operations are pending.
1103              * If we're about to exit, wait for pending operations before
1104              * calling bdrv_get_dirty_count(bs), or we may exit while the
1105              * source has dirty data to copy!
1106              *
1107              * Note that I/O can be submitted by the guest while
1108              * mirror_populate runs, so pause it now.  Before deciding
1109              * whether to switch to target check one last time if I/O has
1110              * come in the meanwhile, and if not flush the data to disk.
1111              */
1112             trace_mirror_before_drain(s, cnt);
1113 
1114             s->in_drain = true;
1115             bdrv_drained_begin(bs);
1116 
1117             /* Must be zero because we are drained */
1118             assert(s->in_active_write_counter == 0);
1119 
1120             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1121             if (cnt > 0 || mirror_flush(s) < 0) {
1122                 bdrv_drained_end(bs);
1123                 s->in_drain = false;
1124                 continue;
1125             }
1126 
1127             /* The two disks are in sync.  Exit and report successful
1128              * completion.
1129              */
1130             assert(QLIST_EMPTY(&bs->tracked_requests));
1131             need_drain = false;
1132             break;
1133         }
1134 
1135         if (job_is_ready(&s->common.job) && !should_complete) {
1136             if (s->in_flight == 0 && cnt == 0) {
1137                 trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
1138                                           BLOCK_JOB_SLICE_TIME);
1139                 job_sleep_ns(&s->common.job, BLOCK_JOB_SLICE_TIME);
1140             }
1141         } else {
1142             block_job_ratelimit_sleep(&s->common);
1143         }
1144         s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1145     }
1146 
1147 immediate_exit:
1148     if (s->in_flight > 0) {
1149         /* We get here only if something went wrong.  Either the job failed,
1150          * or it was cancelled prematurely so that we do not guarantee that
1151          * the target is a copy of the source.
1152          */
1153         assert(ret < 0 || job_is_cancelled(&s->common.job));
1154         assert(need_drain);
1155         mirror_wait_for_all_io(s);
1156     }
1157 
1158     assert(s->in_flight == 0);
1159     qemu_vfree(s->buf);
1160     g_free(s->cow_bitmap);
1161     g_free(s->in_flight_bitmap);
1162     bdrv_dirty_iter_free(s->dbi);
1163 
1164     if (need_drain) {
1165         s->in_drain = true;
1166         bdrv_drained_begin(bs);
1167     }
1168 
1169     return ret;
1170 }
1171 
1172 static void mirror_complete(Job *job, Error **errp)
1173 {
1174     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1175 
1176     if (!job_is_ready(job)) {
1177         error_setg(errp, "The active block job '%s' cannot be completed",
1178                    job->id);
1179         return;
1180     }
1181 
1182     /* block all operations on to_replace bs */
1183     if (s->replaces) {
1184         s->to_replace = bdrv_find_node(s->replaces);
1185         if (!s->to_replace) {
1186             error_setg(errp, "Node name '%s' not found", s->replaces);
1187             return;
1188         }
1189 
1190         /* TODO Translate this into child freeze system. */
1191         error_setg(&s->replace_blocker,
1192                    "block device is in use by block-job-complete");
1193         bdrv_op_block_all(s->to_replace, s->replace_blocker);
1194         bdrv_ref(s->to_replace);
1195     }
1196 
1197     s->should_complete = true;
1198 
1199     /* If the job is paused, it will be re-entered when it is resumed */
1200     WITH_JOB_LOCK_GUARD() {
1201         if (!job->paused) {
1202             job_enter_cond_locked(job, NULL);
1203         }
1204     }
1205 }
1206 
1207 static void coroutine_fn mirror_pause(Job *job)
1208 {
1209     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1210 
1211     mirror_wait_for_all_io(s);
1212 }
1213 
1214 static bool mirror_drained_poll(BlockJob *job)
1215 {
1216     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1217 
1218     /* If the job isn't paused nor cancelled, we can't be sure that it won't
1219      * issue more requests. We make an exception if we've reached this point
1220      * from one of our own drain sections, to avoid a deadlock waiting for
1221      * ourselves.
1222      */
1223     WITH_JOB_LOCK_GUARD() {
1224         if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
1225             && !s->in_drain) {
1226             return true;
1227         }
1228     }
1229 
1230     return !!s->in_flight;
1231 }
1232 
1233 static bool mirror_cancel(Job *job, bool force)
1234 {
1235     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1236     BlockDriverState *target = blk_bs(s->target);
1237 
1238     /*
1239      * Before the job is READY, we treat any cancellation like a
1240      * force-cancellation.
1241      */
1242     force = force || !job_is_ready(job);
1243 
1244     if (force) {
1245         bdrv_cancel_in_flight(target);
1246     }
1247     return force;
1248 }
1249 
1250 static bool commit_active_cancel(Job *job, bool force)
1251 {
1252     /* Same as above in mirror_cancel() */
1253     return force || !job_is_ready(job);
1254 }
1255 
1256 static void mirror_change(BlockJob *job, BlockJobChangeOptions *opts,
1257                           Error **errp)
1258 {
1259     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1260     BlockJobChangeOptionsMirror *change_opts = &opts->u.mirror;
1261     MirrorCopyMode current;
1262 
1263     /*
1264      * The implementation relies on the fact that copy_mode is only written
1265      * under the BQL. Otherwise, further synchronization would be required.
1266      */
1267 
1268     GLOBAL_STATE_CODE();
1269 
1270     if (qatomic_read(&s->copy_mode) == change_opts->copy_mode) {
1271         return;
1272     }
1273 
1274     if (change_opts->copy_mode != MIRROR_COPY_MODE_WRITE_BLOCKING) {
1275         error_setg(errp, "Change to copy mode '%s' is not implemented",
1276                    MirrorCopyMode_str(change_opts->copy_mode));
1277         return;
1278     }
1279 
1280     current = qatomic_cmpxchg(&s->copy_mode, MIRROR_COPY_MODE_BACKGROUND,
1281                               change_opts->copy_mode);
1282     if (current != MIRROR_COPY_MODE_BACKGROUND) {
1283         error_setg(errp, "Expected current copy mode '%s', got '%s'",
1284                    MirrorCopyMode_str(MIRROR_COPY_MODE_BACKGROUND),
1285                    MirrorCopyMode_str(current));
1286     }
1287 }
1288 
1289 static void mirror_query(BlockJob *job, BlockJobInfo *info)
1290 {
1291     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1292 
1293     info->u.mirror = (BlockJobInfoMirror) {
1294         .actively_synced = qatomic_read(&s->actively_synced),
1295     };
1296 }
1297 
1298 static const BlockJobDriver mirror_job_driver = {
1299     .job_driver = {
1300         .instance_size          = sizeof(MirrorBlockJob),
1301         .job_type               = JOB_TYPE_MIRROR,
1302         .free                   = block_job_free,
1303         .user_resume            = block_job_user_resume,
1304         .run                    = mirror_run,
1305         .prepare                = mirror_prepare,
1306         .abort                  = mirror_abort,
1307         .pause                  = mirror_pause,
1308         .complete               = mirror_complete,
1309         .cancel                 = mirror_cancel,
1310     },
1311     .drained_poll           = mirror_drained_poll,
1312     .change                 = mirror_change,
1313     .query                  = mirror_query,
1314 };
1315 
1316 static const BlockJobDriver commit_active_job_driver = {
1317     .job_driver = {
1318         .instance_size          = sizeof(MirrorBlockJob),
1319         .job_type               = JOB_TYPE_COMMIT,
1320         .free                   = block_job_free,
1321         .user_resume            = block_job_user_resume,
1322         .run                    = mirror_run,
1323         .prepare                = mirror_prepare,
1324         .abort                  = mirror_abort,
1325         .pause                  = mirror_pause,
1326         .complete               = mirror_complete,
1327         .cancel                 = commit_active_cancel,
1328     },
1329     .drained_poll           = mirror_drained_poll,
1330 };
1331 
1332 static void coroutine_fn
1333 do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
1334                      uint64_t offset, uint64_t bytes,
1335                      QEMUIOVector *qiov, int flags)
1336 {
1337     int ret;
1338     size_t qiov_offset = 0;
1339     int64_t bitmap_offset, bitmap_end;
1340 
1341     if (!QEMU_IS_ALIGNED(offset, job->granularity) &&
1342         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset))
1343     {
1344             /*
1345              * Dirty unaligned padding: ignore it.
1346              *
1347              * Reasoning:
1348              * 1. If we copy it, we can't reset corresponding bit in
1349              *    dirty_bitmap as there may be some "dirty" bytes still not
1350              *    copied.
1351              * 2. It's already dirty, so skipping it we don't diverge mirror
1352              *    progress.
1353              *
1354              * Note, that because of this, guest write may have no contribution
1355              * into mirror converge, but that's not bad, as we have background
1356              * process of mirroring. If under some bad circumstances (high guest
1357              * IO load) background process starve, we will not converge anyway,
1358              * even if each write will contribute, as guest is not guaranteed to
1359              * rewrite the whole disk.
1360              */
1361             qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset;
1362             if (bytes <= qiov_offset) {
1363                 /* nothing to do after shrink */
1364                 return;
1365             }
1366             offset += qiov_offset;
1367             bytes -= qiov_offset;
1368     }
1369 
1370     if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) &&
1371         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1))
1372     {
1373         uint64_t tail = (offset + bytes) % job->granularity;
1374 
1375         if (bytes <= tail) {
1376             /* nothing to do after shrink */
1377             return;
1378         }
1379         bytes -= tail;
1380     }
1381 
1382     /*
1383      * Tails are either clean or shrunk, so for bitmap resetting
1384      * we safely align the range down.
1385      */
1386     bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity);
1387     bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity);
1388     if (bitmap_offset < bitmap_end) {
1389         bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1390                                 bitmap_end - bitmap_offset);
1391     }
1392 
1393     job_progress_increase_remaining(&job->common.job, bytes);
1394     job->active_write_bytes_in_flight += bytes;
1395 
1396     switch (method) {
1397     case MIRROR_METHOD_COPY:
1398         ret = blk_co_pwritev_part(job->target, offset, bytes,
1399                                   qiov, qiov_offset, flags);
1400         break;
1401 
1402     case MIRROR_METHOD_ZERO:
1403         assert(!qiov);
1404         ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags);
1405         break;
1406 
1407     case MIRROR_METHOD_DISCARD:
1408         assert(!qiov);
1409         ret = blk_co_pdiscard(job->target, offset, bytes);
1410         break;
1411 
1412     default:
1413         abort();
1414     }
1415 
1416     job->active_write_bytes_in_flight -= bytes;
1417     if (ret >= 0) {
1418         job_progress_update(&job->common.job, bytes);
1419     } else {
1420         BlockErrorAction action;
1421 
1422         /*
1423          * We failed, so we should mark dirty the whole area, aligned up.
1424          * Note that we don't care about shrunk tails if any: they were dirty
1425          * at function start, and they must be still dirty, as we've locked
1426          * the region for in-flight op.
1427          */
1428         bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity);
1429         bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity);
1430         bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1431                               bitmap_end - bitmap_offset);
1432         qatomic_set(&job->actively_synced, false);
1433 
1434         action = mirror_error_action(job, false, -ret);
1435         if (action == BLOCK_ERROR_ACTION_REPORT) {
1436             if (!job->ret) {
1437                 job->ret = ret;
1438             }
1439         }
1440     }
1441 }
1442 
1443 static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
1444                                                    uint64_t offset,
1445                                                    uint64_t bytes)
1446 {
1447     MirrorOp *op;
1448     uint64_t start_chunk = offset / s->granularity;
1449     uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
1450 
1451     op = g_new(MirrorOp, 1);
1452     *op = (MirrorOp){
1453         .s                  = s,
1454         .offset             = offset,
1455         .bytes              = bytes,
1456         .is_active_write    = true,
1457         .is_in_flight       = true,
1458         .co                 = qemu_coroutine_self(),
1459     };
1460     qemu_co_queue_init(&op->waiting_requests);
1461     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
1462 
1463     s->in_active_write_counter++;
1464 
1465     /*
1466      * Wait for concurrent requests affecting the area.  If there are already
1467      * running requests that are copying off now-to-be stale data in the area,
1468      * we must wait for them to finish before we begin writing fresh data to the
1469      * target so that the write operations appear in the correct order.
1470      * Note that background requests (see mirror_iteration()) in contrast only
1471      * wait for conflicting requests at the start of the dirty area, and then
1472      * (based on the in_flight_bitmap) truncate the area to copy so it will not
1473      * conflict with any requests beyond that.  For active writes, however, we
1474      * cannot truncate that area.  The request from our parent must be blocked
1475      * until the area is copied in full.  Therefore, we must wait for the whole
1476      * area to become free of concurrent requests.
1477      */
1478     mirror_wait_on_conflicts(op, s, offset, bytes);
1479 
1480     bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1481 
1482     return op;
1483 }
1484 
1485 static void coroutine_fn GRAPH_RDLOCK active_write_settle(MirrorOp *op)
1486 {
1487     uint64_t start_chunk = op->offset / op->s->granularity;
1488     uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
1489                                       op->s->granularity);
1490 
1491     if (!--op->s->in_active_write_counter &&
1492         qatomic_read(&op->s->actively_synced)) {
1493         BdrvChild *source = op->s->mirror_top_bs->backing;
1494 
1495         if (QLIST_FIRST(&source->bs->parents) == source &&
1496             QLIST_NEXT(source, next_parent) == NULL)
1497         {
1498             /* Assert that we are back in sync once all active write
1499              * operations are settled.
1500              * Note that we can only assert this if the mirror node
1501              * is the source node's only parent. */
1502             assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
1503         }
1504     }
1505     bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1506     QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
1507     qemu_co_queue_restart_all(&op->waiting_requests);
1508     g_free(op);
1509 }
1510 
1511 static int coroutine_fn GRAPH_RDLOCK
1512 bdrv_mirror_top_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1513                        QEMUIOVector *qiov, BdrvRequestFlags flags)
1514 {
1515     return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
1516 }
1517 
1518 static bool should_copy_to_target(MirrorBDSOpaque *s)
1519 {
1520     return s->job && s->job->ret >= 0 &&
1521         !job_is_cancelled(&s->job->common.job) &&
1522         qatomic_read(&s->job->copy_mode) == MIRROR_COPY_MODE_WRITE_BLOCKING;
1523 }
1524 
1525 static int coroutine_fn GRAPH_RDLOCK
1526 bdrv_mirror_top_do_write(BlockDriverState *bs, MirrorMethod method,
1527                          bool copy_to_target, uint64_t offset, uint64_t bytes,
1528                          QEMUIOVector *qiov, int flags)
1529 {
1530     MirrorOp *op = NULL;
1531     MirrorBDSOpaque *s = bs->opaque;
1532     int ret = 0;
1533 
1534     if (copy_to_target) {
1535         op = active_write_prepare(s->job, offset, bytes);
1536     }
1537 
1538     switch (method) {
1539     case MIRROR_METHOD_COPY:
1540         ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
1541         break;
1542 
1543     case MIRROR_METHOD_ZERO:
1544         ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
1545         break;
1546 
1547     case MIRROR_METHOD_DISCARD:
1548         ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
1549         break;
1550 
1551     default:
1552         abort();
1553     }
1554 
1555     if (!copy_to_target && s->job && s->job->dirty_bitmap) {
1556         qatomic_set(&s->job->actively_synced, false);
1557         bdrv_set_dirty_bitmap(s->job->dirty_bitmap, offset, bytes);
1558     }
1559 
1560     if (ret < 0) {
1561         goto out;
1562     }
1563 
1564     if (copy_to_target) {
1565         do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
1566     }
1567 
1568 out:
1569     if (copy_to_target) {
1570         active_write_settle(op);
1571     }
1572     return ret;
1573 }
1574 
1575 static int coroutine_fn GRAPH_RDLOCK
1576 bdrv_mirror_top_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1577                         QEMUIOVector *qiov, BdrvRequestFlags flags)
1578 {
1579     QEMUIOVector bounce_qiov;
1580     void *bounce_buf;
1581     int ret = 0;
1582     bool copy_to_target = should_copy_to_target(bs->opaque);
1583 
1584     if (copy_to_target) {
1585         /* The guest might concurrently modify the data to write; but
1586          * the data on source and destination must match, so we have
1587          * to use a bounce buffer if we are going to write to the
1588          * target now. */
1589         bounce_buf = qemu_blockalign(bs, bytes);
1590         iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
1591 
1592         qemu_iovec_init(&bounce_qiov, 1);
1593         qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
1594         qiov = &bounce_qiov;
1595 
1596         flags &= ~BDRV_REQ_REGISTERED_BUF;
1597     }
1598 
1599     ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, copy_to_target,
1600                                    offset, bytes, qiov, flags);
1601 
1602     if (copy_to_target) {
1603         qemu_iovec_destroy(&bounce_qiov);
1604         qemu_vfree(bounce_buf);
1605     }
1606 
1607     return ret;
1608 }
1609 
1610 static int coroutine_fn GRAPH_RDLOCK bdrv_mirror_top_flush(BlockDriverState *bs)
1611 {
1612     if (bs->backing == NULL) {
1613         /* we can be here after failed bdrv_append in mirror_start_job */
1614         return 0;
1615     }
1616     return bdrv_co_flush(bs->backing->bs);
1617 }
1618 
1619 static int coroutine_fn GRAPH_RDLOCK
1620 bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1621                               int64_t bytes, BdrvRequestFlags flags)
1622 {
1623     bool copy_to_target = should_copy_to_target(bs->opaque);
1624     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, copy_to_target,
1625                                     offset, bytes, NULL, flags);
1626 }
1627 
1628 static int coroutine_fn GRAPH_RDLOCK
1629 bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1630 {
1631     bool copy_to_target = should_copy_to_target(bs->opaque);
1632     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, copy_to_target,
1633                                     offset, bytes, NULL, 0);
1634 }
1635 
1636 static void GRAPH_RDLOCK bdrv_mirror_top_refresh_filename(BlockDriverState *bs)
1637 {
1638     if (bs->backing == NULL) {
1639         /* we can be here after failed bdrv_attach_child in
1640          * bdrv_set_backing_hd */
1641         return;
1642     }
1643     pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
1644             bs->backing->bs->filename);
1645 }
1646 
1647 static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
1648                                        BdrvChildRole role,
1649                                        BlockReopenQueue *reopen_queue,
1650                                        uint64_t perm, uint64_t shared,
1651                                        uint64_t *nperm, uint64_t *nshared)
1652 {
1653     MirrorBDSOpaque *s = bs->opaque;
1654 
1655     if (s->stop) {
1656         /*
1657          * If the job is to be stopped, we do not need to forward
1658          * anything to the real image.
1659          */
1660         *nperm = 0;
1661         *nshared = BLK_PERM_ALL;
1662         return;
1663     }
1664 
1665     bdrv_default_perms(bs, c, role, reopen_queue,
1666                        perm, shared, nperm, nshared);
1667 
1668     if (s->is_commit) {
1669         /*
1670          * For commit jobs, we cannot take CONSISTENT_READ, because
1671          * that permission is unshared for everything above the base
1672          * node (except for filters on the base node).
1673          * We also have to force-share the WRITE permission, or
1674          * otherwise we would block ourselves at the base node (if
1675          * writes are blocked for a node, they are also blocked for
1676          * its backing file).
1677          * (We could also share RESIZE, because it may be needed for
1678          * the target if its size is less than the top node's; but
1679          * bdrv_default_perms_for_cow() automatically shares RESIZE
1680          * for backing nodes if WRITE is shared, so there is no need
1681          * to do it here.)
1682          */
1683         *nperm &= ~BLK_PERM_CONSISTENT_READ;
1684         *nshared |= BLK_PERM_WRITE;
1685     }
1686 }
1687 
1688 /* Dummy node that provides consistent read to its users without requiring it
1689  * from its backing file and that allows writes on the backing file chain. */
1690 static BlockDriver bdrv_mirror_top = {
1691     .format_name                = "mirror_top",
1692     .bdrv_co_preadv             = bdrv_mirror_top_preadv,
1693     .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
1694     .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
1695     .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
1696     .bdrv_co_flush              = bdrv_mirror_top_flush,
1697     .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
1698     .bdrv_child_perm            = bdrv_mirror_top_child_perm,
1699 
1700     .is_filter                  = true,
1701     .filtered_child_is_backing  = true,
1702 };
1703 
1704 static BlockJob *mirror_start_job(
1705                              const char *job_id, BlockDriverState *bs,
1706                              int creation_flags, BlockDriverState *target,
1707                              const char *replaces, int64_t speed,
1708                              uint32_t granularity, int64_t buf_size,
1709                              BlockMirrorBackingMode backing_mode,
1710                              bool zero_target,
1711                              BlockdevOnError on_source_error,
1712                              BlockdevOnError on_target_error,
1713                              bool unmap,
1714                              BlockCompletionFunc *cb,
1715                              void *opaque,
1716                              const BlockJobDriver *driver,
1717                              bool is_none_mode, BlockDriverState *base,
1718                              bool auto_complete, const char *filter_node_name,
1719                              bool is_mirror, MirrorCopyMode copy_mode,
1720                              Error **errp)
1721 {
1722     MirrorBlockJob *s;
1723     MirrorBDSOpaque *bs_opaque;
1724     BlockDriverState *mirror_top_bs;
1725     bool target_is_backing;
1726     uint64_t target_perms, target_shared_perms;
1727     int ret;
1728 
1729     GLOBAL_STATE_CODE();
1730 
1731     if (granularity == 0) {
1732         granularity = bdrv_get_default_bitmap_granularity(target);
1733     }
1734 
1735     assert(is_power_of_2(granularity));
1736 
1737     if (buf_size < 0) {
1738         error_setg(errp, "Invalid parameter 'buf-size'");
1739         return NULL;
1740     }
1741 
1742     if (buf_size == 0) {
1743         buf_size = DEFAULT_MIRROR_BUF_SIZE;
1744     }
1745 
1746     bdrv_graph_rdlock_main_loop();
1747     if (bdrv_skip_filters(bs) == bdrv_skip_filters(target)) {
1748         error_setg(errp, "Can't mirror node into itself");
1749         bdrv_graph_rdunlock_main_loop();
1750         return NULL;
1751     }
1752 
1753     target_is_backing = bdrv_chain_contains(bs, target);
1754     bdrv_graph_rdunlock_main_loop();
1755 
1756     /* In the case of active commit, add dummy driver to provide consistent
1757      * reads on the top, while disabling it in the intermediate nodes, and make
1758      * the backing chain writable. */
1759     mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
1760                                          BDRV_O_RDWR, errp);
1761     if (mirror_top_bs == NULL) {
1762         return NULL;
1763     }
1764     if (!filter_node_name) {
1765         mirror_top_bs->implicit = true;
1766     }
1767 
1768     /* So that we can always drop this node */
1769     mirror_top_bs->never_freeze = true;
1770 
1771     mirror_top_bs->total_sectors = bs->total_sectors;
1772     mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1773     mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED |
1774                                           BDRV_REQ_NO_FALLBACK;
1775     bs_opaque = g_new0(MirrorBDSOpaque, 1);
1776     mirror_top_bs->opaque = bs_opaque;
1777 
1778     bs_opaque->is_commit = target_is_backing;
1779 
1780     bdrv_drained_begin(bs);
1781     ret = bdrv_append(mirror_top_bs, bs, errp);
1782     bdrv_drained_end(bs);
1783 
1784     if (ret < 0) {
1785         bdrv_unref(mirror_top_bs);
1786         return NULL;
1787     }
1788 
1789     /* Make sure that the source is not resized while the job is running */
1790     s = block_job_create(job_id, driver, NULL, mirror_top_bs,
1791                          BLK_PERM_CONSISTENT_READ,
1792                          BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1793                          BLK_PERM_WRITE, speed,
1794                          creation_flags, cb, opaque, errp);
1795     if (!s) {
1796         goto fail;
1797     }
1798 
1799     /* The block job now has a reference to this node */
1800     bdrv_unref(mirror_top_bs);
1801 
1802     s->mirror_top_bs = mirror_top_bs;
1803 
1804     /* No resize for the target either; while the mirror is still running, a
1805      * consistent read isn't necessarily possible. We could possibly allow
1806      * writes and graph modifications, though it would likely defeat the
1807      * purpose of a mirror, so leave them blocked for now.
1808      *
1809      * In the case of active commit, things look a bit different, though,
1810      * because the target is an already populated backing file in active use.
1811      * We can allow anything except resize there.*/
1812 
1813     target_perms = BLK_PERM_WRITE;
1814     target_shared_perms = BLK_PERM_WRITE_UNCHANGED;
1815 
1816     if (target_is_backing) {
1817         int64_t bs_size, target_size;
1818         bs_size = bdrv_getlength(bs);
1819         if (bs_size < 0) {
1820             error_setg_errno(errp, -bs_size,
1821                              "Could not inquire top image size");
1822             goto fail;
1823         }
1824 
1825         target_size = bdrv_getlength(target);
1826         if (target_size < 0) {
1827             error_setg_errno(errp, -target_size,
1828                              "Could not inquire base image size");
1829             goto fail;
1830         }
1831 
1832         if (target_size < bs_size) {
1833             target_perms |= BLK_PERM_RESIZE;
1834         }
1835 
1836         target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
1837     } else {
1838         bdrv_graph_rdlock_main_loop();
1839         if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
1840             /*
1841              * We may want to allow this in the future, but it would
1842              * require taking some extra care.
1843              */
1844             error_setg(errp, "Cannot mirror to a filter on top of a node in "
1845                        "the source's backing chain");
1846             bdrv_graph_rdunlock_main_loop();
1847             goto fail;
1848         }
1849         bdrv_graph_rdunlock_main_loop();
1850     }
1851 
1852     s->target = blk_new(s->common.job.aio_context,
1853                         target_perms, target_shared_perms);
1854     ret = blk_insert_bs(s->target, target, errp);
1855     if (ret < 0) {
1856         goto fail;
1857     }
1858     if (is_mirror) {
1859         /* XXX: Mirror target could be a NBD server of target QEMU in the case
1860          * of non-shared block migration. To allow migration completion, we
1861          * have to allow "inactivate" of the target BB.  When that happens, we
1862          * know the job is drained, and the vcpus are stopped, so no write
1863          * operation will be performed. Block layer already has assertions to
1864          * ensure that. */
1865         blk_set_force_allow_inactivate(s->target);
1866     }
1867     blk_set_allow_aio_context_change(s->target, true);
1868     blk_set_disable_request_queuing(s->target, true);
1869 
1870     bdrv_graph_rdlock_main_loop();
1871     s->replaces = g_strdup(replaces);
1872     s->on_source_error = on_source_error;
1873     s->on_target_error = on_target_error;
1874     s->is_none_mode = is_none_mode;
1875     s->backing_mode = backing_mode;
1876     s->zero_target = zero_target;
1877     qatomic_set(&s->copy_mode, copy_mode);
1878     s->base = base;
1879     s->base_overlay = bdrv_find_overlay(bs, base);
1880     s->granularity = granularity;
1881     s->buf_size = ROUND_UP(buf_size, granularity);
1882     s->unmap = unmap;
1883     if (auto_complete) {
1884         s->should_complete = true;
1885     }
1886     bdrv_graph_rdunlock_main_loop();
1887 
1888     s->dirty_bitmap = bdrv_create_dirty_bitmap(s->mirror_top_bs, granularity,
1889                                                NULL, errp);
1890     if (!s->dirty_bitmap) {
1891         goto fail;
1892     }
1893 
1894     /*
1895      * The dirty bitmap is set by bdrv_mirror_top_do_write() when not in active
1896      * mode.
1897      */
1898     bdrv_disable_dirty_bitmap(s->dirty_bitmap);
1899 
1900     bdrv_graph_wrlock();
1901     ret = block_job_add_bdrv(&s->common, "source", bs, 0,
1902                              BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE |
1903                              BLK_PERM_CONSISTENT_READ,
1904                              errp);
1905     if (ret < 0) {
1906         bdrv_graph_wrunlock();
1907         goto fail;
1908     }
1909 
1910     /* Required permissions are already taken with blk_new() */
1911     block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
1912                        &error_abort);
1913 
1914     /* In commit_active_start() all intermediate nodes disappear, so
1915      * any jobs in them must be blocked */
1916     if (target_is_backing) {
1917         BlockDriverState *iter, *filtered_target;
1918         uint64_t iter_shared_perms;
1919 
1920         /*
1921          * The topmost node with
1922          * bdrv_skip_filters(filtered_target) == bdrv_skip_filters(target)
1923          */
1924         filtered_target = bdrv_cow_bs(bdrv_find_overlay(bs, target));
1925 
1926         assert(bdrv_skip_filters(filtered_target) ==
1927                bdrv_skip_filters(target));
1928 
1929         /*
1930          * XXX BLK_PERM_WRITE needs to be allowed so we don't block
1931          * ourselves at s->base (if writes are blocked for a node, they are
1932          * also blocked for its backing file). The other options would be a
1933          * second filter driver above s->base (== target).
1934          */
1935         iter_shared_perms = BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE;
1936 
1937         for (iter = bdrv_filter_or_cow_bs(bs); iter != target;
1938              iter = bdrv_filter_or_cow_bs(iter))
1939         {
1940             if (iter == filtered_target) {
1941                 /*
1942                  * From here on, all nodes are filters on the base.
1943                  * This allows us to share BLK_PERM_CONSISTENT_READ.
1944                  */
1945                 iter_shared_perms |= BLK_PERM_CONSISTENT_READ;
1946             }
1947 
1948             ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
1949                                      iter_shared_perms, errp);
1950             if (ret < 0) {
1951                 bdrv_graph_wrunlock();
1952                 goto fail;
1953             }
1954         }
1955 
1956         if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) {
1957             bdrv_graph_wrunlock();
1958             goto fail;
1959         }
1960     }
1961     bdrv_graph_wrunlock();
1962 
1963     QTAILQ_INIT(&s->ops_in_flight);
1964 
1965     trace_mirror_start(bs, s, opaque);
1966     job_start(&s->common.job);
1967 
1968     return &s->common;
1969 
1970 fail:
1971     if (s) {
1972         /* Make sure this BDS does not go away until we have completed the graph
1973          * changes below */
1974         bdrv_ref(mirror_top_bs);
1975 
1976         g_free(s->replaces);
1977         blk_unref(s->target);
1978         bs_opaque->job = NULL;
1979         if (s->dirty_bitmap) {
1980             bdrv_release_dirty_bitmap(s->dirty_bitmap);
1981         }
1982         job_early_fail(&s->common.job);
1983     }
1984 
1985     bs_opaque->stop = true;
1986     bdrv_drained_begin(bs);
1987     bdrv_graph_wrlock();
1988     assert(mirror_top_bs->backing->bs == bs);
1989     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
1990                              &error_abort);
1991     bdrv_replace_node(mirror_top_bs, bs, &error_abort);
1992     bdrv_graph_wrunlock();
1993     bdrv_drained_end(bs);
1994 
1995     bdrv_unref(mirror_top_bs);
1996 
1997     return NULL;
1998 }
1999 
2000 void mirror_start(const char *job_id, BlockDriverState *bs,
2001                   BlockDriverState *target, const char *replaces,
2002                   int creation_flags, int64_t speed,
2003                   uint32_t granularity, int64_t buf_size,
2004                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
2005                   bool zero_target,
2006                   BlockdevOnError on_source_error,
2007                   BlockdevOnError on_target_error,
2008                   bool unmap, const char *filter_node_name,
2009                   MirrorCopyMode copy_mode, Error **errp)
2010 {
2011     bool is_none_mode;
2012     BlockDriverState *base;
2013 
2014     GLOBAL_STATE_CODE();
2015 
2016     if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
2017         (mode == MIRROR_SYNC_MODE_BITMAP)) {
2018         error_setg(errp, "Sync mode '%s' not supported",
2019                    MirrorSyncMode_str(mode));
2020         return;
2021     }
2022 
2023     bdrv_graph_rdlock_main_loop();
2024     is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
2025     base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
2026     bdrv_graph_rdunlock_main_loop();
2027 
2028     mirror_start_job(job_id, bs, creation_flags, target, replaces,
2029                      speed, granularity, buf_size, backing_mode, zero_target,
2030                      on_source_error, on_target_error, unmap, NULL, NULL,
2031                      &mirror_job_driver, is_none_mode, base, false,
2032                      filter_node_name, true, copy_mode, errp);
2033 }
2034 
2035 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
2036                               BlockDriverState *base, int creation_flags,
2037                               int64_t speed, BlockdevOnError on_error,
2038                               const char *filter_node_name,
2039                               BlockCompletionFunc *cb, void *opaque,
2040                               bool auto_complete, Error **errp)
2041 {
2042     bool base_read_only;
2043     BlockJob *job;
2044 
2045     GLOBAL_STATE_CODE();
2046 
2047     base_read_only = bdrv_is_read_only(base);
2048 
2049     if (base_read_only) {
2050         if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
2051             return NULL;
2052         }
2053     }
2054 
2055     job = mirror_start_job(
2056                      job_id, bs, creation_flags, base, NULL, speed, 0, 0,
2057                      MIRROR_LEAVE_BACKING_CHAIN, false,
2058                      on_error, on_error, true, cb, opaque,
2059                      &commit_active_job_driver, false, base, auto_complete,
2060                      filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
2061                      errp);
2062     if (!job) {
2063         goto error_restore_flags;
2064     }
2065 
2066     return job;
2067 
2068 error_restore_flags:
2069     /* ignore error and errp for bdrv_reopen, because we want to propagate
2070      * the original error */
2071     if (base_read_only) {
2072         bdrv_reopen_set_read_only(base, true, NULL);
2073     }
2074     return NULL;
2075 }
2076