xref: /qemu/block/block-copy.c (revision c888f7e0)
1 /*
2  * block_copy API
3  *
4  * Copyright (C) 2013 Proxmox Server Solutions
5  * Copyright (c) 2019 Virtuozzo International GmbH.
6  *
7  * Authors:
8  *  Dietmar Maurer (dietmar@proxmox.com)
9  *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 
17 #include "trace.h"
18 #include "qapi/error.h"
19 #include "block/block-copy.h"
20 #include "sysemu/block-backend.h"
21 #include "qemu/units.h"
22 #include "qemu/coroutine.h"
23 #include "block/aio_task.h"
24 
/*
 * copy_size limit applied after a successful copy_range request (a larger
 * cluster size takes precedence, max_transfer may lower it further).
 */
#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
/*
 * copy_size used while copy_range is enabled but has not succeeded yet, and
 * after copy_range has failed (a larger cluster size takes precedence).
 */
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
/* Capacity of the SharedResource that tasks take their byte count from. */
#define BLOCK_COPY_MAX_MEM (128 * MiB)
/* Value passed to aio_task_pool_new() by block_copy_dirty_clusters(). */
#define BLOCK_COPY_MAX_WORKERS 64

/* Defined further down; block_copy_task_create() stores it in task.func. */
static coroutine_fn int block_copy_task_entry(AioTask *task);
31 
/*
 * Failure record of one block_copy_dirty_clusters() invocation, shared by
 * all tasks that invocation creates.
 */
typedef struct BlockCopyCallState {
    bool failed;        /* set when the first task of this call fails */
    bool error_is_read; /* for that failure: true if the read side failed */
} BlockCopyCallState;
36 
/* One copy request covering the range [offset, offset + bytes). */
typedef struct BlockCopyTask {
    AioTask task; /* task.func is set to block_copy_task_entry on creation */

    BlockCopyState *s;
    BlockCopyCallState *call_state; /* failure record of the creating call */
    int64_t offset;
    int64_t bytes;
    bool zeroes; /* write zeroes to the target instead of copying data */
    QLIST_ENTRY(BlockCopyTask) list; /* link in BlockCopyState.tasks */
    CoQueue wait_queue; /* coroutines blocked on this task */
} BlockCopyTask;
48 
49 static int64_t task_end(BlockCopyTask *task)
50 {
51     return task->offset + task->bytes;
52 }
53 
/* State shared by all block_copy() calls working on one source/target pair. */
typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by block-copy user and user is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;
    /*
     * Dirty bits mark areas that still have to be copied. Bits are cleared
     * when a task is created for an area and set again if that task fails
     * or is shrunk.
     */
    BdrvDirtyBitmap *copy_bitmap;
    /* Sum of the sizes of all tasks currently in the tasks list */
    int64_t in_flight_bytes;
    int64_t cluster_size;
    /* Try bdrv_co_copy_range() first; cleared after the first failure */
    bool use_copy_range;
    /* Maximum size of a single task */
    int64_t copy_size;
    /* Size of copy_bitmap, as reported by bdrv_dirty_bitmap_size() */
    uint64_t len;
    QLIST_HEAD(, BlockCopyTask) tasks;

    /* Flags passed to copy_range, write and write-zeroes requests */
    BdrvRequestFlags write_flags;

    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap.  During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions (their bits stay cleared in the copy_bitmap)
     * and update the remaining work of the progress meter every time it
     * does.
     */
    bool skip_unallocated;

    ProgressMeter *progress;
    /* progress_bytes_callback: called when some copying progress is done. */
    ProgressBytesCallbackFunc progress_bytes_callback;
    void *progress_opaque;

    /* Tasks take their size from here before running and return it after */
    SharedResource *mem;
} BlockCopyState;
95 
96 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
97                                             int64_t offset, int64_t bytes)
98 {
99     BlockCopyTask *t;
100 
101     QLIST_FOREACH(t, &s->tasks, list) {
102         if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
103             return t;
104         }
105     }
106 
107     return NULL;
108 }
109 
110 /*
111  * If there are no intersecting tasks return false. Otherwise, wait for the
112  * first found intersecting tasks to finish and return true.
113  */
114 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
115                                              int64_t bytes)
116 {
117     BlockCopyTask *task = find_conflicting_task(s, offset, bytes);
118 
119     if (!task) {
120         return false;
121     }
122 
123     qemu_co_queue_wait(&task->wait_queue, NULL);
124 
125     return true;
126 }
127 
128 /*
129  * Search for the first dirty area in offset/bytes range and create task at
130  * the beginning of it.
131  */
132 static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
133                                              BlockCopyCallState *call_state,
134                                              int64_t offset, int64_t bytes)
135 {
136     BlockCopyTask *task;
137 
138     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
139                                            offset, offset + bytes,
140                                            s->copy_size, &offset, &bytes))
141     {
142         return NULL;
143     }
144 
145     /* region is dirty, so no existent tasks possible in it */
146     assert(!find_conflicting_task(s, offset, bytes));
147 
148     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
149     s->in_flight_bytes += bytes;
150 
151     task = g_new(BlockCopyTask, 1);
152     *task = (BlockCopyTask) {
153         .task.func = block_copy_task_entry,
154         .s = s,
155         .call_state = call_state,
156         .offset = offset,
157         .bytes = bytes,
158     };
159     qemu_co_queue_init(&task->wait_queue);
160     QLIST_INSERT_HEAD(&s->tasks, task, list);
161 
162     return task;
163 }
164 
165 /*
166  * block_copy_task_shrink
167  *
168  * Drop the tail of the task to be handled later. Set dirty bits back and
169  * wake up all tasks waiting for us (may be some of them are not intersecting
170  * with shrunk task)
171  */
172 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
173                                                 int64_t new_bytes)
174 {
175     if (new_bytes == task->bytes) {
176         return;
177     }
178 
179     assert(new_bytes > 0 && new_bytes < task->bytes);
180 
181     task->s->in_flight_bytes -= task->bytes - new_bytes;
182     bdrv_set_dirty_bitmap(task->s->copy_bitmap,
183                           task->offset + new_bytes, task->bytes - new_bytes);
184 
185     task->bytes = new_bytes;
186     qemu_co_queue_restart_all(&task->wait_queue);
187 }
188 
189 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
190 {
191     task->s->in_flight_bytes -= task->bytes;
192     if (ret < 0) {
193         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
194     }
195     QLIST_REMOVE(task, list);
196     qemu_co_queue_restart_all(&task->wait_queue);
197 }
198 
199 void block_copy_state_free(BlockCopyState *s)
200 {
201     if (!s) {
202         return;
203     }
204 
205     bdrv_release_dirty_bitmap(s->copy_bitmap);
206     shres_destroy(s->mem);
207     g_free(s);
208 }
209 
210 static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
211 {
212     return MIN_NON_ZERO(INT_MAX,
213                         MIN_NON_ZERO(source->bs->bl.max_transfer,
214                                      target->bs->bl.max_transfer));
215 }
216 
217 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
218                                      int64_t cluster_size,
219                                      BdrvRequestFlags write_flags, Error **errp)
220 {
221     BlockCopyState *s;
222     BdrvDirtyBitmap *copy_bitmap;
223 
224     copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
225                                            errp);
226     if (!copy_bitmap) {
227         return NULL;
228     }
229     bdrv_disable_dirty_bitmap(copy_bitmap);
230 
231     s = g_new(BlockCopyState, 1);
232     *s = (BlockCopyState) {
233         .source = source,
234         .target = target,
235         .copy_bitmap = copy_bitmap,
236         .cluster_size = cluster_size,
237         .len = bdrv_dirty_bitmap_size(copy_bitmap),
238         .write_flags = write_flags,
239         .mem = shres_create(BLOCK_COPY_MAX_MEM),
240     };
241 
242     if (block_copy_max_transfer(source, target) < cluster_size) {
243         /*
244          * copy_range does not respect max_transfer. We don't want to bother
245          * with requests smaller than block-copy cluster size, so fallback to
246          * buffered copying (read and write respect max_transfer on their
247          * behalf).
248          */
249         s->use_copy_range = false;
250         s->copy_size = cluster_size;
251     } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
252         /* Compression supports only cluster-size writes and no copy-range. */
253         s->use_copy_range = false;
254         s->copy_size = cluster_size;
255     } else {
256         /*
257          * We enable copy-range, but keep small copy_size, until first
258          * successful copy_range (look at block_copy_do_copy).
259          */
260         s->use_copy_range = true;
261         s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
262     }
263 
264     QLIST_INIT(&s->tasks);
265 
266     return s;
267 }
268 
269 void block_copy_set_progress_callback(
270         BlockCopyState *s,
271         ProgressBytesCallbackFunc progress_bytes_callback,
272         void *progress_opaque)
273 {
274     s->progress_bytes_callback = progress_bytes_callback;
275     s->progress_opaque = progress_opaque;
276 }
277 
278 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
279 {
280     s->progress = pm;
281 }
282 
283 /*
284  * Takes ownership of @task
285  *
286  * If pool is NULL directly run the task, otherwise schedule it into the pool.
287  *
288  * Returns: task.func return code if pool is NULL
289  *          otherwise -ECANCELED if pool status is bad
290  *          otherwise 0 (successfully scheduled)
291  */
292 static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
293                                             BlockCopyTask *task)
294 {
295     if (!pool) {
296         int ret = task->task.func(&task->task);
297 
298         g_free(task);
299         return ret;
300     }
301 
302     aio_task_pool_wait_slot(pool);
303     if (aio_task_pool_status(pool) < 0) {
304         co_put_to_shres(task->s->mem, task->bytes);
305         block_copy_task_end(task, -ECANCELED);
306         g_free(task);
307         return -ECANCELED;
308     }
309 
310     aio_task_pool_start_task(pool, &task->task);
311 
312     return 0;
313 }
314 
315 /*
316  * block_copy_do_copy
317  *
318  * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
319  * s->len only to cover last cluster when s->len is not aligned to clusters.
320  *
321  * No sync here: nor bitmap neighter intersecting requests handling, only copy.
322  *
323  * Returns 0 on success.
324  */
325 static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
326                                            int64_t offset, int64_t bytes,
327                                            bool zeroes, bool *error_is_read)
328 {
329     int ret;
330     int64_t nbytes = MIN(offset + bytes, s->len) - offset;
331     void *bounce_buffer = NULL;
332 
333     assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
334     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
335     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
336     assert(offset < s->len);
337     assert(offset + bytes <= s->len ||
338            offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
339     assert(nbytes < INT_MAX);
340 
341     if (zeroes) {
342         ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
343                                     ~BDRV_REQ_WRITE_COMPRESSED);
344         if (ret < 0) {
345             trace_block_copy_write_zeroes_fail(s, offset, ret);
346             if (error_is_read) {
347                 *error_is_read = false;
348             }
349         }
350         return ret;
351     }
352 
353     if (s->use_copy_range) {
354         ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
355                                  0, s->write_flags);
356         if (ret < 0) {
357             trace_block_copy_copy_range_fail(s, offset, ret);
358             s->use_copy_range = false;
359             s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
360             /* Fallback to read+write with allocated buffer */
361         } else {
362             if (s->use_copy_range) {
363                 /*
364                  * Successful copy-range. Now increase copy_size.  copy_range
365                  * does not respect max_transfer (it's a TODO), so we factor
366                  * that in here.
367                  *
368                  * Note: we double-check s->use_copy_range for the case when
369                  * parallel block-copy request unsets it during previous
370                  * bdrv_co_copy_range call.
371                  */
372                 s->copy_size =
373                         MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
374                             QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source,
375                                                                     s->target),
376                                             s->cluster_size));
377             }
378             goto out;
379         }
380     }
381 
382     /*
383      * In case of failed copy_range request above, we may proceed with buffered
384      * request larger than BLOCK_COPY_MAX_BUFFER. Still, further requests will
385      * be properly limited, so don't care too much. Moreover the most likely
386      * case (copy_range is unsupported for the configuration, so the very first
387      * copy_range request fails) is handled by setting large copy_size only
388      * after first successful copy_range.
389      */
390 
391     bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
392 
393     ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
394     if (ret < 0) {
395         trace_block_copy_read_fail(s, offset, ret);
396         if (error_is_read) {
397             *error_is_read = true;
398         }
399         goto out;
400     }
401 
402     ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
403                          s->write_flags);
404     if (ret < 0) {
405         trace_block_copy_write_fail(s, offset, ret);
406         if (error_is_read) {
407             *error_is_read = false;
408         }
409         goto out;
410     }
411 
412 out:
413     qemu_vfree(bounce_buffer);
414 
415     return ret;
416 }
417 
418 static coroutine_fn int block_copy_task_entry(AioTask *task)
419 {
420     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
421     bool error_is_read;
422     int ret;
423 
424     ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
425                              &error_is_read);
426     if (ret < 0 && !t->call_state->failed) {
427         t->call_state->failed = true;
428         t->call_state->error_is_read = error_is_read;
429     } else {
430         progress_work_done(t->s->progress, t->bytes);
431         t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
432     }
433     co_put_to_shres(t->s->mem, t->bytes);
434     block_copy_task_end(t, ret);
435 
436     return ret;
437 }
438 
439 static int block_copy_block_status(BlockCopyState *s, int64_t offset,
440                                    int64_t bytes, int64_t *pnum)
441 {
442     int64_t num;
443     BlockDriverState *base;
444     int ret;
445 
446     if (s->skip_unallocated && s->source->bs->backing) {
447         base = s->source->bs->backing->bs;
448     } else {
449         base = NULL;
450     }
451 
452     ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
453                                   NULL, NULL);
454     if (ret < 0 || num < s->cluster_size) {
455         /*
456          * On error or if failed to obtain large enough chunk just fallback to
457          * copy one cluster.
458          */
459         num = s->cluster_size;
460         ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
461     } else if (offset + num == s->len) {
462         num = QEMU_ALIGN_UP(num, s->cluster_size);
463     } else {
464         num = QEMU_ALIGN_DOWN(num, s->cluster_size);
465     }
466 
467     *pnum = num;
468     return ret;
469 }
470 
471 /*
472  * Check if the cluster starting at offset is allocated or not.
473  * return via pnum the number of contiguous clusters sharing this allocation.
474  */
475 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
476                                            int64_t *pnum)
477 {
478     BlockDriverState *bs = s->source->bs;
479     int64_t count, total_count = 0;
480     int64_t bytes = s->len - offset;
481     int ret;
482 
483     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
484 
485     while (true) {
486         ret = bdrv_is_allocated(bs, offset, bytes, &count);
487         if (ret < 0) {
488             return ret;
489         }
490 
491         total_count += count;
492 
493         if (ret || count == 0) {
494             /*
495              * ret: partial segment(s) are considered allocated.
496              * otherwise: unallocated tail is treated as an entire segment.
497              */
498             *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
499             return ret;
500         }
501 
502         /* Unallocated segment(s) with uncertain following segment(s) */
503         if (total_count >= s->cluster_size) {
504             *pnum = total_count / s->cluster_size;
505             return 0;
506         }
507 
508         offset += count;
509         bytes -= count;
510     }
511 }
512 
513 /*
514  * Reset bits in copy_bitmap starting at offset if they represent unallocated
515  * data in the image. May reset subsequent contiguous bits.
516  * @return 0 when the cluster at @offset was unallocated,
517  *         1 otherwise, and -ret on error.
518  */
519 int64_t block_copy_reset_unallocated(BlockCopyState *s,
520                                      int64_t offset, int64_t *count)
521 {
522     int ret;
523     int64_t clusters, bytes;
524 
525     ret = block_copy_is_cluster_allocated(s, offset, &clusters);
526     if (ret < 0) {
527         return ret;
528     }
529 
530     bytes = clusters * s->cluster_size;
531 
532     if (!ret) {
533         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
534         progress_set_remaining(s->progress,
535                                bdrv_get_dirty_count(s->copy_bitmap) +
536                                s->in_flight_bytes);
537     }
538 
539     *count = bytes;
540     return ret;
541 }
542 
543 /*
544  * block_copy_dirty_clusters
545  *
546  * Copy dirty clusters in @offset/@bytes range.
547  * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty
548  * clusters found and -errno on failure.
549  */
550 static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
551                                                   int64_t offset, int64_t bytes,
552                                                   bool *error_is_read)
553 {
554     int ret = 0;
555     bool found_dirty = false;
556     int64_t end = offset + bytes;
557     AioTaskPool *aio = NULL;
558     BlockCopyCallState call_state = {false, false};
559 
560     /*
561      * block_copy() user is responsible for keeping source and target in same
562      * aio context
563      */
564     assert(bdrv_get_aio_context(s->source->bs) ==
565            bdrv_get_aio_context(s->target->bs));
566 
567     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
568     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
569 
570     while (bytes && aio_task_pool_status(aio) == 0) {
571         BlockCopyTask *task;
572         int64_t status_bytes;
573 
574         task = block_copy_task_create(s, &call_state, offset, bytes);
575         if (!task) {
576             /* No more dirty bits in the bitmap */
577             trace_block_copy_skip_range(s, offset, bytes);
578             break;
579         }
580         if (task->offset > offset) {
581             trace_block_copy_skip_range(s, offset, task->offset - offset);
582         }
583 
584         found_dirty = true;
585 
586         ret = block_copy_block_status(s, task->offset, task->bytes,
587                                       &status_bytes);
588         assert(ret >= 0); /* never fail */
589         if (status_bytes < task->bytes) {
590             block_copy_task_shrink(task, status_bytes);
591         }
592         if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
593             block_copy_task_end(task, 0);
594             progress_set_remaining(s->progress,
595                                    bdrv_get_dirty_count(s->copy_bitmap) +
596                                    s->in_flight_bytes);
597             trace_block_copy_skip_range(s, task->offset, task->bytes);
598             offset = task_end(task);
599             bytes = end - offset;
600             g_free(task);
601             continue;
602         }
603         task->zeroes = ret & BDRV_BLOCK_ZERO;
604 
605         trace_block_copy_process(s, task->offset);
606 
607         co_get_from_shres(s->mem, task->bytes);
608 
609         offset = task_end(task);
610         bytes = end - offset;
611 
612         if (!aio && bytes) {
613             aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS);
614         }
615 
616         ret = block_copy_task_run(aio, task);
617         if (ret < 0) {
618             goto out;
619         }
620     }
621 
622 out:
623     if (aio) {
624         aio_task_pool_wait_all(aio);
625 
626         /*
627          * We are not really interested in -ECANCELED returned from
628          * block_copy_task_run. If it fails, it means some task already failed
629          * for real reason, let's return first failure.
630          * Still, assert that we don't rewrite failure by success.
631          */
632         assert(ret == 0 || aio_task_pool_status(aio) < 0);
633         ret = aio_task_pool_status(aio);
634 
635         aio_task_pool_free(aio);
636     }
637     if (error_is_read && ret < 0) {
638         *error_is_read = call_state.error_is_read;
639     }
640 
641     return ret < 0 ? ret : found_dirty;
642 }
643 
644 /*
645  * block_copy
646  *
647  * Copy requested region, accordingly to dirty bitmap.
648  * Collaborate with parallel block_copy requests: if they succeed it will help
649  * us. If they fail, we will retry not-copied regions. So, if we return error,
650  * it means that some I/O operation failed in context of _this_ block_copy call,
651  * not some parallel operation.
652  */
653 int coroutine_fn block_copy(BlockCopyState *s, int64_t offset, int64_t bytes,
654                             bool *error_is_read)
655 {
656     int ret;
657 
658     do {
659         ret = block_copy_dirty_clusters(s, offset, bytes, error_is_read);
660 
661         if (ret == 0) {
662             ret = block_copy_wait_one(s, offset, bytes);
663         }
664 
665         /*
666          * We retry in two cases:
667          * 1. Some progress done
668          *    Something was copied, which means that there were yield points
669          *    and some new dirty bits may have appeared (due to failed parallel
670          *    block-copy requests).
671          * 2. We have waited for some intersecting block-copy request
672          *    It may have failed and produced new dirty bits.
673          */
674     } while (ret > 0);
675 
676     return ret;
677 }
678 
679 BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
680 {
681     return s->copy_bitmap;
682 }
683 
684 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
685 {
686     s->skip_unallocated = skip;
687 }
688