/*
 * block_copy API
 *
 * Copyright (C) 2013 Proxmox Server Solutions
 * Copyright (c) 2019 Virtuozzo International GmbH.
 *
 * Authors:
 *  Dietmar Maurer (dietmar@proxmox.com)
 *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64
#define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */

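/*
 * Copy methods used by block-copy. The method may change at run time: a
 * successful copy_range upgrades COPY_RANGE_SMALL to COPY_RANGE_FULL, while a
 * failed copy_range falls back to COPY_READ_WRITE (see block_copy_do_copy()).
 * COPY_WRITE_ZEROES is chosen per task when block-status reports a zeroed
 * area.
 */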
typedef enum {
    COPY_READ_WRITE_CLUSTER,
    COPY_READ_WRITE,
    COPY_WRITE_ZEROES,
    COPY_RANGE_SMALL,
    COPY_RANGE_FULL
} BlockCopyMethod;

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    /* Fields initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    bool ignore_ratelimit;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;
    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* Fields whose state changes throughout the execution */
    bool finished; /* atomic */
    QemuCoSleep sleep; /* TODO: protect API with a lock */
    bool cancelled; /* atomic */
    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /*
     * Fields that report information about return values and errors.
     * Protected by lock in BlockCopyState.
     */
    bool error_is_read;
    /*
     * @ret is set concurrently by tasks under mutex. Only set once by first
     * failed task (and untouched if no task failed).
     * After finishing (call_state->finished is true), it is not modified
     * anymore and may be safely read without mutex.
     */
    int ret;
} BlockCopyCallState;

typedef struct BlockCopyTask {
    AioTask task;

    /*
     * Fields initialized in block_copy_task_create()
     * and never changed.
     */
    BlockCopyState *s;
    BlockCopyCallState *call_state;
    int64_t offset;
    /*
     * @method can also be set again in the while loop of
     * block_copy_dirty_clusters(), but it is never accessed concurrently
     * because the only other function that reads it is
     * block_copy_task_entry() and it is invoked afterwards in the same
     * iteration.
     */
    BlockCopyMethod method;

    /*
     * Fields whose state changes throughout the execution
     * Protected by lock in BlockCopyState.
     */
    CoQueue wait_queue; /* coroutines blocked on this task */
    /*
     * Only protect the case of parallel read while updating @bytes
     * value in block_copy_task_shrink().
     */
    int64_t bytes;
    QLIST_ENTRY(BlockCopyTask) list;
} BlockCopyTask;

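/* Past-the-end offset of the region covered by @task */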
static int64_t task_end(BlockCopyTask *task)
{
    return task->offset + task->bytes;
}

typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by the block-copy user, who is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;

    /*
     * Fields initialized in block_copy_state_new()
     * and never changed.
     */
    int64_t cluster_size;
    int64_t max_transfer;
    uint64_t len;
    BdrvRequestFlags write_flags;

    /*
     * Fields whose state changes throughout the execution
     * Protected by lock.
     */
    CoMutex lock;
    int64_t in_flight_bytes;
    BlockCopyMethod method;
    QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
    QLIST_HEAD(, BlockCopyCallState) calls;
    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap. During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source’s allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated; /* atomic */
    /* State fields that use a thread-safe API */
    BdrvDirtyBitmap *copy_bitmap;
    ProgressMeter *progress;
    SharedResource *mem;
    RateLimit rate_limit;
} BlockCopyState;

/* Called with lock held */
static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
                                            int64_t offset, int64_t bytes)
{
    BlockCopyTask *t;

    QLIST_FOREACH(t, &s->tasks, list) {
        if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
            return t;
        }
    }

    return NULL;
}

/*
 * If there are no intersecting tasks return false. Otherwise, wait for the
 * first found intersecting task to finish and return true.
 *
 * Called with lock held. May temporarily release the lock.
 * A return value of false proves that the lock was NOT released.
 */
static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
                                             int64_t bytes)
{
    BlockCopyTask *task = find_conflicting_task(s, offset, bytes);

    if (!task) {
        return false;
    }

    qemu_co_queue_wait(&task->wait_queue, &s->lock);

    return true;
}

/* Called with lock held */
static int64_t block_copy_chunk_size(BlockCopyState *s)
{
    switch (s->method) {
    case COPY_READ_WRITE_CLUSTER:
        return s->cluster_size;
    case COPY_READ_WRITE:
    case COPY_RANGE_SMALL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
                   s->max_transfer);
    case COPY_RANGE_FULL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                   s->max_transfer);
    default:
        /* Cannot have COPY_WRITE_ZEROES here. */
        abort();
    }
}

/*
 * Search for the first dirty area in the offset/bytes range and create a task
 * at the beginning of it.
 */
static coroutine_fn BlockCopyTask *
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
                       int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
    int64_t max_chunk;

    QEMU_LOCK_GUARD(&s->lock);
    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
    if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                           offset, offset + bytes,
                                           max_chunk, &offset, &bytes))
    {
        return NULL;
    }

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);

    /* The region is dirty, so no existing tasks are possible in it */
    assert(!find_conflicting_task(s, offset, bytes));

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    s->in_flight_bytes += bytes;

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .offset = offset,
        .bytes = bytes,
        .method = s->method,
    };
    qemu_co_queue_init(&task->wait_queue);
    QLIST_INSERT_HEAD(&s->tasks, task, list);

    return task;
}

/*
 * block_copy_task_shrink
 *
 * Drop the tail of the task to be handled later. Set the dirty bits back and
 * wake up all tasks waiting for us (maybe some of them do not intersect with
 * the shrunk task).
 */
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                int64_t new_bytes)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    if (new_bytes == task->bytes) {
        return;
    }

    assert(new_bytes > 0 && new_bytes < task->bytes);

    task->s->in_flight_bytes -= task->bytes - new_bytes;
    bdrv_set_dirty_bitmap(task->s->copy_bitmap,
                          task->offset + new_bytes, task->bytes - new_bytes);

    task->bytes = new_bytes;
    qemu_co_queue_restart_all(&task->wait_queue);
}

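/*
 * Finish the task: account its bytes as no longer in flight, restore the
 * dirty bits on failure so the area will be retried, remove the task from
 * the tasks list, update the progress meter and wake up all waiters.
 */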
static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    task->s->in_flight_bytes -= task->bytes;
    if (ret < 0) {
        bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
    }
    QLIST_REMOVE(task, list);
    progress_set_remaining(task->s->progress,
                           bdrv_get_dirty_count(task->s->copy_bitmap) +
                           task->s->in_flight_bytes);
    qemu_co_queue_restart_all(&task->wait_queue);
}

void block_copy_state_free(BlockCopyState *s)
{
    if (!s) {
        return;
    }

    ratelimit_destroy(&s->rate_limit);
    bdrv_release_dirty_bitmap(s->copy_bitmap);
    shres_destroy(s->mem);
    g_free(s);
}

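/*
 * Combined transfer limit of source and target: the smaller of the two
 * non-zero max_transfer values, clamped to INT_MAX (MIN_NON_ZERO treats
 * zero as "no limit").
 */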
static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}

BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                     int64_t cluster_size, bool use_copy_range,
                                     BdrvRequestFlags write_flags, Error **errp)
{
    BlockCopyState *s;
    BdrvDirtyBitmap *copy_bitmap;

    copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
                                           errp);
    if (!copy_bitmap) {
        return NULL;
    }
    bdrv_disable_dirty_bitmap(copy_bitmap);

    s = g_new(BlockCopyState, 1);
    *s = (BlockCopyState) {
        .source = source,
        .target = target,
        .copy_bitmap = copy_bitmap,
        .cluster_size = cluster_size,
        .len = bdrv_dirty_bitmap_size(copy_bitmap),
        .write_flags = write_flags,
        .mem = shres_create(BLOCK_COPY_MAX_MEM),
        .max_transfer = QEMU_ALIGN_DOWN(
                                    block_copy_max_transfer(source, target),
                                    cluster_size),
    };

    if (s->max_transfer < cluster_size) {
        /*
         * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall back
         * to buffered copying (read and write respect max_transfer on their
         * behalf).
         */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
        /* Compression supports only cluster-size writes and no copy-range. */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else {
        /*
         * If copy range is enabled, start with COPY_RANGE_SMALL, until the
         * first successful copy_range (look at block_copy_do_copy).
         */
        s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
    }

    ratelimit_init(&s->rate_limit);
    qemu_co_mutex_init(&s->lock);
    QLIST_INIT(&s->tasks);
    QLIST_INIT(&s->calls);

    return s;
}

/* Only set before running the job, no need for locking. */
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}

/*
 * Takes ownership of @task
 *
 * If pool is NULL directly run the task, otherwise schedule it into the pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
 * Copy a cluster-aligned chunk. The requested region is allowed to exceed
 * s->len only to cover the last cluster when s->len is not aligned to
 * clusters.
 *
 * No sync here: neither the bitmap nor intersecting requests are handled,
 * only the copy itself is done.
 *
 * @method is an in-out argument, so that copy_range can be either extended to
 * a full-size buffer or disabled if the copy_range attempt fails. The output
 * value of @method should be used for subsequent tasks.
 * Returns 0 on success.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           BlockCopyMethod *method,
                                           bool *error_is_read)
{
    int ret;
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    switch (*method) {
    case COPY_WRITE_ZEROES:
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;

    case COPY_RANGE_SMALL:
    case COPY_RANGE_FULL:
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret >= 0) {
            /* Successful copy-range, increase chunk size. */
            *method = COPY_RANGE_FULL;
            return 0;
        }

        trace_block_copy_copy_range_fail(s, offset, ret);
        *method = COPY_READ_WRITE;
        /* Fall through to read+write with allocated buffer */

    case COPY_READ_WRITE_CLUSTER:
    case COPY_READ_WRITE:
        /*
         * In case of a failed copy_range request above, we may proceed with
         * a buffered request larger than BLOCK_COPY_MAX_BUFFER.
         * Still, further requests will be properly limited, so don't care too
         * much. Moreover, the most likely case (copy_range is unsupported for
         * the configuration, so the very first copy_range request fails)
         * is handled by setting the large copy_size only after the first
         * successful copy_range.
         */

        bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

        ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
        if (ret < 0) {
            trace_block_copy_read_fail(s, offset, ret);
            *error_is_read = true;
            goto out;
        }

        ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                             s->write_flags);
        if (ret < 0) {
            trace_block_copy_write_fail(s, offset, ret);
            *error_is_read = false;
            goto out;
        }

    out:
        qemu_vfree(bounce_buffer);
        break;

    default:
        abort();
    }

    return ret;
}

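/*
 * AioTask entry point for one block-copy task: do the actual copy, propagate
 * a possibly changed copy method back to the shared state, record the first
 * error (and whether it was a read error) in the call state, and finish the
 * task.
 */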
static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    BlockCopyState *s = t->s;
    bool error_is_read = false;
    BlockCopyMethod method = t->method;
    int ret;

    ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);

    WITH_QEMU_LOCK_GUARD(&s->lock) {
        if (s->method == t->method) {
            s->method = method;
        }

        if (ret < 0) {
            if (!t->call_state->ret) {
                t->call_state->ret = ret;
                t->call_state->error_is_read = error_is_read;
            }
        } else {
            progress_work_done(s->progress, t->bytes);
        }
    }
    co_put_to_shres(s->mem, t->bytes);
    block_copy_task_end(t, ret);

    return ret;
}

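/*
 * Query the block status of the source at @offset and return the number of
 * bytes covered by the result in *pnum, aligned to the cluster size. On
 * error, or if the returned chunk is smaller than a cluster, pretend that a
 * single allocated data cluster was found.
 */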
static int block_copy_block_status(BlockCopyState *s, int64_t offset,
                                   int64_t bytes, int64_t *pnum)
{
    int64_t num;
    BlockDriverState *base;
    int ret;

    if (qatomic_read(&s->skip_unallocated)) {
        base = bdrv_backing_chain_next(s->source->bs);
    } else {
        base = NULL;
    }

    ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
                                  NULL, NULL);
    if (ret < 0 || num < s->cluster_size) {
        /*
         * On error, or if we failed to obtain a large enough chunk, just
         * fall back to copying one cluster.
         */
        num = s->cluster_size;
        ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    } else if (offset + num == s->len) {
        num = QEMU_ALIGN_UP(num, s->cluster_size);
    } else {
        num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    }

    *pnum = num;
    return ret;
}

/*
 * Check if the cluster starting at @offset is allocated or not.
 * Return via @pnum the number of contiguous clusters sharing this allocation.
 */
static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
                                           int64_t *pnum)
{
    BlockDriverState *bs = s->source->bs;
    int64_t count, total_count = 0;
    int64_t bytes = s->len - offset;
    int ret;

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));

    while (true) {
        ret = bdrv_is_allocated(bs, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        total_count += count;

        if (ret || count == 0) {
            /*
             * ret: partial segment(s) are considered allocated.
             * otherwise: unallocated tail is treated as an entire segment.
             */
            *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
            return ret;
        }

        /* Unallocated segment(s) with uncertain following segment(s) */
        if (total_count >= s->cluster_size) {
            *pnum = total_count / s->cluster_size;
            return 0;
        }

        offset += count;
        bytes -= count;
    }
}

/*
 * Reset bits in copy_bitmap starting at offset if they represent unallocated
 * data in the image. May reset subsequent contiguous bits.
 * @return 0 when the cluster at @offset was unallocated,
 *         1 otherwise, and a negative error code on error.
 */
int64_t block_copy_reset_unallocated(BlockCopyState *s,
                                     int64_t offset, int64_t *count)
{
    int ret;
    int64_t clusters, bytes;

    ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    if (ret < 0) {
        return ret;
    }

    bytes = clusters * s->cluster_size;

    if (!ret) {
        qemu_co_mutex_lock(&s->lock);
        bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
        progress_set_remaining(s->progress,
                               bdrv_get_dirty_count(s->copy_bitmap) +
                               s->in_flight_bytes);
        qemu_co_mutex_unlock(&s->lock);
    }

    *count = bytes;
    return ret;
}

/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in the @offset/@bytes range.
 * Returns 1 if dirty clusters were found and successfully copied, 0 if no
 * dirty clusters were found, and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * The block_copy() user is responsible for keeping source and target
     * in the same AioContext.
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    while (bytes && aio_task_pool_status(aio) == 0 &&
           !qatomic_read(&call_state->cancelled)) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->offset > offset) {
            trace_block_copy_skip_range(s, offset, task->offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->offset, task->bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->bytes) {
            block_copy_task_shrink(task, status_bytes);
        }
        if (qatomic_read(&s->skip_unallocated) &&
            !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            trace_block_copy_skip_range(s, task->offset, task->bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        if (ret & BDRV_BLOCK_ZERO) {
            task->method = COPY_WRITE_ZEROES;
        }

        if (!call_state->ignore_ratelimit) {
            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
            if (ns > 0) {
                block_copy_task_end(task, -EAGAIN);
                g_free(task);
                qemu_co_sleep_ns_wakeable(&call_state->sleep,
                                          QEMU_CLOCK_REALTIME, ns);
                continue;
            }
        }

        ratelimit_calculate_delay(&s->rate_limit, task->bytes);

        trace_block_copy_process(s, task->offset);

        co_get_from_shres(s->mem, task->bytes);

        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task already failed
         * for a real reason; let's return the first failure.
         * Still, assert that we don't overwrite a failure with success.
         *
         * Note: ret may be positive here because of the block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}

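/*
 * Wake up a block-copy call that is currently sleeping on the rate limit, so
 * that it re-checks its state (e.g. cancellation) without waiting for the
 * full delay to elapse.
 */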
void block_copy_kick(BlockCopyCallState *call_state)
{
    qemu_co_sleep_wake(&call_state->sleep);
}

/*
 * block_copy_common
 *
 * Copy the requested region according to the dirty bitmap.
 * Collaborate with parallel block_copy requests: if they succeed it will help
 * us. If they fail, we will retry not-copied regions. So, if we return an
 * error, it means that some I/O operation failed in the context of _this_
 * block_copy call, not in some parallel operation.
 */
static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
{
    int ret;
    BlockCopyState *s = call_state->s;

    qemu_co_mutex_lock(&s->lock);
    QLIST_INSERT_HEAD(&s->calls, call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    do {
        ret = block_copy_dirty_clusters(call_state);

        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
            WITH_QEMU_LOCK_GUARD(&s->lock) {
                /*
                 * Check that there is no task we still need to
                 * wait for to complete
                 */
                ret = block_copy_wait_one(s, call_state->offset,
                                          call_state->bytes);
                if (ret == 0) {
                    /*
                     * No pending tasks, but check again the bitmap in this
                     * same critical section, since a task might have failed
                     * between this and the critical section in
                     * block_copy_dirty_clusters().
                     *
                     * A return value of 0 from block_copy_wait_one also means
                     * that it didn't release the lock. So, we are still in the
                     * same critical section, not interrupted by any concurrent
                     * access to state.
                     */
                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
                                                       call_state->offset,
                                                       call_state->bytes) >= 0;
                }
            }
        }

        /*
         * We retry in two cases:
         * 1. Some progress done
         *    Something was copied, which means that there were yield points
         *    and some new dirty bits may have appeared (due to failed parallel
         *    block-copy requests).
         * 2. We have waited for some intersecting block-copy request
         *    It may have failed and produced new dirty bits.
         */
    } while (ret > 0 && !qatomic_read(&call_state->cancelled));

    qatomic_store_release(&call_state->finished, true);

    if (call_state->cb) {
        call_state->cb(call_state->cb_opaque);
    }

    qemu_co_mutex_lock(&s->lock);
    QLIST_REMOVE(call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}

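/*
 * Synchronous wrapper: run block_copy_common() in the caller's coroutine,
 * copying the whole requested range with the default number of workers and
 * no extra chunk-size limit.
 */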
int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
                            bool ignore_ratelimit)
{
    BlockCopyCallState call_state = {
        .s = s,
        .offset = start,
        .bytes = bytes,
        .ignore_ratelimit = ignore_ratelimit,
        .max_workers = BLOCK_COPY_MAX_WORKERS,
    };

    return block_copy_common(&call_state);
}

static void coroutine_fn block_copy_async_co_entry(void *opaque)
{
    block_copy_common(opaque);
}

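/*
 * Start block-copy in its own coroutine and return a handle that the caller
 * can query, kick or cancel. @cb, if non-NULL, is invoked once the call has
 * finished.
 *
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 *
 *     BlockCopyCallState *cs;
 *     bool error_is_read;
 *
 *     cs = block_copy_async(s, 0, len, BLOCK_COPY_MAX_WORKERS, 0,
 *                           done_cb, opaque);
 *     ...
 *     if (block_copy_call_finished(cs)) {
 *         int ret = block_copy_call_status(cs, &error_is_read);
 *         block_copy_call_free(cs);
 *     }
 */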
BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                     int64_t offset, int64_t bytes,
                                     int max_workers, int64_t max_chunk,
                                     BlockCopyAsyncCallbackFunc cb,
                                     void *cb_opaque)
{
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = offset,
        .bytes = bytes,
        .max_workers = max_workers,
        .max_chunk = max_chunk,
        .cb = cb,
        .cb_opaque = cb_opaque,

        .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    };

    qemu_coroutine_enter(call_state->co);

    return call_state;
}

void block_copy_call_free(BlockCopyCallState *call_state)
{
    if (!call_state) {
        return;
    }

    assert(qatomic_read(&call_state->finished));
    g_free(call_state);
}

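/*
 * The helpers below report the status of an async block-copy call. They only
 * read atomic fields, or fields that are no longer modified once @finished
 * has been set, so they can be used by the block-copy caller without taking
 * the lock.
 */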
bool block_copy_call_finished(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->finished);
}

bool block_copy_call_succeeded(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret == 0;
}

bool block_copy_call_failed(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret < 0;
}

bool block_copy_call_cancelled(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->cancelled);
}

int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
{
    assert(qatomic_load_acquire(&call_state->finished));
    if (error_is_read) {
        *error_is_read = call_state->error_is_read;
    }
    return call_state->ret;
}

/*
 * Note that cancelling and finishing are racy: a user can cancel a block-copy
 * call that has already finished.
 */
void block_copy_call_cancel(BlockCopyCallState *call_state)
{
    qatomic_set(&call_state->cancelled, true);
    block_copy_kick(call_state);
}

BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}

void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    qatomic_set(&s->skip_unallocated, skip);
}

void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
{
    ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);

    /*
     * Note: it would be good to kick all call states from here, but that
     * should be done only from a coroutine, so as not to crash if the
     * s->calls list changes while entering one call. So for now, the only
     * user of this function kicks its single call_state by hand.
     */
}