xref: /qemu/block/mirror.c (revision 69c22481)
1 /*
2  * Image mirroring
3  *
4  * Copyright Red Hat, Inc. 2012
5  *
6  * Authors:
7  *  Paolo Bonzini  <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qemu/cutils.h"
16 #include "qemu/coroutine.h"
17 #include "qemu/range.h"
18 #include "trace.h"
19 #include "block/blockjob_int.h"
20 #include "block/block_int.h"
21 #include "block/dirty-bitmap.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qemu/ratelimit.h"
25 #include "qemu/bitmap.h"
26 #include "qemu/memalign.h"
27 
28 #define MAX_IN_FLIGHT 16
29 #define MAX_IO_BYTES (1 << 20) /* 1 Mb */
30 #define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)
31 
32 /* The mirroring buffer is a list of granularity-sized chunks.
33  * Free chunks are organized in a list.
34  */
35 typedef struct MirrorBuffer {
36     QSIMPLEQ_ENTRY(MirrorBuffer) next;
37 } MirrorBuffer;
38 
39 typedef struct MirrorOp MirrorOp;
40 
41 typedef struct MirrorBlockJob {
42     BlockJob common;
43     BlockBackend *target;
44     BlockDriverState *mirror_top_bs;
45     BlockDriverState *base;
46     BlockDriverState *base_overlay;
47 
48     /* The name of the graph node to replace */
49     char *replaces;
50     /* The BDS to replace */
51     BlockDriverState *to_replace;
52     /* Used to block operations on the drive-mirror-replace target */
53     Error *replace_blocker;
54     bool is_none_mode;
55     BlockMirrorBackingMode backing_mode;
56     /* Whether the target image requires explicit zero-initialization */
57     bool zero_target;
58     /*
59      * To be accesssed with atomics. Written only under the BQL (required by the
60      * current implementation of mirror_change()).
61      */
62     MirrorCopyMode copy_mode;
63     BlockdevOnError on_source_error, on_target_error;
64     /*
65      * To be accessed with atomics.
66      *
67      * Set when the target is synced (dirty bitmap is clean, nothing in flight)
68      * and the job is running in active mode.
69      */
70     bool actively_synced;
71     bool should_complete;
72     int64_t granularity;
73     size_t buf_size;
74     int64_t bdev_length;
75     unsigned long *cow_bitmap;
76     BdrvDirtyBitmap *dirty_bitmap;
77     BdrvDirtyBitmapIter *dbi;
78     uint8_t *buf;
79     QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
80     int buf_free_count;
81 
82     uint64_t last_pause_ns;
83     unsigned long *in_flight_bitmap;
84     unsigned in_flight;
85     int64_t bytes_in_flight;
86     QTAILQ_HEAD(, MirrorOp) ops_in_flight;
87     int ret;
88     bool unmap;
89     int target_cluster_size;
90     int max_iov;
91     bool initial_zeroing_ongoing;
92     int in_active_write_counter;
93     int64_t active_write_bytes_in_flight;
94     bool prepared;
95     bool in_drain;
96 } MirrorBlockJob;
97 
98 typedef struct MirrorBDSOpaque {
99     MirrorBlockJob *job;
100     bool stop;
101     bool is_commit;
102 } MirrorBDSOpaque;
103 
104 struct MirrorOp {
105     MirrorBlockJob *s;
106     QEMUIOVector qiov;
107     int64_t offset;
108     uint64_t bytes;
109 
110     /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
111      * mirror_co_discard() before yielding for the first time */
112     int64_t *bytes_handled;
113 
114     bool is_pseudo_op;
115     bool is_active_write;
116     bool is_in_flight;
117     CoQueue waiting_requests;
118     Coroutine *co;
119     MirrorOp *waiting_for_op;
120 
121     QTAILQ_ENTRY(MirrorOp) next;
122 };
123 
124 typedef enum MirrorMethod {
125     MIRROR_METHOD_COPY,
126     MIRROR_METHOD_ZERO,
127     MIRROR_METHOD_DISCARD,
128 } MirrorMethod;
129 
130 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
131                                             int error)
132 {
133     qatomic_set(&s->actively_synced, false);
134     if (read) {
135         return block_job_error_action(&s->common, s->on_source_error,
136                                       true, error);
137     } else {
138         return block_job_error_action(&s->common, s->on_target_error,
139                                       false, error);
140     }
141 }
142 
143 static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
144                                                   MirrorBlockJob *s,
145                                                   uint64_t offset,
146                                                   uint64_t bytes)
147 {
148     uint64_t self_start_chunk = offset / s->granularity;
149     uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
150     uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
151 
152     while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
153                          self_start_chunk) < self_end_chunk &&
154            s->ret >= 0)
155     {
156         MirrorOp *op;
157 
158         QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
159             uint64_t op_start_chunk = op->offset / s->granularity;
160             uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
161                                                  s->granularity) -
162                                     op_start_chunk;
163 
164             if (op == self) {
165                 continue;
166             }
167 
168             if (ranges_overlap(self_start_chunk, self_nb_chunks,
169                                op_start_chunk, op_nb_chunks))
170             {
171                 if (self) {
172                     /*
173                      * If the operation is already (indirectly) waiting for us,
174                      * or will wait for us as soon as it wakes up, then just go
175                      * on (instead of producing a deadlock in the former case).
176                      */
177                     if (op->waiting_for_op) {
178                         continue;
179                     }
180 
181                     self->waiting_for_op = op;
182                 }
183 
184                 qemu_co_queue_wait(&op->waiting_requests, NULL);
185 
186                 if (self) {
187                     self->waiting_for_op = NULL;
188                 }
189 
190                 break;
191             }
192         }
193     }
194 }
195 
196 static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
197 {
198     MirrorBlockJob *s = op->s;
199     struct iovec *iov;
200     int64_t chunk_num;
201     int i, nb_chunks;
202 
203     trace_mirror_iteration_done(s, op->offset, op->bytes, ret);
204 
205     s->in_flight--;
206     s->bytes_in_flight -= op->bytes;
207     iov = op->qiov.iov;
208     for (i = 0; i < op->qiov.niov; i++) {
209         MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
210         QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
211         s->buf_free_count++;
212     }
213 
214     chunk_num = op->offset / s->granularity;
215     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
216 
217     bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
218     QTAILQ_REMOVE(&s->ops_in_flight, op, next);
219     if (ret >= 0) {
220         if (s->cow_bitmap) {
221             bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
222         }
223         if (!s->initial_zeroing_ongoing) {
224             job_progress_update(&s->common.job, op->bytes);
225         }
226     }
227     qemu_iovec_destroy(&op->qiov);
228 
229     qemu_co_queue_restart_all(&op->waiting_requests);
230     g_free(op);
231 }
232 
233 static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
234 {
235     MirrorBlockJob *s = op->s;
236 
237     if (ret < 0) {
238         BlockErrorAction action;
239 
240         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
241         action = mirror_error_action(s, false, -ret);
242         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
243             s->ret = ret;
244         }
245     }
246 
247     mirror_iteration_done(op, ret);
248 }
249 
250 static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
251 {
252     MirrorBlockJob *s = op->s;
253 
254     if (ret < 0) {
255         BlockErrorAction action;
256 
257         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
258         action = mirror_error_action(s, true, -ret);
259         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
260             s->ret = ret;
261         }
262 
263         mirror_iteration_done(op, ret);
264         return;
265     }
266 
267     ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
268     mirror_write_complete(op, ret);
269 }
270 
271 /* Clip bytes relative to offset to not exceed end-of-file */
272 static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
273                                         int64_t offset,
274                                         int64_t bytes)
275 {
276     return MIN(bytes, s->bdev_length - offset);
277 }
278 
279 /* Round offset and/or bytes to target cluster if COW is needed, and
280  * return the offset of the adjusted tail against original. */
281 static int coroutine_fn mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
282                                          uint64_t *bytes)
283 {
284     bool need_cow;
285     int ret = 0;
286     int64_t align_offset = *offset;
287     int64_t align_bytes = *bytes;
288     int max_bytes = s->granularity * s->max_iov;
289 
290     need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
291     need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
292                           s->cow_bitmap);
293     if (need_cow) {
294         bdrv_round_to_subclusters(blk_bs(s->target), *offset, *bytes,
295                                   &align_offset, &align_bytes);
296     }
297 
298     if (align_bytes > max_bytes) {
299         align_bytes = max_bytes;
300         if (need_cow) {
301             align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
302         }
303     }
304     /* Clipping may result in align_bytes unaligned to chunk boundary, but
305      * that doesn't matter because it's already the end of source image. */
306     align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);
307 
308     ret = align_offset + align_bytes - (*offset + *bytes);
309     *offset = align_offset;
310     *bytes = align_bytes;
311     assert(ret >= 0);
312     return ret;
313 }
314 
315 static inline void coroutine_fn
316 mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
317 {
318     MirrorOp *op;
319 
320     QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
321         /*
322          * Do not wait on pseudo ops, because it may in turn wait on
323          * some other operation to start, which may in fact be the
324          * caller of this function.  Since there is only one pseudo op
325          * at any given time, we will always find some real operation
326          * to wait on.
327          * Also, do not wait on active operations, because they do not
328          * use up in-flight slots.
329          */
330         if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
331             qemu_co_queue_wait(&op->waiting_requests, NULL);
332             return;
333         }
334     }
335     abort();
336 }
337 
338 /* Perform a mirror copy operation.
339  *
340  * *op->bytes_handled is set to the number of bytes copied after and
341  * including offset, excluding any bytes copied prior to offset due
342  * to alignment.  This will be op->bytes if no alignment is necessary,
343  * or (new_end - op->offset) if the tail is rounded up or down due to
344  * alignment or buffer limit.
345  */
346 static void coroutine_fn mirror_co_read(void *opaque)
347 {
348     MirrorOp *op = opaque;
349     MirrorBlockJob *s = op->s;
350     int nb_chunks;
351     uint64_t ret;
352     uint64_t max_bytes;
353 
354     max_bytes = s->granularity * s->max_iov;
355 
356     /* We can only handle as much as buf_size at a time. */
357     op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
358     assert(op->bytes);
359     assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
360     *op->bytes_handled = op->bytes;
361 
362     if (s->cow_bitmap) {
363         *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
364     }
365     /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
366     assert(*op->bytes_handled <= UINT_MAX);
367     assert(op->bytes <= s->buf_size);
368     /* The offset is granularity-aligned because:
369      * 1) Caller passes in aligned values;
370      * 2) mirror_cow_align is used only when target cluster is larger. */
371     assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
372     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
373     assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
374     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
375 
376     while (s->buf_free_count < nb_chunks) {
377         trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
378         mirror_wait_for_free_in_flight_slot(s);
379     }
380 
381     /* Now make a QEMUIOVector taking enough granularity-sized chunks
382      * from s->buf_free.
383      */
384     qemu_iovec_init(&op->qiov, nb_chunks);
385     while (nb_chunks-- > 0) {
386         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
387         size_t remaining = op->bytes - op->qiov.size;
388 
389         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
390         s->buf_free_count--;
391         qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
392     }
393 
394     /* Copy the dirty cluster.  */
395     s->in_flight++;
396     s->bytes_in_flight += op->bytes;
397     op->is_in_flight = true;
398     trace_mirror_one_iteration(s, op->offset, op->bytes);
399 
400     WITH_GRAPH_RDLOCK_GUARD() {
401         ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
402                              &op->qiov, 0);
403     }
404     mirror_read_complete(op, ret);
405 }
406 
407 static void coroutine_fn mirror_co_zero(void *opaque)
408 {
409     MirrorOp *op = opaque;
410     int ret;
411 
412     op->s->in_flight++;
413     op->s->bytes_in_flight += op->bytes;
414     *op->bytes_handled = op->bytes;
415     op->is_in_flight = true;
416 
417     ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
418                                op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
419     mirror_write_complete(op, ret);
420 }
421 
422 static void coroutine_fn mirror_co_discard(void *opaque)
423 {
424     MirrorOp *op = opaque;
425     int ret;
426 
427     op->s->in_flight++;
428     op->s->bytes_in_flight += op->bytes;
429     *op->bytes_handled = op->bytes;
430     op->is_in_flight = true;
431 
432     ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
433     mirror_write_complete(op, ret);
434 }
435 
436 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
437                                unsigned bytes, MirrorMethod mirror_method)
438 {
439     MirrorOp *op;
440     Coroutine *co;
441     int64_t bytes_handled = -1;
442 
443     op = g_new(MirrorOp, 1);
444     *op = (MirrorOp){
445         .s              = s,
446         .offset         = offset,
447         .bytes          = bytes,
448         .bytes_handled  = &bytes_handled,
449     };
450     qemu_co_queue_init(&op->waiting_requests);
451 
452     switch (mirror_method) {
453     case MIRROR_METHOD_COPY:
454         co = qemu_coroutine_create(mirror_co_read, op);
455         break;
456     case MIRROR_METHOD_ZERO:
457         co = qemu_coroutine_create(mirror_co_zero, op);
458         break;
459     case MIRROR_METHOD_DISCARD:
460         co = qemu_coroutine_create(mirror_co_discard, op);
461         break;
462     default:
463         abort();
464     }
465     op->co = co;
466 
467     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
468     qemu_coroutine_enter(co);
469     /* At this point, ownership of op has been moved to the coroutine
470      * and the object may already be freed */
471 
472     /* Assert that this value has been set */
473     assert(bytes_handled >= 0);
474 
475     /* Same assertion as in mirror_co_read() (and for mirror_co_read()
476      * and mirror_co_discard(), bytes_handled == op->bytes, which
477      * is the @bytes parameter given to this function) */
478     assert(bytes_handled <= UINT_MAX);
479     return bytes_handled;
480 }
481 
482 static void coroutine_fn GRAPH_RDLOCK mirror_iteration(MirrorBlockJob *s)
483 {
484     BlockDriverState *source = s->mirror_top_bs->backing->bs;
485     MirrorOp *pseudo_op;
486     int64_t offset;
487     /* At least the first dirty chunk is mirrored in one iteration. */
488     int nb_chunks = 1;
489     bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
490     int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);
491 
492     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
493     offset = bdrv_dirty_iter_next(s->dbi);
494     if (offset < 0) {
495         bdrv_set_dirty_iter(s->dbi, 0);
496         offset = bdrv_dirty_iter_next(s->dbi);
497         trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
498         assert(offset >= 0);
499     }
500     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
501 
502     /*
503      * Wait for concurrent requests to @offset.  The next loop will limit the
504      * copied area based on in_flight_bitmap so we only copy an area that does
505      * not overlap with concurrent in-flight requests.  Still, we would like to
506      * copy something, so wait until there are at least no more requests to the
507      * very beginning of the area.
508      */
509     mirror_wait_on_conflicts(NULL, s, offset, 1);
510 
511     job_pause_point(&s->common.job);
512 
513     /* Find the number of consecutive dirty chunks following the first dirty
514      * one, and wait for in flight requests in them. */
515     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
516     while (nb_chunks * s->granularity < s->buf_size) {
517         int64_t next_dirty;
518         int64_t next_offset = offset + nb_chunks * s->granularity;
519         int64_t next_chunk = next_offset / s->granularity;
520         if (next_offset >= s->bdev_length ||
521             !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) {
522             break;
523         }
524         if (test_bit(next_chunk, s->in_flight_bitmap)) {
525             break;
526         }
527 
528         next_dirty = bdrv_dirty_iter_next(s->dbi);
529         if (next_dirty > next_offset || next_dirty < 0) {
530             /* The bitmap iterator's cache is stale, refresh it */
531             bdrv_set_dirty_iter(s->dbi, next_offset);
532             next_dirty = bdrv_dirty_iter_next(s->dbi);
533         }
534         assert(next_dirty == next_offset);
535         nb_chunks++;
536     }
537 
538     /* Clear dirty bits before querying the block status, because
539      * calling bdrv_block_status_above could yield - if some blocks are
540      * marked dirty in this window, we need to know.
541      */
542     bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
543                                    nb_chunks * s->granularity);
544     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
545 
546     /* Before claiming an area in the in-flight bitmap, we have to
547      * create a MirrorOp for it so that conflicting requests can wait
548      * for it.  mirror_perform() will create the real MirrorOps later,
549      * for now we just create a pseudo operation that will wake up all
550      * conflicting requests once all real operations have been
551      * launched. */
552     pseudo_op = g_new(MirrorOp, 1);
553     *pseudo_op = (MirrorOp){
554         .offset         = offset,
555         .bytes          = nb_chunks * s->granularity,
556         .is_pseudo_op   = true,
557     };
558     qemu_co_queue_init(&pseudo_op->waiting_requests);
559     QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);
560 
561     bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
562     while (nb_chunks > 0 && offset < s->bdev_length) {
563         int ret;
564         int64_t io_bytes;
565         int64_t io_bytes_acct;
566         MirrorMethod mirror_method = MIRROR_METHOD_COPY;
567 
568         assert(!(offset % s->granularity));
569         WITH_GRAPH_RDLOCK_GUARD() {
570             ret = bdrv_co_block_status_above(source, NULL, offset,
571                                              nb_chunks * s->granularity,
572                                              &io_bytes, NULL, NULL);
573         }
574         if (ret < 0) {
575             io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
576         } else if (ret & BDRV_BLOCK_DATA) {
577             io_bytes = MIN(io_bytes, max_io_bytes);
578         }
579 
580         io_bytes -= io_bytes % s->granularity;
581         if (io_bytes < s->granularity) {
582             io_bytes = s->granularity;
583         } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
584             int64_t target_offset;
585             int64_t target_bytes;
586             WITH_GRAPH_RDLOCK_GUARD() {
587                 bdrv_round_to_subclusters(blk_bs(s->target), offset, io_bytes,
588                                           &target_offset, &target_bytes);
589             }
590             if (target_offset == offset &&
591                 target_bytes == io_bytes) {
592                 mirror_method = ret & BDRV_BLOCK_ZERO ?
593                                     MIRROR_METHOD_ZERO :
594                                     MIRROR_METHOD_DISCARD;
595             }
596         }
597 
598         while (s->in_flight >= MAX_IN_FLIGHT) {
599             trace_mirror_yield_in_flight(s, offset, s->in_flight);
600             mirror_wait_for_free_in_flight_slot(s);
601         }
602 
603         if (s->ret < 0) {
604             ret = 0;
605             goto fail;
606         }
607 
608         io_bytes = mirror_clip_bytes(s, offset, io_bytes);
609         io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
610         if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
611             io_bytes_acct = 0;
612         } else {
613             io_bytes_acct = io_bytes;
614         }
615         assert(io_bytes);
616         offset += io_bytes;
617         nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
618         block_job_ratelimit_processed_bytes(&s->common, io_bytes_acct);
619     }
620 
621 fail:
622     QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
623     qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
624     g_free(pseudo_op);
625 }
626 
627 static void mirror_free_init(MirrorBlockJob *s)
628 {
629     int granularity = s->granularity;
630     size_t buf_size = s->buf_size;
631     uint8_t *buf = s->buf;
632 
633     assert(s->buf_free_count == 0);
634     QSIMPLEQ_INIT(&s->buf_free);
635     while (buf_size != 0) {
636         MirrorBuffer *cur = (MirrorBuffer *)buf;
637         QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
638         s->buf_free_count++;
639         buf_size -= granularity;
640         buf += granularity;
641     }
642 }
643 
644 /* This is also used for the .pause callback. There is no matching
645  * mirror_resume() because mirror_run() will begin iterating again
646  * when the job is resumed.
647  */
648 static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s)
649 {
650     while (s->in_flight > 0) {
651         mirror_wait_for_free_in_flight_slot(s);
652     }
653 }
654 
655 /**
656  * mirror_exit_common: handle both abort() and prepare() cases.
657  * for .prepare, returns 0 on success and -errno on failure.
658  * for .abort cases, denoted by abort = true, MUST return 0.
659  */
660 static int mirror_exit_common(Job *job)
661 {
662     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
663     BlockJob *bjob = &s->common;
664     MirrorBDSOpaque *bs_opaque;
665     AioContext *replace_aio_context = NULL;
666     BlockDriverState *src;
667     BlockDriverState *target_bs;
668     BlockDriverState *mirror_top_bs;
669     Error *local_err = NULL;
670     bool abort = job->ret < 0;
671     int ret = 0;
672 
673     GLOBAL_STATE_CODE();
674 
675     if (s->prepared) {
676         return 0;
677     }
678     s->prepared = true;
679 
680     aio_context_acquire(qemu_get_aio_context());
681     bdrv_graph_rdlock_main_loop();
682 
683     mirror_top_bs = s->mirror_top_bs;
684     bs_opaque = mirror_top_bs->opaque;
685     src = mirror_top_bs->backing->bs;
686     target_bs = blk_bs(s->target);
687 
688     if (bdrv_chain_contains(src, target_bs)) {
689         bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
690     }
691 
692     bdrv_release_dirty_bitmap(s->dirty_bitmap);
693 
694     /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
695      * before we can call bdrv_drained_end */
696     bdrv_ref(src);
697     bdrv_ref(mirror_top_bs);
698     bdrv_ref(target_bs);
699 
700     bdrv_graph_rdunlock_main_loop();
701 
702     /*
703      * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
704      * inserting target_bs at s->to_replace, where we might not be able to get
705      * these permissions.
706      */
707     blk_unref(s->target);
708     s->target = NULL;
709 
710     /* We don't access the source any more. Dropping any WRITE/RESIZE is
711      * required before it could become a backing file of target_bs. Not having
712      * these permissions any more means that we can't allow any new requests on
713      * mirror_top_bs from now on, so keep it drained. */
714     bdrv_drained_begin(mirror_top_bs);
715     bdrv_drained_begin(target_bs);
716     bs_opaque->stop = true;
717 
718     bdrv_graph_rdlock_main_loop();
719     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
720                              &error_abort);
721 
722     if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
723         BlockDriverState *backing = s->is_none_mode ? src : s->base;
724         BlockDriverState *unfiltered_target = bdrv_skip_filters(target_bs);
725 
726         if (bdrv_cow_bs(unfiltered_target) != backing) {
727             bdrv_set_backing_hd(unfiltered_target, backing, &local_err);
728             if (local_err) {
729                 error_report_err(local_err);
730                 local_err = NULL;
731                 ret = -EPERM;
732             }
733         }
734     } else if (!abort && s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
735         assert(!bdrv_backing_chain_next(target_bs));
736         ret = bdrv_open_backing_file(bdrv_skip_filters(target_bs), NULL,
737                                      "backing", &local_err);
738         if (ret < 0) {
739             error_report_err(local_err);
740             local_err = NULL;
741         }
742     }
743     bdrv_graph_rdunlock_main_loop();
744 
745     if (s->to_replace) {
746         replace_aio_context = bdrv_get_aio_context(s->to_replace);
747         aio_context_acquire(replace_aio_context);
748     }
749 
750     if (s->should_complete && !abort) {
751         BlockDriverState *to_replace = s->to_replace ?: src;
752         bool ro = bdrv_is_read_only(to_replace);
753 
754         if (ro != bdrv_is_read_only(target_bs)) {
755             bdrv_reopen_set_read_only(target_bs, ro, NULL);
756         }
757 
758         /* The mirror job has no requests in flight any more, but we need to
759          * drain potential other users of the BDS before changing the graph. */
760         assert(s->in_drain);
761         bdrv_drained_begin(to_replace);
762         /*
763          * Cannot use check_to_replace_node() here, because that would
764          * check for an op blocker on @to_replace, and we have our own
765          * there.
766          */
767         bdrv_graph_wrlock(target_bs);
768         if (bdrv_recurse_can_replace(src, to_replace)) {
769             bdrv_replace_node(to_replace, target_bs, &local_err);
770         } else {
771             error_setg(&local_err, "Can no longer replace '%s' by '%s', "
772                        "because it can no longer be guaranteed that doing so "
773                        "would not lead to an abrupt change of visible data",
774                        to_replace->node_name, target_bs->node_name);
775         }
776         bdrv_graph_wrunlock(target_bs);
777         bdrv_drained_end(to_replace);
778         if (local_err) {
779             error_report_err(local_err);
780             ret = -EPERM;
781         }
782     }
783     if (s->to_replace) {
784         bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
785         error_free(s->replace_blocker);
786         bdrv_unref(s->to_replace);
787     }
788     if (replace_aio_context) {
789         aio_context_release(replace_aio_context);
790     }
791     g_free(s->replaces);
792 
793     /*
794      * Remove the mirror filter driver from the graph. Before this, get rid of
795      * the blockers on the intermediate nodes so that the resulting state is
796      * valid.
797      */
798     block_job_remove_all_bdrv(bjob);
799     bdrv_graph_wrlock(mirror_top_bs);
800     bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
801     bdrv_graph_wrunlock(mirror_top_bs);
802 
803     bdrv_drained_end(target_bs);
804     bdrv_unref(target_bs);
805 
806     bs_opaque->job = NULL;
807 
808     bdrv_drained_end(src);
809     bdrv_drained_end(mirror_top_bs);
810     s->in_drain = false;
811     bdrv_unref(mirror_top_bs);
812     bdrv_unref(src);
813 
814     aio_context_release(qemu_get_aio_context());
815 
816     return ret;
817 }
818 
819 static int mirror_prepare(Job *job)
820 {
821     return mirror_exit_common(job);
822 }
823 
824 static void mirror_abort(Job *job)
825 {
826     int ret = mirror_exit_common(job);
827     assert(ret == 0);
828 }
829 
830 static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
831 {
832     int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
833 
834     if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
835         s->last_pause_ns = now;
836         job_sleep_ns(&s->common.job, 0);
837     } else {
838         job_pause_point(&s->common.job);
839     }
840 }
841 
842 static int coroutine_fn GRAPH_UNLOCKED mirror_dirty_init(MirrorBlockJob *s)
843 {
844     int64_t offset;
845     BlockDriverState *bs;
846     BlockDriverState *target_bs = blk_bs(s->target);
847     int ret;
848     int64_t count;
849 
850     bdrv_graph_co_rdlock();
851     bs = s->mirror_top_bs->backing->bs;
852     bdrv_graph_co_rdunlock();
853 
854     if (s->zero_target) {
855         if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
856             bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
857             return 0;
858         }
859 
860         s->initial_zeroing_ongoing = true;
861         for (offset = 0; offset < s->bdev_length; ) {
862             int bytes = MIN(s->bdev_length - offset,
863                             QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
864 
865             mirror_throttle(s);
866 
867             if (job_is_cancelled(&s->common.job)) {
868                 s->initial_zeroing_ongoing = false;
869                 return 0;
870             }
871 
872             if (s->in_flight >= MAX_IN_FLIGHT) {
873                 trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
874                                    s->in_flight);
875                 mirror_wait_for_free_in_flight_slot(s);
876                 continue;
877             }
878 
879             mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
880             offset += bytes;
881         }
882 
883         mirror_wait_for_all_io(s);
884         s->initial_zeroing_ongoing = false;
885     }
886 
887     /* First part, loop on the sectors and initialize the dirty bitmap.  */
888     for (offset = 0; offset < s->bdev_length; ) {
889         /* Just to make sure we are not exceeding int limit. */
890         int bytes = MIN(s->bdev_length - offset,
891                         QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
892 
893         mirror_throttle(s);
894 
895         if (job_is_cancelled(&s->common.job)) {
896             return 0;
897         }
898 
899         WITH_GRAPH_RDLOCK_GUARD() {
900             ret = bdrv_co_is_allocated_above(bs, s->base_overlay, true, offset,
901                                              bytes, &count);
902         }
903         if (ret < 0) {
904             return ret;
905         }
906 
907         assert(count);
908         if (ret > 0) {
909             bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
910         }
911         offset += count;
912     }
913     return 0;
914 }
915 
916 /* Called when going out of the streaming phase to flush the bulk of the
917  * data to the medium, or just before completing.
918  */
919 static int coroutine_fn mirror_flush(MirrorBlockJob *s)
920 {
921     int ret = blk_co_flush(s->target);
922     if (ret < 0) {
923         if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
924             s->ret = ret;
925         }
926     }
927     return ret;
928 }
929 
930 static int coroutine_fn mirror_run(Job *job, Error **errp)
931 {
932     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
933     BlockDriverState *bs;
934     MirrorBDSOpaque *mirror_top_opaque = s->mirror_top_bs->opaque;
935     BlockDriverState *target_bs = blk_bs(s->target);
936     bool need_drain = true;
937     BlockDeviceIoStatus iostatus;
938     int64_t length;
939     int64_t target_length;
940     BlockDriverInfo bdi;
941     char backing_filename[2]; /* we only need 2 characters because we are only
942                                  checking for a NULL string */
943     int ret = 0;
944 
945     bdrv_graph_co_rdlock();
946     bs = bdrv_filter_bs(s->mirror_top_bs);
947     bdrv_graph_co_rdunlock();
948 
949     if (job_is_cancelled(&s->common.job)) {
950         goto immediate_exit;
951     }
952 
953     bdrv_graph_co_rdlock();
954     s->bdev_length = bdrv_co_getlength(bs);
955     bdrv_graph_co_rdunlock();
956 
957     if (s->bdev_length < 0) {
958         ret = s->bdev_length;
959         goto immediate_exit;
960     }
961 
962     target_length = blk_co_getlength(s->target);
963     if (target_length < 0) {
964         ret = target_length;
965         goto immediate_exit;
966     }
967 
968     /* Active commit must resize the base image if its size differs from the
969      * active layer. */
970     if (s->base == blk_bs(s->target)) {
971         if (s->bdev_length > target_length) {
972             ret = blk_co_truncate(s->target, s->bdev_length, false,
973                                   PREALLOC_MODE_OFF, 0, NULL);
974             if (ret < 0) {
975                 goto immediate_exit;
976             }
977         }
978     } else if (s->bdev_length != target_length) {
979         error_setg(errp, "Source and target image have different sizes");
980         ret = -EINVAL;
981         goto immediate_exit;
982     }
983 
984     if (s->bdev_length == 0) {
985         /* Transition to the READY state and wait for complete. */
986         job_transition_to_ready(&s->common.job);
987         qatomic_set(&s->actively_synced, true);
988         while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
989             job_yield(&s->common.job);
990         }
991         goto immediate_exit;
992     }
993 
994     length = DIV_ROUND_UP(s->bdev_length, s->granularity);
995     s->in_flight_bitmap = bitmap_new(length);
996 
997     /* If we have no backing file yet in the destination, we cannot let
998      * the destination do COW.  Instead, we copy sectors around the
999      * dirty data if needed.  We need a bitmap to do that.
1000      */
1001     bdrv_get_backing_filename(target_bs, backing_filename,
1002                               sizeof(backing_filename));
1003     bdrv_graph_co_rdlock();
1004     if (!bdrv_co_get_info(target_bs, &bdi) && bdi.cluster_size) {
1005         s->target_cluster_size = bdi.cluster_size;
1006     } else {
1007         s->target_cluster_size = BDRV_SECTOR_SIZE;
1008     }
1009     if (backing_filename[0] && !bdrv_backing_chain_next(target_bs) &&
1010         s->granularity < s->target_cluster_size) {
1011         s->buf_size = MAX(s->buf_size, s->target_cluster_size);
1012         s->cow_bitmap = bitmap_new(length);
1013     }
1014     s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);
1015     bdrv_graph_co_rdunlock();
1016 
1017     s->buf = qemu_try_blockalign(bs, s->buf_size);
1018     if (s->buf == NULL) {
1019         ret = -ENOMEM;
1020         goto immediate_exit;
1021     }
1022 
1023     mirror_free_init(s);
1024 
1025     s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1026     if (!s->is_none_mode) {
1027         ret = mirror_dirty_init(s);
1028         if (ret < 0 || job_is_cancelled(&s->common.job)) {
1029             goto immediate_exit;
1030         }
1031     }
1032 
1033     /*
1034      * Only now the job is fully initialised and mirror_top_bs should start
1035      * accessing it.
1036      */
1037     mirror_top_opaque->job = s;
1038 
1039     assert(!s->dbi);
1040     s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
1041     for (;;) {
1042         int64_t cnt, delta;
1043         bool should_complete;
1044 
1045         if (s->ret < 0) {
1046             ret = s->ret;
1047             goto immediate_exit;
1048         }
1049 
1050         job_pause_point(&s->common.job);
1051 
1052         if (job_is_cancelled(&s->common.job)) {
1053             ret = 0;
1054             goto immediate_exit;
1055         }
1056 
1057         cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1058         /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
1059          * the number of bytes currently being processed; together those are
1060          * the current remaining operation length */
1061         job_progress_set_remaining(&s->common.job,
1062                                    s->bytes_in_flight + cnt +
1063                                    s->active_write_bytes_in_flight);
1064 
1065         /* Note that even when no rate limit is applied we need to yield
1066          * periodically with no pending I/O so that bdrv_drain_all() returns.
1067          * We do so every BLKOCK_JOB_SLICE_TIME nanoseconds, or when there is
1068          * an error, or when the source is clean, whichever comes first. */
1069         delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
1070         WITH_JOB_LOCK_GUARD() {
1071             iostatus = s->common.iostatus;
1072         }
1073         if (delta < BLOCK_JOB_SLICE_TIME &&
1074             iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1075             if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
1076                 (cnt == 0 && s->in_flight > 0)) {
1077                 trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
1078                 mirror_wait_for_free_in_flight_slot(s);
1079                 continue;
1080             } else if (cnt != 0) {
1081                 bdrv_graph_co_rdlock();
1082                 mirror_iteration(s);
1083                 bdrv_graph_co_rdunlock();
1084             }
1085         }
1086 
1087         should_complete = false;
1088         if (s->in_flight == 0 && cnt == 0) {
1089             trace_mirror_before_flush(s);
1090             if (!job_is_ready(&s->common.job)) {
1091                 if (mirror_flush(s) < 0) {
1092                     /* Go check s->ret.  */
1093                     continue;
1094                 }
1095                 /* We're out of the streaming phase.  From now on, if the job
1096                  * is cancelled we will actually complete all pending I/O and
1097                  * report completion.  This way, block-job-cancel will leave
1098                  * the target in a consistent state.
1099                  */
1100                 job_transition_to_ready(&s->common.job);
1101             }
1102             if (qatomic_read(&s->copy_mode) != MIRROR_COPY_MODE_BACKGROUND) {
1103                 qatomic_set(&s->actively_synced, true);
1104             }
1105 
1106             should_complete = s->should_complete ||
1107                 job_cancel_requested(&s->common.job);
1108             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1109         }
1110 
1111         if (cnt == 0 && should_complete) {
1112             /* The dirty bitmap is not updated while operations are pending.
1113              * If we're about to exit, wait for pending operations before
1114              * calling bdrv_get_dirty_count(bs), or we may exit while the
1115              * source has dirty data to copy!
1116              *
1117              * Note that I/O can be submitted by the guest while
1118              * mirror_populate runs, so pause it now.  Before deciding
1119              * whether to switch to target check one last time if I/O has
1120              * come in the meanwhile, and if not flush the data to disk.
1121              */
1122             trace_mirror_before_drain(s, cnt);
1123 
1124             s->in_drain = true;
1125             bdrv_drained_begin(bs);
1126 
1127             /* Must be zero because we are drained */
1128             assert(s->in_active_write_counter == 0);
1129 
1130             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1131             if (cnt > 0 || mirror_flush(s) < 0) {
1132                 bdrv_drained_end(bs);
1133                 s->in_drain = false;
1134                 continue;
1135             }
1136 
1137             /* The two disks are in sync.  Exit and report successful
1138              * completion.
1139              */
1140             assert(QLIST_EMPTY(&bs->tracked_requests));
1141             need_drain = false;
1142             break;
1143         }
1144 
1145         if (job_is_ready(&s->common.job) && !should_complete) {
1146             if (s->in_flight == 0 && cnt == 0) {
1147                 trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
1148                                           BLOCK_JOB_SLICE_TIME);
1149                 job_sleep_ns(&s->common.job, BLOCK_JOB_SLICE_TIME);
1150             }
1151         } else {
1152             block_job_ratelimit_sleep(&s->common);
1153         }
1154         s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1155     }
1156 
1157 immediate_exit:
1158     if (s->in_flight > 0) {
1159         /* We get here only if something went wrong.  Either the job failed,
1160          * or it was cancelled prematurely so that we do not guarantee that
1161          * the target is a copy of the source.
1162          */
1163         assert(ret < 0 || job_is_cancelled(&s->common.job));
1164         assert(need_drain);
1165         mirror_wait_for_all_io(s);
1166     }
1167 
1168     assert(s->in_flight == 0);
1169     qemu_vfree(s->buf);
1170     g_free(s->cow_bitmap);
1171     g_free(s->in_flight_bitmap);
1172     bdrv_dirty_iter_free(s->dbi);
1173 
1174     if (need_drain) {
1175         s->in_drain = true;
1176         bdrv_drained_begin(bs);
1177     }
1178 
1179     return ret;
1180 }
1181 
1182 static void mirror_complete(Job *job, Error **errp)
1183 {
1184     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1185 
1186     if (!job_is_ready(job)) {
1187         error_setg(errp, "The active block job '%s' cannot be completed",
1188                    job->id);
1189         return;
1190     }
1191 
1192     /* block all operations on to_replace bs */
1193     if (s->replaces) {
1194         AioContext *replace_aio_context;
1195 
1196         s->to_replace = bdrv_find_node(s->replaces);
1197         if (!s->to_replace) {
1198             error_setg(errp, "Node name '%s' not found", s->replaces);
1199             return;
1200         }
1201 
1202         replace_aio_context = bdrv_get_aio_context(s->to_replace);
1203         aio_context_acquire(replace_aio_context);
1204 
1205         /* TODO Translate this into child freeze system. */
1206         error_setg(&s->replace_blocker,
1207                    "block device is in use by block-job-complete");
1208         bdrv_op_block_all(s->to_replace, s->replace_blocker);
1209         bdrv_ref(s->to_replace);
1210 
1211         aio_context_release(replace_aio_context);
1212     }
1213 
1214     s->should_complete = true;
1215 
1216     /* If the job is paused, it will be re-entered when it is resumed */
1217     WITH_JOB_LOCK_GUARD() {
1218         if (!job->paused) {
1219             job_enter_cond_locked(job, NULL);
1220         }
1221     }
1222 }
1223 
1224 static void coroutine_fn mirror_pause(Job *job)
1225 {
1226     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1227 
1228     mirror_wait_for_all_io(s);
1229 }
1230 
1231 static bool mirror_drained_poll(BlockJob *job)
1232 {
1233     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1234 
1235     /* If the job isn't paused nor cancelled, we can't be sure that it won't
1236      * issue more requests. We make an exception if we've reached this point
1237      * from one of our own drain sections, to avoid a deadlock waiting for
1238      * ourselves.
1239      */
1240     WITH_JOB_LOCK_GUARD() {
1241         if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
1242             && !s->in_drain) {
1243             return true;
1244         }
1245     }
1246 
1247     return !!s->in_flight;
1248 }
1249 
1250 static bool mirror_cancel(Job *job, bool force)
1251 {
1252     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1253     BlockDriverState *target = blk_bs(s->target);
1254 
1255     /*
1256      * Before the job is READY, we treat any cancellation like a
1257      * force-cancellation.
1258      */
1259     force = force || !job_is_ready(job);
1260 
1261     if (force) {
1262         bdrv_cancel_in_flight(target);
1263     }
1264     return force;
1265 }
1266 
1267 static bool commit_active_cancel(Job *job, bool force)
1268 {
1269     /* Same as above in mirror_cancel() */
1270     return force || !job_is_ready(job);
1271 }
1272 
1273 static void mirror_change(BlockJob *job, BlockJobChangeOptions *opts,
1274                           Error **errp)
1275 {
1276     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1277     BlockJobChangeOptionsMirror *change_opts = &opts->u.mirror;
1278     MirrorCopyMode current;
1279 
1280     /*
1281      * The implementation relies on the fact that copy_mode is only written
1282      * under the BQL. Otherwise, further synchronization would be required.
1283      */
1284 
1285     GLOBAL_STATE_CODE();
1286 
1287     if (qatomic_read(&s->copy_mode) == change_opts->copy_mode) {
1288         return;
1289     }
1290 
1291     if (change_opts->copy_mode != MIRROR_COPY_MODE_WRITE_BLOCKING) {
1292         error_setg(errp, "Change to copy mode '%s' is not implemented",
1293                    MirrorCopyMode_str(change_opts->copy_mode));
1294         return;
1295     }
1296 
1297     current = qatomic_cmpxchg(&s->copy_mode, MIRROR_COPY_MODE_BACKGROUND,
1298                               change_opts->copy_mode);
1299     if (current != MIRROR_COPY_MODE_BACKGROUND) {
1300         error_setg(errp, "Expected current copy mode '%s', got '%s'",
1301                    MirrorCopyMode_str(MIRROR_COPY_MODE_BACKGROUND),
1302                    MirrorCopyMode_str(current));
1303     }
1304 }
1305 
1306 static void mirror_query(BlockJob *job, BlockJobInfo *info)
1307 {
1308     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1309 
1310     info->u.mirror = (BlockJobInfoMirror) {
1311         .actively_synced = qatomic_read(&s->actively_synced),
1312     };
1313 }
1314 
1315 static const BlockJobDriver mirror_job_driver = {
1316     .job_driver = {
1317         .instance_size          = sizeof(MirrorBlockJob),
1318         .job_type               = JOB_TYPE_MIRROR,
1319         .free                   = block_job_free,
1320         .user_resume            = block_job_user_resume,
1321         .run                    = mirror_run,
1322         .prepare                = mirror_prepare,
1323         .abort                  = mirror_abort,
1324         .pause                  = mirror_pause,
1325         .complete               = mirror_complete,
1326         .cancel                 = mirror_cancel,
1327     },
1328     .drained_poll           = mirror_drained_poll,
1329     .change                 = mirror_change,
1330     .query                  = mirror_query,
1331 };
1332 
1333 static const BlockJobDriver commit_active_job_driver = {
1334     .job_driver = {
1335         .instance_size          = sizeof(MirrorBlockJob),
1336         .job_type               = JOB_TYPE_COMMIT,
1337         .free                   = block_job_free,
1338         .user_resume            = block_job_user_resume,
1339         .run                    = mirror_run,
1340         .prepare                = mirror_prepare,
1341         .abort                  = mirror_abort,
1342         .pause                  = mirror_pause,
1343         .complete               = mirror_complete,
1344         .cancel                 = commit_active_cancel,
1345     },
1346     .drained_poll           = mirror_drained_poll,
1347 };
1348 
1349 static void coroutine_fn
1350 do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
1351                      uint64_t offset, uint64_t bytes,
1352                      QEMUIOVector *qiov, int flags)
1353 {
1354     int ret;
1355     size_t qiov_offset = 0;
1356     int64_t bitmap_offset, bitmap_end;
1357 
1358     if (!QEMU_IS_ALIGNED(offset, job->granularity) &&
1359         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset))
1360     {
1361             /*
1362              * Dirty unaligned padding: ignore it.
1363              *
1364              * Reasoning:
1365              * 1. If we copy it, we can't reset corresponding bit in
1366              *    dirty_bitmap as there may be some "dirty" bytes still not
1367              *    copied.
1368              * 2. It's already dirty, so skipping it we don't diverge mirror
1369              *    progress.
1370              *
1371              * Note, that because of this, guest write may have no contribution
1372              * into mirror converge, but that's not bad, as we have background
1373              * process of mirroring. If under some bad circumstances (high guest
1374              * IO load) background process starve, we will not converge anyway,
1375              * even if each write will contribute, as guest is not guaranteed to
1376              * rewrite the whole disk.
1377              */
1378             qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset;
1379             if (bytes <= qiov_offset) {
1380                 /* nothing to do after shrink */
1381                 return;
1382             }
1383             offset += qiov_offset;
1384             bytes -= qiov_offset;
1385     }
1386 
1387     if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) &&
1388         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1))
1389     {
1390         uint64_t tail = (offset + bytes) % job->granularity;
1391 
1392         if (bytes <= tail) {
1393             /* nothing to do after shrink */
1394             return;
1395         }
1396         bytes -= tail;
1397     }
1398 
1399     /*
1400      * Tails are either clean or shrunk, so for bitmap resetting
1401      * we safely align the range down.
1402      */
1403     bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity);
1404     bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity);
1405     if (bitmap_offset < bitmap_end) {
1406         bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1407                                 bitmap_end - bitmap_offset);
1408     }
1409 
1410     job_progress_increase_remaining(&job->common.job, bytes);
1411     job->active_write_bytes_in_flight += bytes;
1412 
1413     switch (method) {
1414     case MIRROR_METHOD_COPY:
1415         ret = blk_co_pwritev_part(job->target, offset, bytes,
1416                                   qiov, qiov_offset, flags);
1417         break;
1418 
1419     case MIRROR_METHOD_ZERO:
1420         assert(!qiov);
1421         ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags);
1422         break;
1423 
1424     case MIRROR_METHOD_DISCARD:
1425         assert(!qiov);
1426         ret = blk_co_pdiscard(job->target, offset, bytes);
1427         break;
1428 
1429     default:
1430         abort();
1431     }
1432 
1433     job->active_write_bytes_in_flight -= bytes;
1434     if (ret >= 0) {
1435         job_progress_update(&job->common.job, bytes);
1436     } else {
1437         BlockErrorAction action;
1438 
1439         /*
1440          * We failed, so we should mark dirty the whole area, aligned up.
1441          * Note that we don't care about shrunk tails if any: they were dirty
1442          * at function start, and they must be still dirty, as we've locked
1443          * the region for in-flight op.
1444          */
1445         bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity);
1446         bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity);
1447         bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1448                               bitmap_end - bitmap_offset);
1449         qatomic_set(&job->actively_synced, false);
1450 
1451         action = mirror_error_action(job, false, -ret);
1452         if (action == BLOCK_ERROR_ACTION_REPORT) {
1453             if (!job->ret) {
1454                 job->ret = ret;
1455             }
1456         }
1457     }
1458 }
1459 
1460 static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
1461                                                    uint64_t offset,
1462                                                    uint64_t bytes)
1463 {
1464     MirrorOp *op;
1465     uint64_t start_chunk = offset / s->granularity;
1466     uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
1467 
1468     op = g_new(MirrorOp, 1);
1469     *op = (MirrorOp){
1470         .s                  = s,
1471         .offset             = offset,
1472         .bytes              = bytes,
1473         .is_active_write    = true,
1474         .is_in_flight       = true,
1475         .co                 = qemu_coroutine_self(),
1476     };
1477     qemu_co_queue_init(&op->waiting_requests);
1478     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
1479 
1480     s->in_active_write_counter++;
1481 
1482     /*
1483      * Wait for concurrent requests affecting the area.  If there are already
1484      * running requests that are copying off now-to-be stale data in the area,
1485      * we must wait for them to finish before we begin writing fresh data to the
1486      * target so that the write operations appear in the correct order.
1487      * Note that background requests (see mirror_iteration()) in contrast only
1488      * wait for conflicting requests at the start of the dirty area, and then
1489      * (based on the in_flight_bitmap) truncate the area to copy so it will not
1490      * conflict with any requests beyond that.  For active writes, however, we
1491      * cannot truncate that area.  The request from our parent must be blocked
1492      * until the area is copied in full.  Therefore, we must wait for the whole
1493      * area to become free of concurrent requests.
1494      */
1495     mirror_wait_on_conflicts(op, s, offset, bytes);
1496 
1497     bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1498 
1499     return op;
1500 }
1501 
1502 static void coroutine_fn GRAPH_RDLOCK active_write_settle(MirrorOp *op)
1503 {
1504     uint64_t start_chunk = op->offset / op->s->granularity;
1505     uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
1506                                       op->s->granularity);
1507 
1508     if (!--op->s->in_active_write_counter &&
1509         qatomic_read(&op->s->actively_synced)) {
1510         BdrvChild *source = op->s->mirror_top_bs->backing;
1511 
1512         if (QLIST_FIRST(&source->bs->parents) == source &&
1513             QLIST_NEXT(source, next_parent) == NULL)
1514         {
1515             /* Assert that we are back in sync once all active write
1516              * operations are settled.
1517              * Note that we can only assert this if the mirror node
1518              * is the source node's only parent. */
1519             assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
1520         }
1521     }
1522     bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1523     QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
1524     qemu_co_queue_restart_all(&op->waiting_requests);
1525     g_free(op);
1526 }
1527 
1528 static int coroutine_fn GRAPH_RDLOCK
1529 bdrv_mirror_top_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1530                        QEMUIOVector *qiov, BdrvRequestFlags flags)
1531 {
1532     return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
1533 }
1534 
1535 static bool should_copy_to_target(MirrorBDSOpaque *s)
1536 {
1537     return s->job && s->job->ret >= 0 &&
1538         !job_is_cancelled(&s->job->common.job) &&
1539         qatomic_read(&s->job->copy_mode) == MIRROR_COPY_MODE_WRITE_BLOCKING;
1540 }
1541 
1542 static int coroutine_fn GRAPH_RDLOCK
1543 bdrv_mirror_top_do_write(BlockDriverState *bs, MirrorMethod method,
1544                          bool copy_to_target, uint64_t offset, uint64_t bytes,
1545                          QEMUIOVector *qiov, int flags)
1546 {
1547     MirrorOp *op = NULL;
1548     MirrorBDSOpaque *s = bs->opaque;
1549     int ret = 0;
1550 
1551     if (copy_to_target) {
1552         op = active_write_prepare(s->job, offset, bytes);
1553     }
1554 
1555     switch (method) {
1556     case MIRROR_METHOD_COPY:
1557         ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
1558         break;
1559 
1560     case MIRROR_METHOD_ZERO:
1561         ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
1562         break;
1563 
1564     case MIRROR_METHOD_DISCARD:
1565         ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
1566         break;
1567 
1568     default:
1569         abort();
1570     }
1571 
1572     if (!copy_to_target && s->job && s->job->dirty_bitmap) {
1573         qatomic_set(&s->job->actively_synced, false);
1574         bdrv_set_dirty_bitmap(s->job->dirty_bitmap, offset, bytes);
1575     }
1576 
1577     if (ret < 0) {
1578         goto out;
1579     }
1580 
1581     if (copy_to_target) {
1582         do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
1583     }
1584 
1585 out:
1586     if (copy_to_target) {
1587         active_write_settle(op);
1588     }
1589     return ret;
1590 }
1591 
1592 static int coroutine_fn GRAPH_RDLOCK
1593 bdrv_mirror_top_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1594                         QEMUIOVector *qiov, BdrvRequestFlags flags)
1595 {
1596     QEMUIOVector bounce_qiov;
1597     void *bounce_buf;
1598     int ret = 0;
1599     bool copy_to_target = should_copy_to_target(bs->opaque);
1600 
1601     if (copy_to_target) {
1602         /* The guest might concurrently modify the data to write; but
1603          * the data on source and destination must match, so we have
1604          * to use a bounce buffer if we are going to write to the
1605          * target now. */
1606         bounce_buf = qemu_blockalign(bs, bytes);
1607         iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
1608 
1609         qemu_iovec_init(&bounce_qiov, 1);
1610         qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
1611         qiov = &bounce_qiov;
1612 
1613         flags &= ~BDRV_REQ_REGISTERED_BUF;
1614     }
1615 
1616     ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, copy_to_target,
1617                                    offset, bytes, qiov, flags);
1618 
1619     if (copy_to_target) {
1620         qemu_iovec_destroy(&bounce_qiov);
1621         qemu_vfree(bounce_buf);
1622     }
1623 
1624     return ret;
1625 }
1626 
1627 static int coroutine_fn GRAPH_RDLOCK bdrv_mirror_top_flush(BlockDriverState *bs)
1628 {
1629     if (bs->backing == NULL) {
1630         /* we can be here after failed bdrv_append in mirror_start_job */
1631         return 0;
1632     }
1633     return bdrv_co_flush(bs->backing->bs);
1634 }
1635 
1636 static int coroutine_fn GRAPH_RDLOCK
1637 bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1638                               int64_t bytes, BdrvRequestFlags flags)
1639 {
1640     bool copy_to_target = should_copy_to_target(bs->opaque);
1641     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, copy_to_target,
1642                                     offset, bytes, NULL, flags);
1643 }
1644 
1645 static int coroutine_fn GRAPH_RDLOCK
1646 bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1647 {
1648     bool copy_to_target = should_copy_to_target(bs->opaque);
1649     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, copy_to_target,
1650                                     offset, bytes, NULL, 0);
1651 }
1652 
1653 static void GRAPH_RDLOCK bdrv_mirror_top_refresh_filename(BlockDriverState *bs)
1654 {
1655     if (bs->backing == NULL) {
1656         /* we can be here after failed bdrv_attach_child in
1657          * bdrv_set_backing_hd */
1658         return;
1659     }
1660     pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
1661             bs->backing->bs->filename);
1662 }
1663 
1664 static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
1665                                        BdrvChildRole role,
1666                                        BlockReopenQueue *reopen_queue,
1667                                        uint64_t perm, uint64_t shared,
1668                                        uint64_t *nperm, uint64_t *nshared)
1669 {
1670     MirrorBDSOpaque *s = bs->opaque;
1671 
1672     if (s->stop) {
1673         /*
1674          * If the job is to be stopped, we do not need to forward
1675          * anything to the real image.
1676          */
1677         *nperm = 0;
1678         *nshared = BLK_PERM_ALL;
1679         return;
1680     }
1681 
1682     bdrv_default_perms(bs, c, role, reopen_queue,
1683                        perm, shared, nperm, nshared);
1684 
1685     if (s->is_commit) {
1686         /*
1687          * For commit jobs, we cannot take CONSISTENT_READ, because
1688          * that permission is unshared for everything above the base
1689          * node (except for filters on the base node).
1690          * We also have to force-share the WRITE permission, or
1691          * otherwise we would block ourselves at the base node (if
1692          * writes are blocked for a node, they are also blocked for
1693          * its backing file).
1694          * (We could also share RESIZE, because it may be needed for
1695          * the target if its size is less than the top node's; but
1696          * bdrv_default_perms_for_cow() automatically shares RESIZE
1697          * for backing nodes if WRITE is shared, so there is no need
1698          * to do it here.)
1699          */
1700         *nperm &= ~BLK_PERM_CONSISTENT_READ;
1701         *nshared |= BLK_PERM_WRITE;
1702     }
1703 }
1704 
1705 /* Dummy node that provides consistent read to its users without requiring it
1706  * from its backing file and that allows writes on the backing file chain. */
1707 static BlockDriver bdrv_mirror_top = {
1708     .format_name                = "mirror_top",
1709     .bdrv_co_preadv             = bdrv_mirror_top_preadv,
1710     .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
1711     .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
1712     .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
1713     .bdrv_co_flush              = bdrv_mirror_top_flush,
1714     .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
1715     .bdrv_child_perm            = bdrv_mirror_top_child_perm,
1716 
1717     .is_filter                  = true,
1718     .filtered_child_is_backing  = true,
1719 };
1720 
1721 static BlockJob *mirror_start_job(
1722                              const char *job_id, BlockDriverState *bs,
1723                              int creation_flags, BlockDriverState *target,
1724                              const char *replaces, int64_t speed,
1725                              uint32_t granularity, int64_t buf_size,
1726                              BlockMirrorBackingMode backing_mode,
1727                              bool zero_target,
1728                              BlockdevOnError on_source_error,
1729                              BlockdevOnError on_target_error,
1730                              bool unmap,
1731                              BlockCompletionFunc *cb,
1732                              void *opaque,
1733                              const BlockJobDriver *driver,
1734                              bool is_none_mode, BlockDriverState *base,
1735                              bool auto_complete, const char *filter_node_name,
1736                              bool is_mirror, MirrorCopyMode copy_mode,
1737                              Error **errp)
1738 {
1739     MirrorBlockJob *s;
1740     MirrorBDSOpaque *bs_opaque;
1741     BlockDriverState *mirror_top_bs;
1742     bool target_is_backing;
1743     uint64_t target_perms, target_shared_perms;
1744     int ret;
1745 
1746     GLOBAL_STATE_CODE();
1747 
1748     if (granularity == 0) {
1749         granularity = bdrv_get_default_bitmap_granularity(target);
1750     }
1751 
1752     assert(is_power_of_2(granularity));
1753 
1754     if (buf_size < 0) {
1755         error_setg(errp, "Invalid parameter 'buf-size'");
1756         return NULL;
1757     }
1758 
1759     if (buf_size == 0) {
1760         buf_size = DEFAULT_MIRROR_BUF_SIZE;
1761     }
1762 
1763     bdrv_graph_rdlock_main_loop();
1764     if (bdrv_skip_filters(bs) == bdrv_skip_filters(target)) {
1765         error_setg(errp, "Can't mirror node into itself");
1766         bdrv_graph_rdunlock_main_loop();
1767         return NULL;
1768     }
1769 
1770     target_is_backing = bdrv_chain_contains(bs, target);
1771     bdrv_graph_rdunlock_main_loop();
1772 
1773     /* In the case of active commit, add dummy driver to provide consistent
1774      * reads on the top, while disabling it in the intermediate nodes, and make
1775      * the backing chain writable. */
1776     mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
1777                                          BDRV_O_RDWR, errp);
1778     if (mirror_top_bs == NULL) {
1779         return NULL;
1780     }
1781     if (!filter_node_name) {
1782         mirror_top_bs->implicit = true;
1783     }
1784 
1785     /* So that we can always drop this node */
1786     mirror_top_bs->never_freeze = true;
1787 
1788     mirror_top_bs->total_sectors = bs->total_sectors;
1789     mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1790     mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED |
1791                                           BDRV_REQ_NO_FALLBACK;
1792     bs_opaque = g_new0(MirrorBDSOpaque, 1);
1793     mirror_top_bs->opaque = bs_opaque;
1794 
1795     bs_opaque->is_commit = target_is_backing;
1796 
1797     bdrv_drained_begin(bs);
1798     ret = bdrv_append(mirror_top_bs, bs, errp);
1799     bdrv_drained_end(bs);
1800 
1801     if (ret < 0) {
1802         bdrv_unref(mirror_top_bs);
1803         return NULL;
1804     }
1805 
1806     /* Make sure that the source is not resized while the job is running */
1807     s = block_job_create(job_id, driver, NULL, mirror_top_bs,
1808                          BLK_PERM_CONSISTENT_READ,
1809                          BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1810                          BLK_PERM_WRITE, speed,
1811                          creation_flags, cb, opaque, errp);
1812     if (!s) {
1813         goto fail;
1814     }
1815 
1816     /* The block job now has a reference to this node */
1817     bdrv_unref(mirror_top_bs);
1818 
1819     s->mirror_top_bs = mirror_top_bs;
1820 
1821     /* No resize for the target either; while the mirror is still running, a
1822      * consistent read isn't necessarily possible. We could possibly allow
1823      * writes and graph modifications, though it would likely defeat the
1824      * purpose of a mirror, so leave them blocked for now.
1825      *
1826      * In the case of active commit, things look a bit different, though,
1827      * because the target is an already populated backing file in active use.
1828      * We can allow anything except resize there.*/
1829 
1830     target_perms = BLK_PERM_WRITE;
1831     target_shared_perms = BLK_PERM_WRITE_UNCHANGED;
1832 
1833     if (target_is_backing) {
1834         int64_t bs_size, target_size;
1835         bs_size = bdrv_getlength(bs);
1836         if (bs_size < 0) {
1837             error_setg_errno(errp, -bs_size,
1838                              "Could not inquire top image size");
1839             goto fail;
1840         }
1841 
1842         target_size = bdrv_getlength(target);
1843         if (target_size < 0) {
1844             error_setg_errno(errp, -target_size,
1845                              "Could not inquire base image size");
1846             goto fail;
1847         }
1848 
1849         if (target_size < bs_size) {
1850             target_perms |= BLK_PERM_RESIZE;
1851         }
1852 
1853         target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
1854     } else {
1855         bdrv_graph_rdlock_main_loop();
1856         if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
1857             /*
1858              * We may want to allow this in the future, but it would
1859              * require taking some extra care.
1860              */
1861             error_setg(errp, "Cannot mirror to a filter on top of a node in "
1862                        "the source's backing chain");
1863             bdrv_graph_rdunlock_main_loop();
1864             goto fail;
1865         }
1866         bdrv_graph_rdunlock_main_loop();
1867     }
1868 
1869     s->target = blk_new(s->common.job.aio_context,
1870                         target_perms, target_shared_perms);
1871     ret = blk_insert_bs(s->target, target, errp);
1872     if (ret < 0) {
1873         goto fail;
1874     }
1875     if (is_mirror) {
1876         /* XXX: Mirror target could be a NBD server of target QEMU in the case
1877          * of non-shared block migration. To allow migration completion, we
1878          * have to allow "inactivate" of the target BB.  When that happens, we
1879          * know the job is drained, and the vcpus are stopped, so no write
1880          * operation will be performed. Block layer already has assertions to
1881          * ensure that. */
1882         blk_set_force_allow_inactivate(s->target);
1883     }
1884     blk_set_allow_aio_context_change(s->target, true);
1885     blk_set_disable_request_queuing(s->target, true);
1886 
1887     bdrv_graph_rdlock_main_loop();
1888     s->replaces = g_strdup(replaces);
1889     s->on_source_error = on_source_error;
1890     s->on_target_error = on_target_error;
1891     s->is_none_mode = is_none_mode;
1892     s->backing_mode = backing_mode;
1893     s->zero_target = zero_target;
1894     qatomic_set(&s->copy_mode, copy_mode);
1895     s->base = base;
1896     s->base_overlay = bdrv_find_overlay(bs, base);
1897     s->granularity = granularity;
1898     s->buf_size = ROUND_UP(buf_size, granularity);
1899     s->unmap = unmap;
1900     if (auto_complete) {
1901         s->should_complete = true;
1902     }
1903     bdrv_graph_rdunlock_main_loop();
1904 
1905     s->dirty_bitmap = bdrv_create_dirty_bitmap(s->mirror_top_bs, granularity,
1906                                                NULL, errp);
1907     if (!s->dirty_bitmap) {
1908         goto fail;
1909     }
1910 
1911     /*
1912      * The dirty bitmap is set by bdrv_mirror_top_do_write() when not in active
1913      * mode.
1914      */
1915     bdrv_disable_dirty_bitmap(s->dirty_bitmap);
1916 
1917     bdrv_graph_wrlock(bs);
1918     ret = block_job_add_bdrv(&s->common, "source", bs, 0,
1919                              BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE |
1920                              BLK_PERM_CONSISTENT_READ,
1921                              errp);
1922     if (ret < 0) {
1923         bdrv_graph_wrunlock(bs);
1924         goto fail;
1925     }
1926 
1927     /* Required permissions are already taken with blk_new() */
1928     block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
1929                        &error_abort);
1930 
1931     /* In commit_active_start() all intermediate nodes disappear, so
1932      * any jobs in them must be blocked */
1933     if (target_is_backing) {
1934         BlockDriverState *iter, *filtered_target;
1935         uint64_t iter_shared_perms;
1936 
1937         /*
1938          * The topmost node with
1939          * bdrv_skip_filters(filtered_target) == bdrv_skip_filters(target)
1940          */
1941         filtered_target = bdrv_cow_bs(bdrv_find_overlay(bs, target));
1942 
1943         assert(bdrv_skip_filters(filtered_target) ==
1944                bdrv_skip_filters(target));
1945 
1946         /*
1947          * XXX BLK_PERM_WRITE needs to be allowed so we don't block
1948          * ourselves at s->base (if writes are blocked for a node, they are
1949          * also blocked for its backing file). The other options would be a
1950          * second filter driver above s->base (== target).
1951          */
1952         iter_shared_perms = BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE;
1953 
1954         for (iter = bdrv_filter_or_cow_bs(bs); iter != target;
1955              iter = bdrv_filter_or_cow_bs(iter))
1956         {
1957             if (iter == filtered_target) {
1958                 /*
1959                  * From here on, all nodes are filters on the base.
1960                  * This allows us to share BLK_PERM_CONSISTENT_READ.
1961                  */
1962                 iter_shared_perms |= BLK_PERM_CONSISTENT_READ;
1963             }
1964 
1965             ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
1966                                      iter_shared_perms, errp);
1967             if (ret < 0) {
1968                 bdrv_graph_wrunlock(bs);
1969                 goto fail;
1970             }
1971         }
1972 
1973         if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) {
1974             bdrv_graph_wrunlock(bs);
1975             goto fail;
1976         }
1977     }
1978     bdrv_graph_wrunlock(bs);
1979 
1980     QTAILQ_INIT(&s->ops_in_flight);
1981 
1982     trace_mirror_start(bs, s, opaque);
1983     job_start(&s->common.job);
1984 
1985     return &s->common;
1986 
1987 fail:
1988     if (s) {
1989         /* Make sure this BDS does not go away until we have completed the graph
1990          * changes below */
1991         bdrv_ref(mirror_top_bs);
1992 
1993         g_free(s->replaces);
1994         blk_unref(s->target);
1995         bs_opaque->job = NULL;
1996         if (s->dirty_bitmap) {
1997             bdrv_release_dirty_bitmap(s->dirty_bitmap);
1998         }
1999         job_early_fail(&s->common.job);
2000     }
2001 
2002     bs_opaque->stop = true;
2003     bdrv_drained_begin(bs);
2004     bdrv_graph_wrlock(bs);
2005     assert(mirror_top_bs->backing->bs == bs);
2006     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
2007                              &error_abort);
2008     bdrv_replace_node(mirror_top_bs, bs, &error_abort);
2009     bdrv_graph_wrunlock(bs);
2010     bdrv_drained_end(bs);
2011 
2012     bdrv_unref(mirror_top_bs);
2013 
2014     return NULL;
2015 }
2016 
2017 void mirror_start(const char *job_id, BlockDriverState *bs,
2018                   BlockDriverState *target, const char *replaces,
2019                   int creation_flags, int64_t speed,
2020                   uint32_t granularity, int64_t buf_size,
2021                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
2022                   bool zero_target,
2023                   BlockdevOnError on_source_error,
2024                   BlockdevOnError on_target_error,
2025                   bool unmap, const char *filter_node_name,
2026                   MirrorCopyMode copy_mode, Error **errp)
2027 {
2028     bool is_none_mode;
2029     BlockDriverState *base;
2030 
2031     GLOBAL_STATE_CODE();
2032 
2033     if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
2034         (mode == MIRROR_SYNC_MODE_BITMAP)) {
2035         error_setg(errp, "Sync mode '%s' not supported",
2036                    MirrorSyncMode_str(mode));
2037         return;
2038     }
2039 
2040     bdrv_graph_rdlock_main_loop();
2041     is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
2042     base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
2043     bdrv_graph_rdunlock_main_loop();
2044 
2045     mirror_start_job(job_id, bs, creation_flags, target, replaces,
2046                      speed, granularity, buf_size, backing_mode, zero_target,
2047                      on_source_error, on_target_error, unmap, NULL, NULL,
2048                      &mirror_job_driver, is_none_mode, base, false,
2049                      filter_node_name, true, copy_mode, errp);
2050 }
2051 
2052 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
2053                               BlockDriverState *base, int creation_flags,
2054                               int64_t speed, BlockdevOnError on_error,
2055                               const char *filter_node_name,
2056                               BlockCompletionFunc *cb, void *opaque,
2057                               bool auto_complete, Error **errp)
2058 {
2059     bool base_read_only;
2060     BlockJob *job;
2061 
2062     GLOBAL_STATE_CODE();
2063 
2064     base_read_only = bdrv_is_read_only(base);
2065 
2066     if (base_read_only) {
2067         if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
2068             return NULL;
2069         }
2070     }
2071 
2072     job = mirror_start_job(
2073                      job_id, bs, creation_flags, base, NULL, speed, 0, 0,
2074                      MIRROR_LEAVE_BACKING_CHAIN, false,
2075                      on_error, on_error, true, cb, opaque,
2076                      &commit_active_job_driver, false, base, auto_complete,
2077                      filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
2078                      errp);
2079     if (!job) {
2080         goto error_restore_flags;
2081     }
2082 
2083     return job;
2084 
2085 error_restore_flags:
2086     /* ignore error and errp for bdrv_reopen, because we want to propagate
2087      * the original error */
2088     if (base_read_only) {
2089         bdrv_reopen_set_read_only(base, true, NULL);
2090     }
2091     return NULL;
2092 }
2093