xref: /qemu/block/block-copy.c (revision f9734d5d)
1 /*
2  * block_copy API
3  *
4  * Copyright (C) 2013 Proxmox Server Solutions
5  * Copyright (c) 2019 Virtuozzo International GmbH.
6  *
7  * Authors:
8  *  Dietmar Maurer (dietmar@proxmox.com)
9  *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 
17 #include "trace.h"
18 #include "qapi/error.h"
19 #include "block/block-copy.h"
20 #include "sysemu/block-backend.h"
21 #include "qemu/units.h"
22 #include "qemu/coroutine.h"
23 #include "block/aio_task.h"
24 
/*
 * Preferred chunk size in COPY_RANGE_FULL mode; see block_copy_chunk_size()
 * for how cluster_size and max_transfer adjust it.
 */
#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
/*
 * Preferred chunk size in COPY_READ_WRITE and COPY_RANGE_SMALL modes; see
 * block_copy_chunk_size().
 */
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
/* Size passed to shres_create() for BlockCopyState.mem. */
#define BLOCK_COPY_MAX_MEM (128 * MiB)
/* max_workers used by the synchronous block_copy() entry point. */
#define BLOCK_COPY_MAX_WORKERS 64
/* Slice length passed to ratelimit_set_speed() by block_copy_set_speed(). */
#define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
30 
/*
 * How a chunk is copied. block_copy_do_copy() implements each method and
 * block_copy_chunk_size() derives the chunk size from the current one.
 */
typedef enum {
    /* Buffered read + write, one cluster per chunk. */
    COPY_READ_WRITE_CLUSTER,
    /* Buffered read + write, chunk size based on BLOCK_COPY_MAX_BUFFER. */
    COPY_READ_WRITE,
    /*
     * Write zeroes to the target instead of copying. Chosen per task in
     * block_copy_dirty_clusters(); block_copy_chunk_size() aborts if it is
     * ever found in BlockCopyState.method.
     */
    COPY_WRITE_ZEROES,
    /* copy_range with the small chunk size, used until one succeeds. */
    COPY_RANGE_SMALL,
    /* copy_range with chunk size based on BLOCK_COPY_MAX_COPY_RANGE. */
    COPY_RANGE_FULL
} BlockCopyMethod;
38 
/* Forward declaration: block_copy_task_create() stores it in task.func. */
static coroutine_fn int block_copy_task_entry(AioTask *task);
40 
/*
 * State of one block-copy call. block_copy() keeps it on the stack,
 * block_copy_async() allocates it and hands it to the caller, who releases it
 * with block_copy_call_free().
 */
typedef struct BlockCopyCallState {
    /* Fields initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    bool ignore_ratelimit;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;
    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* Fields whose state changes throughout the execution */
    bool finished; /* atomic */
    QemuCoSleep sleep; /* TODO: protect API with a lock */
    bool cancelled; /* atomic */
    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /*
     * Fields that report information about return values and errors.
     * Protected by lock in BlockCopyState.
     */
    bool error_is_read;
    /*
     * @ret is set concurrently by tasks under mutex. Only set once by first
     * failed task (and untouched if no task failed).
     * After finishing (call_state->finished is true), it is not modified
     * anymore and may be safely read without mutex.
     */
    int ret;
} BlockCopyCallState;
74 
/*
 * One unit of copy work: the range [offset, offset + bytes) that
 * block_copy_task_create() cleared in the copy bitmap and accounted as
 * in-flight. Linked into BlockCopyState.tasks until block_copy_task_end().
 */
typedef struct BlockCopyTask {
    AioTask task;

    /*
     * Fields initialized in block_copy_task_create()
     * and never changed.
     */
    BlockCopyState *s;
    BlockCopyCallState *call_state;
    int64_t offset;
    /*
     * @method can also be set again in the while loop of
     * block_copy_dirty_clusters(), but it is never accessed concurrently
     * because the only other function that reads it is
     * block_copy_task_entry() and it is invoked afterwards in the same
     * iteration.
     */
    BlockCopyMethod method;

    /*
     * Fields whose state changes throughout the execution
     * Protected by lock in BlockCopyState.
     */
    CoQueue wait_queue; /* coroutines blocked on this task */
    /*
     * Only protect the case of parallel read while updating @bytes
     * value in block_copy_task_shrink().
     */
    int64_t bytes;
    QLIST_ENTRY(BlockCopyTask) list;
} BlockCopyTask;
106 
107 static int64_t task_end(BlockCopyTask *task)
108 {
109     return task->offset + task->bytes;
110 }
111 
/*
 * State shared by every block-copy call and task working on one
 * source/target pair. Created by block_copy_state_new() and released by
 * block_copy_state_free().
 */
typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by block-copy user and user is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;

    /*
     * Fields initialized in block_copy_state_new()
     * and never changed.
     */
    int64_t cluster_size;
    int64_t max_transfer;
    uint64_t len;
    BdrvRequestFlags write_flags;

    /*
     * Fields whose state changes throughout the execution
     * Protected by lock.
     */
    CoMutex lock;
    int64_t in_flight_bytes;
    BlockCopyMethod method;
    QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
    QLIST_HEAD(, BlockCopyCallState) calls;
    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap.  During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated; /* atomic */
    /* State fields that use a thread-safe API */
    BdrvDirtyBitmap *copy_bitmap;
    ProgressMeter *progress;
    SharedResource *mem;
    RateLimit rate_limit;
} BlockCopyState;
160 
161 /* Called with lock held */
162 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
163                                             int64_t offset, int64_t bytes)
164 {
165     BlockCopyTask *t;
166 
167     QLIST_FOREACH(t, &s->tasks, list) {
168         if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
169             return t;
170         }
171     }
172 
173     return NULL;
174 }
175 
176 /*
177  * If there are no intersecting tasks return false. Otherwise, wait for the
178  * first found intersecting tasks to finish and return true.
179  *
180  * Called with lock held. May temporary release the lock.
181  * Return value of 0 proves that lock was NOT released.
182  */
183 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
184                                              int64_t bytes)
185 {
186     BlockCopyTask *task = find_conflicting_task(s, offset, bytes);
187 
188     if (!task) {
189         return false;
190     }
191 
192     qemu_co_queue_wait(&task->wait_queue, &s->lock);
193 
194     return true;
195 }
196 
197 /* Called with lock held */
198 static int64_t block_copy_chunk_size(BlockCopyState *s)
199 {
200     switch (s->method) {
201     case COPY_READ_WRITE_CLUSTER:
202         return s->cluster_size;
203     case COPY_READ_WRITE:
204     case COPY_RANGE_SMALL:
205         return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
206                    s->max_transfer);
207     case COPY_RANGE_FULL:
208         return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
209                    s->max_transfer);
210     default:
211         /* Cannot have COPY_WRITE_ZEROES here.  */
212         abort();
213     }
214 }
215 
216 /*
217  * Search for the first dirty area in offset/bytes range and create task at
218  * the beginning of it.
219  */
220 static coroutine_fn BlockCopyTask *
221 block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
222                        int64_t offset, int64_t bytes)
223 {
224     BlockCopyTask *task;
225     int64_t max_chunk;
226 
227     QEMU_LOCK_GUARD(&s->lock);
228     max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
229     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
230                                            offset, offset + bytes,
231                                            max_chunk, &offset, &bytes))
232     {
233         return NULL;
234     }
235 
236     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
237     bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);
238 
239     /* region is dirty, so no existent tasks possible in it */
240     assert(!find_conflicting_task(s, offset, bytes));
241 
242     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
243     s->in_flight_bytes += bytes;
244 
245     task = g_new(BlockCopyTask, 1);
246     *task = (BlockCopyTask) {
247         .task.func = block_copy_task_entry,
248         .s = s,
249         .call_state = call_state,
250         .offset = offset,
251         .bytes = bytes,
252         .method = s->method,
253     };
254     qemu_co_queue_init(&task->wait_queue);
255     QLIST_INSERT_HEAD(&s->tasks, task, list);
256 
257     return task;
258 }
259 
260 /*
261  * block_copy_task_shrink
262  *
263  * Drop the tail of the task to be handled later. Set dirty bits back and
264  * wake up all tasks waiting for us (may be some of them are not intersecting
265  * with shrunk task)
266  */
267 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
268                                                 int64_t new_bytes)
269 {
270     QEMU_LOCK_GUARD(&task->s->lock);
271     if (new_bytes == task->bytes) {
272         return;
273     }
274 
275     assert(new_bytes > 0 && new_bytes < task->bytes);
276 
277     task->s->in_flight_bytes -= task->bytes - new_bytes;
278     bdrv_set_dirty_bitmap(task->s->copy_bitmap,
279                           task->offset + new_bytes, task->bytes - new_bytes);
280 
281     task->bytes = new_bytes;
282     qemu_co_queue_restart_all(&task->wait_queue);
283 }
284 
285 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
286 {
287     QEMU_LOCK_GUARD(&task->s->lock);
288     task->s->in_flight_bytes -= task->bytes;
289     if (ret < 0) {
290         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
291     }
292     QLIST_REMOVE(task, list);
293     progress_set_remaining(task->s->progress,
294                            bdrv_get_dirty_count(task->s->copy_bitmap) +
295                            task->s->in_flight_bytes);
296     qemu_co_queue_restart_all(&task->wait_queue);
297 }
298 
299 void block_copy_state_free(BlockCopyState *s)
300 {
301     if (!s) {
302         return;
303     }
304 
305     ratelimit_destroy(&s->rate_limit);
306     bdrv_release_dirty_bitmap(s->copy_bitmap);
307     shres_destroy(s->mem);
308     g_free(s);
309 }
310 
/*
 * Return the smallest non-zero value among INT_MAX and the max_transfer
 * block limits of @source and @target (MIN_NON_ZERO skips zero operands).
 */
static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}
317 
318 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
319                                      int64_t cluster_size, bool use_copy_range,
320                                      BdrvRequestFlags write_flags, Error **errp)
321 {
322     BlockCopyState *s;
323     BdrvDirtyBitmap *copy_bitmap;
324 
325     copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
326                                            errp);
327     if (!copy_bitmap) {
328         return NULL;
329     }
330     bdrv_disable_dirty_bitmap(copy_bitmap);
331 
332     s = g_new(BlockCopyState, 1);
333     *s = (BlockCopyState) {
334         .source = source,
335         .target = target,
336         .copy_bitmap = copy_bitmap,
337         .cluster_size = cluster_size,
338         .len = bdrv_dirty_bitmap_size(copy_bitmap),
339         .write_flags = write_flags,
340         .mem = shres_create(BLOCK_COPY_MAX_MEM),
341         .max_transfer = QEMU_ALIGN_DOWN(
342                                     block_copy_max_transfer(source, target),
343                                     cluster_size),
344     };
345 
346     if (s->max_transfer < cluster_size) {
347         /*
348          * copy_range does not respect max_transfer. We don't want to bother
349          * with requests smaller than block-copy cluster size, so fallback to
350          * buffered copying (read and write respect max_transfer on their
351          * behalf).
352          */
353         s->method = COPY_READ_WRITE_CLUSTER;
354     } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
355         /* Compression supports only cluster-size writes and no copy-range. */
356         s->method = COPY_READ_WRITE_CLUSTER;
357     } else {
358         /*
359          * If copy range enabled, start with COPY_RANGE_SMALL, until first
360          * successful copy_range (look at block_copy_do_copy).
361          */
362         s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
363     }
364 
365     ratelimit_init(&s->rate_limit);
366     qemu_co_mutex_init(&s->lock);
367     QLIST_INIT(&s->tasks);
368     QLIST_INIT(&s->calls);
369 
370     return s;
371 }
372 
373 /* Only set before running the job, no need for locking. */
374 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
375 {
376     s->progress = pm;
377 }
378 
379 /*
380  * Takes ownership of @task
381  *
382  * If pool is NULL directly run the task, otherwise schedule it into the pool.
383  *
384  * Returns: task.func return code if pool is NULL
385  *          otherwise -ECANCELED if pool status is bad
386  *          otherwise 0 (successfully scheduled)
387  */
388 static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
389                                             BlockCopyTask *task)
390 {
391     if (!pool) {
392         int ret = task->task.func(&task->task);
393 
394         g_free(task);
395         return ret;
396     }
397 
398     aio_task_pool_wait_slot(pool);
399     if (aio_task_pool_status(pool) < 0) {
400         co_put_to_shres(task->s->mem, task->bytes);
401         block_copy_task_end(task, -ECANCELED);
402         g_free(task);
403         return -ECANCELED;
404     }
405 
406     aio_task_pool_start_task(pool, &task->task);
407 
408     return 0;
409 }
410 
411 /*
412  * block_copy_do_copy
413  *
414  * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
415  * s->len only to cover last cluster when s->len is not aligned to clusters.
416  *
417  * No sync here: nor bitmap neighter intersecting requests handling, only copy.
418  *
419  * @method is an in-out argument, so that copy_range can be either extended to
420  * a full-size buffer or disabled if the copy_range attempt fails.  The output
421  * value of @method should be used for subsequent tasks.
422  * Returns 0 on success.
423  */
424 static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
425                                            int64_t offset, int64_t bytes,
426                                            BlockCopyMethod *method,
427                                            bool *error_is_read)
428 {
429     int ret;
430     int64_t nbytes = MIN(offset + bytes, s->len) - offset;
431     void *bounce_buffer = NULL;
432 
433     assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
434     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
435     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
436     assert(offset < s->len);
437     assert(offset + bytes <= s->len ||
438            offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
439     assert(nbytes < INT_MAX);
440 
441     switch (*method) {
442     case COPY_WRITE_ZEROES:
443         ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
444                                     ~BDRV_REQ_WRITE_COMPRESSED);
445         if (ret < 0) {
446             trace_block_copy_write_zeroes_fail(s, offset, ret);
447             *error_is_read = false;
448         }
449         return ret;
450 
451     case COPY_RANGE_SMALL:
452     case COPY_RANGE_FULL:
453         ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
454                                  0, s->write_flags);
455         if (ret >= 0) {
456             /* Successful copy-range, increase chunk size.  */
457             *method = COPY_RANGE_FULL;
458             return 0;
459         }
460 
461         trace_block_copy_copy_range_fail(s, offset, ret);
462         *method = COPY_READ_WRITE;
463         /* Fall through to read+write with allocated buffer */
464 
465     case COPY_READ_WRITE_CLUSTER:
466     case COPY_READ_WRITE:
467         /*
468          * In case of failed copy_range request above, we may proceed with
469          * buffered request larger than BLOCK_COPY_MAX_BUFFER.
470          * Still, further requests will be properly limited, so don't care too
471          * much. Moreover the most likely case (copy_range is unsupported for
472          * the configuration, so the very first copy_range request fails)
473          * is handled by setting large copy_size only after first successful
474          * copy_range.
475          */
476 
477         bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
478 
479         ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
480         if (ret < 0) {
481             trace_block_copy_read_fail(s, offset, ret);
482             *error_is_read = true;
483             goto out;
484         }
485 
486         ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
487                              s->write_flags);
488         if (ret < 0) {
489             trace_block_copy_write_fail(s, offset, ret);
490             *error_is_read = false;
491             goto out;
492         }
493 
494     out:
495         qemu_vfree(bounce_buffer);
496         break;
497 
498     default:
499         abort();
500     }
501 
502     return ret;
503 }
504 
505 static coroutine_fn int block_copy_task_entry(AioTask *task)
506 {
507     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
508     BlockCopyState *s = t->s;
509     bool error_is_read = false;
510     BlockCopyMethod method = t->method;
511     int ret;
512 
513     ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);
514 
515     WITH_QEMU_LOCK_GUARD(&s->lock) {
516         if (s->method == t->method) {
517             s->method = method;
518         }
519 
520         if (ret < 0) {
521             if (!t->call_state->ret) {
522                 t->call_state->ret = ret;
523                 t->call_state->error_is_read = error_is_read;
524             }
525         } else {
526             progress_work_done(s->progress, t->bytes);
527         }
528     }
529     co_put_to_shres(s->mem, t->bytes);
530     block_copy_task_end(t, ret);
531 
532     return ret;
533 }
534 
535 static int block_copy_block_status(BlockCopyState *s, int64_t offset,
536                                    int64_t bytes, int64_t *pnum)
537 {
538     int64_t num;
539     BlockDriverState *base;
540     int ret;
541 
542     if (qatomic_read(&s->skip_unallocated)) {
543         base = bdrv_backing_chain_next(s->source->bs);
544     } else {
545         base = NULL;
546     }
547 
548     ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
549                                   NULL, NULL);
550     if (ret < 0 || num < s->cluster_size) {
551         /*
552          * On error or if failed to obtain large enough chunk just fallback to
553          * copy one cluster.
554          */
555         num = s->cluster_size;
556         ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
557     } else if (offset + num == s->len) {
558         num = QEMU_ALIGN_UP(num, s->cluster_size);
559     } else {
560         num = QEMU_ALIGN_DOWN(num, s->cluster_size);
561     }
562 
563     *pnum = num;
564     return ret;
565 }
566 
567 /*
568  * Check if the cluster starting at offset is allocated or not.
569  * return via pnum the number of contiguous clusters sharing this allocation.
570  */
571 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
572                                            int64_t *pnum)
573 {
574     BlockDriverState *bs = s->source->bs;
575     int64_t count, total_count = 0;
576     int64_t bytes = s->len - offset;
577     int ret;
578 
579     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
580 
581     while (true) {
582         ret = bdrv_is_allocated(bs, offset, bytes, &count);
583         if (ret < 0) {
584             return ret;
585         }
586 
587         total_count += count;
588 
589         if (ret || count == 0) {
590             /*
591              * ret: partial segment(s) are considered allocated.
592              * otherwise: unallocated tail is treated as an entire segment.
593              */
594             *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
595             return ret;
596         }
597 
598         /* Unallocated segment(s) with uncertain following segment(s) */
599         if (total_count >= s->cluster_size) {
600             *pnum = total_count / s->cluster_size;
601             return 0;
602         }
603 
604         offset += count;
605         bytes -= count;
606     }
607 }
608 
609 /*
610  * Reset bits in copy_bitmap starting at offset if they represent unallocated
611  * data in the image. May reset subsequent contiguous bits.
612  * @return 0 when the cluster at @offset was unallocated,
613  *         1 otherwise, and -ret on error.
614  */
615 int64_t block_copy_reset_unallocated(BlockCopyState *s,
616                                      int64_t offset, int64_t *count)
617 {
618     int ret;
619     int64_t clusters, bytes;
620 
621     ret = block_copy_is_cluster_allocated(s, offset, &clusters);
622     if (ret < 0) {
623         return ret;
624     }
625 
626     bytes = clusters * s->cluster_size;
627 
628     if (!ret) {
629         qemu_co_mutex_lock(&s->lock);
630         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
631         progress_set_remaining(s->progress,
632                                bdrv_get_dirty_count(s->copy_bitmap) +
633                                s->in_flight_bytes);
634         qemu_co_mutex_unlock(&s->lock);
635     }
636 
637     *count = bytes;
638     return ret;
639 }
640 
/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in the range described by @call_state (offset/bytes).
 *
 * The range is walked front to back: each iteration claims the next dirty
 * area as a task, trims it according to the source block status, applies the
 * rate limit, reserves memory for it and then runs it directly or through a
 * task pool. The loop stops when the range is exhausted, the pool reports a
 * failure or the call is cancelled.
 *
 * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty
 * clusters found and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * block_copy() user is responsible for keeping source and target in same
     * aio context
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    /*
     * Note: aio is still NULL until the pool is created further down, so
     * aio_task_pool_status() is presumably expected to accept NULL - verify.
     */
    while (bytes && aio_task_pool_status(aio) == 0 &&
           !qatomic_read(&call_state->cancelled)) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->offset > offset) {
            /* Clean area in front of the task. */
            trace_block_copy_skip_range(s, offset, task->offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->offset, task->bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->bytes) {
            /* Status covers only the head: give the tail back as dirty. */
            block_copy_task_shrink(task, status_bytes);
        }
        if (qatomic_read(&s->skip_unallocated) &&
            !(ret & BDRV_BLOCK_ALLOCATED)) {
            /*
             * Unallocated and we are asked to skip it: end the task as
             * successful (its bits stay cleared) without copying anything.
             */
            block_copy_task_end(task, 0);
            trace_block_copy_skip_range(s, task->offset, task->bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        if (ret & BDRV_BLOCK_ZERO) {
            task->method = COPY_WRITE_ZEROES;
        }

        if (!call_state->ignore_ratelimit) {
            /* Probe with 0 bytes: only asks whether we have to wait now. */
            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
            if (ns > 0) {
                /*
                 * Ending the task with an error marks its range dirty again;
                 * offset/bytes are unchanged, so the next iteration retries
                 * the same area after the sleep (or an earlier kick).
                 */
                block_copy_task_end(task, -EAGAIN);
                g_free(task);
                qemu_co_sleep_ns_wakeable(&call_state->sleep,
                                          QEMU_CLOCK_REALTIME, ns);
                continue;
            }
        }

        /* Account the bytes of this task; the returned delay is not used. */
        ratelimit_calculate_delay(&s->rate_limit, task->bytes);

        trace_block_copy_process(s, task->offset);

        /* Released in block_copy_task_entry() or block_copy_task_run(). */
        co_get_from_shres(s->mem, task->bytes);

        /* Advance before running: block_copy_task_run() owns the task. */
        offset = task_end(task);
        bytes = end - offset;

        /*
         * The pool is created only when more work remains after this task;
         * a range handled by a single task is run directly (aio == NULL).
         */
        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task already failed
         * for real reason, let's return first failure.
         * Still, assert that we don't rewrite failure by success.
         *
         * Note: ret may be positive here because of block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}
756 
757 void block_copy_kick(BlockCopyCallState *call_state)
758 {
759     qemu_co_sleep_wake(&call_state->sleep);
760 }
761 
762 /*
763  * block_copy_common
764  *
765  * Copy requested region, accordingly to dirty bitmap.
766  * Collaborate with parallel block_copy requests: if they succeed it will help
767  * us. If they fail, we will retry not-copied regions. So, if we return error,
768  * it means that some I/O operation failed in context of _this_ block_copy call,
769  * not some parallel operation.
770  */
771 static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
772 {
773     int ret;
774     BlockCopyState *s = call_state->s;
775 
776     qemu_co_mutex_lock(&s->lock);
777     QLIST_INSERT_HEAD(&s->calls, call_state, list);
778     qemu_co_mutex_unlock(&s->lock);
779 
780     do {
781         ret = block_copy_dirty_clusters(call_state);
782 
783         if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
784             WITH_QEMU_LOCK_GUARD(&s->lock) {
785                 /*
786                  * Check that there is no task we still need to
787                  * wait to complete
788                  */
789                 ret = block_copy_wait_one(s, call_state->offset,
790                                           call_state->bytes);
791                 if (ret == 0) {
792                     /*
793                      * No pending tasks, but check again the bitmap in this
794                      * same critical section, since a task might have failed
795                      * between this and the critical section in
796                      * block_copy_dirty_clusters().
797                      *
798                      * block_copy_wait_one return value 0 also means that it
799                      * didn't release the lock. So, we are still in the same
800                      * critical section, not interrupted by any concurrent
801                      * access to state.
802                      */
803                     ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
804                                                        call_state->offset,
805                                                        call_state->bytes) >= 0;
806                 }
807             }
808         }
809 
810         /*
811          * We retry in two cases:
812          * 1. Some progress done
813          *    Something was copied, which means that there were yield points
814          *    and some new dirty bits may have appeared (due to failed parallel
815          *    block-copy requests).
816          * 2. We have waited for some intersecting block-copy request
817          *    It may have failed and produced new dirty bits.
818          */
819     } while (ret > 0 && !qatomic_read(&call_state->cancelled));
820 
821     qatomic_store_release(&call_state->finished, true);
822 
823     if (call_state->cb) {
824         call_state->cb(call_state->cb_opaque);
825     }
826 
827     qemu_co_mutex_lock(&s->lock);
828     QLIST_REMOVE(call_state, list);
829     qemu_co_mutex_unlock(&s->lock);
830 
831     return ret;
832 }
833 
834 int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
835                             bool ignore_ratelimit)
836 {
837     BlockCopyCallState call_state = {
838         .s = s,
839         .offset = start,
840         .bytes = bytes,
841         .ignore_ratelimit = ignore_ratelimit,
842         .max_workers = BLOCK_COPY_MAX_WORKERS,
843     };
844 
845     return block_copy_common(&call_state);
846 }
847 
848 static void coroutine_fn block_copy_async_co_entry(void *opaque)
849 {
850     block_copy_common(opaque);
851 }
852 
853 BlockCopyCallState *block_copy_async(BlockCopyState *s,
854                                      int64_t offset, int64_t bytes,
855                                      int max_workers, int64_t max_chunk,
856                                      BlockCopyAsyncCallbackFunc cb,
857                                      void *cb_opaque)
858 {
859     BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
860 
861     *call_state = (BlockCopyCallState) {
862         .s = s,
863         .offset = offset,
864         .bytes = bytes,
865         .max_workers = max_workers,
866         .max_chunk = max_chunk,
867         .cb = cb,
868         .cb_opaque = cb_opaque,
869 
870         .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
871     };
872 
873     qemu_coroutine_enter(call_state->co);
874 
875     return call_state;
876 }
877 
878 void block_copy_call_free(BlockCopyCallState *call_state)
879 {
880     if (!call_state) {
881         return;
882     }
883 
884     assert(qatomic_read(&call_state->finished));
885     g_free(call_state);
886 }
887 
888 bool block_copy_call_finished(BlockCopyCallState *call_state)
889 {
890     return qatomic_read(&call_state->finished);
891 }
892 
893 bool block_copy_call_succeeded(BlockCopyCallState *call_state)
894 {
895     return qatomic_load_acquire(&call_state->finished) &&
896            !qatomic_read(&call_state->cancelled) &&
897            call_state->ret == 0;
898 }
899 
900 bool block_copy_call_failed(BlockCopyCallState *call_state)
901 {
902     return qatomic_load_acquire(&call_state->finished) &&
903            !qatomic_read(&call_state->cancelled) &&
904            call_state->ret < 0;
905 }
906 
907 bool block_copy_call_cancelled(BlockCopyCallState *call_state)
908 {
909     return qatomic_read(&call_state->cancelled);
910 }
911 
912 int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
913 {
914     assert(qatomic_load_acquire(&call_state->finished));
915     if (error_is_read) {
916         *error_is_read = call_state->error_is_read;
917     }
918     return call_state->ret;
919 }
920 
921 /*
922  * Note that cancelling and finishing are racy.
923  * User can cancel a block-copy that is already finished.
924  */
925 void block_copy_call_cancel(BlockCopyCallState *call_state)
926 {
927     qatomic_set(&call_state->cancelled, true);
928     block_copy_kick(call_state);
929 }
930 
931 BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
932 {
933     return s->copy_bitmap;
934 }
935 
936 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
937 {
938     qatomic_set(&s->skip_unallocated, skip);
939 }
940 
941 void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
942 {
943     ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);
944 
945     /*
946      * Note: it's good to kick all call states from here, but it should be done
947      * only from a coroutine, to not crash if s->calls list changed while
948      * entering one call. So for now, the only user of this function kicks its
949      * only one call_state by hand.
950      */
951 }
952