#include "trace.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
+#include "block/dirty-bitmap.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qmp/qerror.h"
#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"
+#include "qemu/memalign.h"
#define MAX_IN_FLIGHT 16
#define MAX_IO_BYTES (1 << 20) /* 1 MiB */
bool zero_target;
MirrorCopyMode copy_mode;
BlockdevOnError on_source_error, on_target_error;
- bool synced;
/* Set when the target is synced (dirty bitmap is clean, nothing
* in flight) and the job is running in active mode */
bool actively_synced;
uint64_t last_pause_ns;
unsigned long *in_flight_bitmap;
- int in_flight;
+ unsigned in_flight;
int64_t bytes_in_flight;
QTAILQ_HEAD(, MirrorOp) ops_in_flight;
int ret;
int max_iov;
bool initial_zeroing_ongoing;
int in_active_write_counter;
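+    /*
+     * Bytes that active-mode write operations currently have in flight;
+     * included when computing the job's remaining progress (see
+     * job_progress_set_remaining() in mirror_run()).
+     */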
+ int64_t active_write_bytes_in_flight;
bool prepared;
bool in_drain;
} MirrorBlockJob;
bool is_in_flight;
CoQueue waiting_requests;
Coroutine *co;
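+    /*
+     * The operation this op is currently waiting on in
+     * mirror_wait_on_conflicts(); tracked so that an op which is
+     * (indirectly) being waited on never blocks on its waiter in turn.
+     */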
+ MirrorOp *waiting_for_op;
QTAILQ_ENTRY(MirrorOp) next;
};
static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
int error)
{
- s->synced = false;
s->actively_synced = false;
if (read) {
return block_job_error_action(&s->common, s->on_source_error,
if (ranges_overlap(self_start_chunk, self_nb_chunks,
op_start_chunk, op_nb_chunks))
{
+ if (self) {
+ /*
+ * If the operation is already (indirectly) waiting for us,
+ * or will wait for us as soon as it wakes up, then just go
+ * on (instead of producing a deadlock in the former case).
+ */
+ if (op->waiting_for_op) {
+ continue;
+ }
+
+ self->waiting_for_op = op;
+ }
+
qemu_co_queue_wait(&op->waiting_requests, NULL);
+
+ if (self) {
+ self->waiting_for_op = NULL;
+ }
+
break;
}
}
}
static inline void coroutine_fn
-mirror_wait_for_any_operation(MirrorBlockJob *s, bool active)
+mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
{
MirrorOp *op;
QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
- /* Do not wait on pseudo ops, because it may in turn wait on
+ /*
+ * Do not wait on pseudo ops, because it may in turn wait on
* some other operation to start, which may in fact be the
* caller of this function. Since there is only one pseudo op
* at any given time, we will always find some real operation
- * to wait on. */
- if (!op->is_pseudo_op && op->is_in_flight &&
- op->is_active_write == active)
- {
+ * to wait on.
+ * Also, do not wait on active operations, because they do not
+ * use up in-flight slots.
+ */
+ if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
qemu_co_queue_wait(&op->waiting_requests, NULL);
return;
}
abort();
}
-static inline void coroutine_fn
-mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
-{
- /* Only non-active operations use up in-flight slots */
- mirror_wait_for_any_operation(s, false);
-}
-
/* Perform a mirror copy operation.
*
* *op->bytes_handled is set to the number of bytes copied after and
}
bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
+ /*
+ * Wait for concurrent requests to @offset. The next loop will limit the
+ * copied area based on in_flight_bitmap so we only copy an area that does
+     * not overlap with concurrent in-flight requests. Still, we would like to
+     * copy something, so at least wait until there are no more requests to
+     * the very beginning of the area.
+ */
mirror_wait_on_conflicts(NULL, s, offset, 1);
job_pause_point(&s->common.job);
block_job_remove_all_bdrv(bjob);
bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
- /* We just changed the BDS the job BB refers to (with either or both of the
- * bdrv_replace_node() calls), so switch the BB back so the cleanup does
- * the right thing. We don't need any permissions any more now. */
- blk_remove_bs(bjob->blk);
- blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
- blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
-
bs_opaque->job = NULL;
bdrv_drained_end(src);
BlockDriverState *bs = s->mirror_top_bs->backing->bs;
BlockDriverState *target_bs = blk_bs(s->target);
bool need_drain = true;
+ BlockDeviceIoStatus iostatus;
int64_t length;
int64_t target_length;
BlockDriverInfo bdi;
* active layer. */
if (s->base == blk_bs(s->target)) {
if (s->bdev_length > target_length) {
- ret = blk_truncate(s->target, s->bdev_length, false,
- PREALLOC_MODE_OFF, 0, NULL);
+ ret = blk_co_truncate(s->target, s->bdev_length, false,
+ PREALLOC_MODE_OFF, 0, NULL);
if (ret < 0) {
goto immediate_exit;
}
if (s->bdev_length == 0) {
/* Transition to the READY state and wait for complete. */
job_transition_to_ready(&s->common.job);
- s->synced = true;
s->actively_synced = true;
- while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
+ while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
job_yield(&s->common.job);
}
- s->common.job.cancelled = false;
goto immediate_exit;
}
int64_t cnt, delta;
bool should_complete;
- /* Do not start passive operations while there are active
- * writes in progress */
- while (s->in_active_write_counter) {
- mirror_wait_for_any_operation(s, true);
- }
-
if (s->ret < 0) {
ret = s->ret;
goto immediate_exit;
job_pause_point(&s->common.job);
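+        /*
+         * The job may have been (force-)cancelled while we were paused
+         * above; if so, stop iterating instead of issuing new requests.
+         */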
+ if (job_is_cancelled(&s->common.job)) {
+ ret = 0;
+ goto immediate_exit;
+ }
+
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
         * the number of bytes currently being processed; together with the
         * bytes that active-mode writes have in flight, those are the current
         * remaining operation length */
- job_progress_set_remaining(&s->common.job, s->bytes_in_flight + cnt);
+ job_progress_set_remaining(&s->common.job,
+ s->bytes_in_flight + cnt +
+ s->active_write_bytes_in_flight);
/* Note that even when no rate limit is applied we need to yield
* periodically with no pending I/O so that bdrv_drain_all() returns.
     * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
* an error, or when the source is clean, whichever comes first. */
delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
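+        /* iostatus is a job field, so it must be sampled under the job lock */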
+ WITH_JOB_LOCK_GUARD() {
+ iostatus = s->common.iostatus;
+ }
if (delta < BLOCK_JOB_SLICE_TIME &&
- s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+ iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
(cnt == 0 && s->in_flight > 0)) {
trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
should_complete = false;
if (s->in_flight == 0 && cnt == 0) {
trace_mirror_before_flush(s);
- if (!s->synced) {
+ if (!job_is_ready(&s->common.job)) {
if (mirror_flush(s) < 0) {
/* Go check s->ret. */
continue;
* the target in a consistent state.
*/
job_transition_to_ready(&s->common.job);
- s->synced = true;
if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
s->actively_synced = true;
}
}
should_complete = s->should_complete ||
- job_is_cancelled(&s->common.job);
+ job_cancel_requested(&s->common.job);
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
}
s->in_drain = true;
bdrv_drained_begin(bs);
+
+ /* Must be zero because we are drained */
+ assert(s->in_active_write_counter == 0);
+
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
if (cnt > 0 || mirror_flush(s) < 0) {
bdrv_drained_end(bs);
* completion.
*/
assert(QLIST_EMPTY(&bs->tracked_requests));
- s->common.job.cancelled = false;
need_drain = false;
break;
}
- ret = 0;
-
- if (s->synced && !should_complete) {
+ if (job_is_ready(&s->common.job) && !should_complete) {
delay_ns = (s->in_flight == 0 &&
cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
}
- trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
+ trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
+ delay_ns);
job_sleep_ns(&s->common.job, delay_ns);
- if (job_is_cancelled(&s->common.job) &&
- (!s->synced || s->common.job.force_cancel))
- {
- break;
- }
s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
* or it was cancelled prematurely so that we do not guarantee that
* the target is a copy of the source.
*/
- assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
- job_is_cancelled(&s->common.job)));
+ assert(ret < 0 || job_is_cancelled(&s->common.job));
assert(need_drain);
mirror_wait_for_all_io(s);
}
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
- if (!s->synced) {
+ if (!job_is_ready(job)) {
error_setg(errp, "The active block job '%s' cannot be completed",
job->id);
return;
replace_aio_context = bdrv_get_aio_context(s->to_replace);
aio_context_acquire(replace_aio_context);
- /* TODO Translate this into permission system. Current definition of
- * GRAPH_MOD would require to request it for the parents; they might
- * not even be BlockDriverStates, however, so a BdrvChild can't address
- * them. May need redefinition of GRAPH_MOD. */
+    /* TODO Translate this into the child freeze system. */
error_setg(&s->replace_blocker,
"block device is in use by block-job-complete");
bdrv_op_block_all(s->to_replace, s->replace_blocker);
s->should_complete = true;
/* If the job is paused, it will be re-entered when it is resumed */
- if (!job->paused) {
- job_enter(job);
+ WITH_JOB_LOCK_GUARD() {
+ if (!job->paused) {
+ job_enter_cond_locked(job, NULL);
+ }
}
}
* from one of our own drain sections, to avoid a deadlock waiting for
* ourselves.
*/
- if (!s->common.job.paused && !s->common.job.cancelled && !s->in_drain) {
- return true;
+ WITH_JOB_LOCK_GUARD() {
+ if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
+ && !s->in_drain) {
+ return true;
+ }
}
return !!s->in_flight;
}
-static void mirror_cancel(Job *job)
+static bool mirror_cancel(Job *job, bool force)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
BlockDriverState *target = blk_bs(s->target);
- bdrv_cancel_in_flight(target);
+ /*
+ * Before the job is READY, we treat any cancellation like a
+ * force-cancellation.
+ */
+ force = force || !job_is_ready(job);
+
+ if (force) {
+ bdrv_cancel_in_flight(target);
+ }
+ return force;
+}
+
+static bool commit_active_cancel(Job *job, bool force)
+{
+ /* Same as above in mirror_cancel() */
+ return force || !job_is_ready(job);
}
static const BlockJobDriver commit_active_job_driver = {
.abort = mirror_abort,
.pause = mirror_pause,
.complete = mirror_complete,
+ .cancel = commit_active_cancel,
},
.drained_poll = mirror_drained_poll,
};
}
job_progress_increase_remaining(&job->common.job, bytes);
+ job->active_write_bytes_in_flight += bytes;
switch (method) {
case MIRROR_METHOD_COPY:
abort();
}
+ job->active_write_bytes_in_flight -= bytes;
if (ret >= 0) {
job_progress_update(&job->common.job, bytes);
} else {
.bytes = bytes,
.is_active_write = true,
.is_in_flight = true,
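+        /* Record the coroutine that issued this active write (handy when
+         * debugging stuck requests) */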
+ .co = qemu_coroutine_self(),
};
qemu_co_queue_init(&op->waiting_requests);
QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
s->in_active_write_counter++;
+ /*
+ * Wait for concurrent requests affecting the area. If there are already
+ * running requests that are copying off now-to-be stale data in the area,
+ * we must wait for them to finish before we begin writing fresh data to the
+ * target so that the write operations appear in the correct order.
+ * Note that background requests (see mirror_iteration()) in contrast only
+ * wait for conflicting requests at the start of the dirty area, and then
+ * (based on the in_flight_bitmap) truncate the area to copy so it will not
+ * conflict with any requests beyond that. For active writes, however, we
+ * cannot truncate that area. The request from our parent must be blocked
+ * until the area is copied in full. Therefore, we must wait for the whole
+ * area to become free of concurrent requests.
+ */
mirror_wait_on_conflicts(op, s, offset, bytes);
bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
}
static int coroutine_fn bdrv_mirror_top_preadv(BlockDriverState *bs,
- uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
+ int64_t offset, int64_t bytes, QEMUIOVector *qiov, BdrvRequestFlags flags)
{
return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
}
MirrorOp *op = NULL;
MirrorBDSOpaque *s = bs->opaque;
int ret = 0;
- bool copy_to_target;
+ bool copy_to_target = false;
- copy_to_target = s->job->ret >= 0 &&
- s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
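+    /*
+     * s->job can already be NULL here: mirror_exit_common() clears
+     * bs_opaque->job while guest writes may still be in flight through
+     * this filter node, so only dereference it if it is still set.
+     */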
+ if (s->job) {
+ copy_to_target = s->job->ret >= 0 &&
+ !job_is_cancelled(&s->job->common.job) &&
+ s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ }
if (copy_to_target) {
op = active_write_prepare(s->job, offset, bytes);
}
static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
- uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
+ int64_t offset, int64_t bytes, QEMUIOVector *qiov, BdrvRequestFlags flags)
{
MirrorBDSOpaque *s = bs->opaque;
QEMUIOVector bounce_qiov;
void *bounce_buf;
int ret = 0;
- bool copy_to_target;
+ bool copy_to_target = false;
- copy_to_target = s->job->ret >= 0 &&
- s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
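+    /* As in bdrv_mirror_top_do_write(): s->job may already be gone */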
+ if (s->job) {
+ copy_to_target = s->job->ret >= 0 &&
+ !job_is_cancelled(&s->job->common.job) &&
+ s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ }
if (copy_to_target) {
/* The guest might concurrently modify the data to write; but
qemu_iovec_init(&bounce_qiov, 1);
qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
qiov = &bounce_qiov;
+
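+        /* The bounce buffer is not registered, so drop BDRV_REQ_REGISTERED_BUF */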
+ flags &= ~BDRV_REQ_REGISTERED_BUF;
}
ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov,
}
static int coroutine_fn bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs,
- int64_t offset, int bytes, BdrvRequestFlags flags)
+ int64_t offset, int64_t bytes, BdrvRequestFlags flags)
{
return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL,
flags);
}
static int coroutine_fn bdrv_mirror_top_pdiscard(BlockDriverState *bs,
- int64_t offset, int bytes)
+ int64_t offset, int64_t bytes)
{
return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes,
NULL, 0);
.bdrv_child_perm = bdrv_mirror_top_child_perm,
.is_filter = true,
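+    /* This filter keeps its filtered child in bs->backing, not bs->file */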
+ .filtered_child_is_backing = true,
};
static BlockJob *mirror_start_job(
s = block_job_create(job_id, driver, NULL, mirror_top_bs,
BLK_PERM_CONSISTENT_READ,
BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
- BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD, speed,
+ BLK_PERM_WRITE, speed,
creation_flags, cb, opaque, errp);
if (!s) {
goto fail;
target_perms |= BLK_PERM_RESIZE;
}
- target_shared_perms |= BLK_PERM_CONSISTENT_READ
- | BLK_PERM_WRITE
- | BLK_PERM_GRAPH_MOD;
+ target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
} else if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
/*
* We may want to allow this in the future, but it would
goto fail;
}
- if (backing_mode != MIRROR_LEAVE_BACKING_CHAIN) {
- target_perms |= BLK_PERM_GRAPH_MOD;
- }
-
s->target = blk_new(s->common.job.aio_context,
target_perms, target_shared_perms);
ret = blk_insert_bs(s->target, target, errp);
bool is_none_mode;
BlockDriverState *base;
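+    /* Graph-changing setup code; must run in the main thread under the BQL */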
+ GLOBAL_STATE_CODE();
+
if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
(mode == MIRROR_SYNC_MODE_BITMAP)) {
error_setg(errp, "Sync mode '%s' not supported",
bool base_read_only;
BlockJob *job;
+ GLOBAL_STATE_CODE();
+
base_read_only = bdrv_is_read_only(base);
if (base_read_only) {