X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=blockjob.c;h=513620c199fde95f7448d01e3e3c33fb3724ba21;hb=a3e1505daec31ef56f0489f8c8fff1b8e4ca92bd;hp=205da9df4e03d0341312bbb77d73f447e8076b50;hpb=60a0f1af07d685c88f4ffa09370da5bd7514823e;p=mirror_qemu.git diff --git a/blockjob.c b/blockjob.c index 205da9df4e..513620c199 100644 --- a/blockjob.c +++ b/blockjob.c @@ -27,16 +27,20 @@ #include "qemu-common.h" #include "trace.h" #include "block/block.h" -#include "block/blockjob.h" +#include "block/blockjob_int.h" #include "block/block_int.h" #include "sysemu/block-backend.h" #include "qapi/qmp/qerror.h" #include "qapi/qmp/qjson.h" #include "qemu/coroutine.h" +#include "qemu/id.h" #include "qmp-commands.h" #include "qemu/timer.h" #include "qapi-event.h" +static void block_job_event_cancelled(BlockJob *job); +static void block_job_event_completed(BlockJob *job, const char *msg); + /* Transactional group of block jobs */ struct BlockJobTxn { @@ -60,15 +64,17 @@ BlockJob *block_job_next(BlockJob *job) return QLIST_NEXT(job, job_list); } -/* Normally the job runs in its BlockBackend's AioContext. The exception is - * block_job_defer_to_main_loop() where it runs in the QEMU main loop. Code - * that supports both cases uses this helper function. - */ -static AioContext *block_job_get_aio_context(BlockJob *job) +BlockJob *block_job_get(const char *id) { - return job->deferred_to_main_loop ? - qemu_get_aio_context() : - blk_get_aio_context(job->blk); + BlockJob *job; + + QLIST_FOREACH(job, &block_jobs, job_list) { + if (job->id && !strcmp(id, job->id)) { + return job; + } + } + + return NULL; } static void block_job_attached_aio_context(AioContext *new_context, @@ -83,6 +89,17 @@ static void block_job_attached_aio_context(AioContext *new_context, block_job_resume(job); } +static void block_job_drain(BlockJob *job) +{ + /* If job is !job->busy this kicks it into the next pause point. */ + block_job_enter(job); + + blk_drain(job->blk); + if (job->driver->drain) { + job->driver->drain(job); + } +} + static void block_job_detach_aio_context(void *opaque) { BlockJob *job = opaque; @@ -92,45 +109,74 @@ static void block_job_detach_aio_context(void *opaque) block_job_pause(job); - if (!job->paused) { - /* If job is !job->busy this kicks it into the next pause point. */ - block_job_enter(job); - } while (!job->paused && !job->completed) { - aio_poll(block_job_get_aio_context(job), true); + block_job_drain(job); } block_job_unref(job); } -void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs, - int64_t speed, BlockCompletionFunc *cb, - void *opaque, Error **errp) +void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs) +{ + job->nodes = g_slist_prepend(job->nodes, bs); + bdrv_ref(bs); + bdrv_op_block_all(bs, job->blocker); +} + +void *block_job_create(const char *job_id, const BlockJobDriver *driver, + BlockDriverState *bs, int64_t speed, int flags, + BlockCompletionFunc *cb, void *opaque, Error **errp) { BlockBackend *blk; BlockJob *job; - assert(cb); if (bs->job) { error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs)); return NULL; } + if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) { + job_id = bdrv_get_device_name(bs); + if (!*job_id) { + error_setg(errp, "An explicit job ID is required for this node"); + return NULL; + } + } + + if (job_id) { + if (flags & BLOCK_JOB_INTERNAL) { + error_setg(errp, "Cannot specify job ID for internal block job"); + return NULL; + } + + if (!id_wellformed(job_id)) { + error_setg(errp, "Invalid job ID '%s'", job_id); + return NULL; + } + + if (block_job_get(job_id)) { + error_setg(errp, "Job ID '%s' already in use", job_id); + return NULL; + } + } + blk = blk_new(); blk_insert_bs(blk, bs); job = g_malloc0(driver->instance_size); error_setg(&job->blocker, "block device is in use by block job: %s", BlockJobType_lookup[driver->job_type]); - bdrv_op_block_all(bs, job->blocker); + block_job_add_bdrv(job, bs); bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker); job->driver = driver; - job->id = g_strdup(bdrv_get_device_name(bs)); + job->id = g_strdup(job_id); job->blk = blk; job->cb = cb; job->opaque = opaque; - job->busy = true; + job->busy = false; + job->paused = true; + job->pause_count = 1; job->refcnt = 1; bs->job = job; @@ -153,6 +199,28 @@ void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs, return job; } +bool block_job_is_internal(BlockJob *job) +{ + return (job->id == NULL); +} + +static bool block_job_started(BlockJob *job) +{ + return job->co; +} + +void block_job_start(BlockJob *job) +{ + assert(job && !block_job_started(job) && job->paused && + !job->busy && job->driver->start); + job->co = qemu_coroutine_create(job->driver->start, job); + if (--job->pause_count == 0) { + job->paused = false; + job->busy = true; + qemu_coroutine_enter(job->co); + } +} + void block_job_ref(BlockJob *job) { ++job->refcnt; @@ -161,9 +229,15 @@ void block_job_ref(BlockJob *job) void block_job_unref(BlockJob *job) { if (--job->refcnt == 0) { + GSList *l; BlockDriverState *bs = blk_bs(job->blk); bs->job = NULL; - bdrv_op_unblock_all(bs, job->blocker); + for (l = job->nodes; l; l = l->next) { + bs = l->data; + bdrv_op_unblock_all(bs, job->blocker); + bdrv_unref(bs); + } + g_slist_free(job->nodes); blk_remove_aio_context_notifier(job->blk, block_job_attached_aio_context, block_job_detach_aio_context, job); @@ -186,8 +260,29 @@ static void block_job_completed_single(BlockJob *job) job->driver->abort(job); } } - job->cb(job->opaque, job->ret); + if (job->driver->clean) { + job->driver->clean(job); + } + + if (job->cb) { + job->cb(job->opaque, job->ret); + } + + /* Emit events only if we actually started */ + if (block_job_started(job)) { + if (block_job_is_cancelled(job)) { + block_job_event_cancelled(job); + } else { + const char *msg = NULL; + if (job->ret < 0) { + msg = strerror(-job->ret); + } + block_job_event_completed(job, msg); + } + } + if (job->txn) { + QLIST_REMOVE(job, txn_list); block_job_txn_unref(job->txn); } block_job_unref(job); @@ -289,8 +384,12 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) void block_job_complete(BlockJob *job, Error **errp) { - if (job->pause_count || job->cancelled || !job->driver->complete) { - error_setg(errp, QERR_BLOCK_JOB_NOT_READY, job->id); + /* Should not be reachable via external interface for internal jobs */ + assert(job->id); + if (job->pause_count || job->cancelled || + !block_job_started(job) || !job->driver->complete) { + error_setg(errp, "The active block job '%s' cannot be completed", + job->id); return; } @@ -302,13 +401,26 @@ void block_job_pause(BlockJob *job) job->pause_count++; } +void block_job_user_pause(BlockJob *job) +{ + job->user_paused = true; + block_job_pause(job); +} + static bool block_job_should_pause(BlockJob *job) { return job->pause_count > 0; } +bool block_job_user_paused(BlockJob *job) +{ + return job ? job->user_paused : 0; +} + void coroutine_fn block_job_pause_point(BlockJob *job) { + assert(job && block_job_started(job)); + if (!block_job_should_pause(job)) { return; } @@ -343,18 +455,30 @@ void block_job_resume(BlockJob *job) block_job_enter(job); } +void block_job_user_resume(BlockJob *job) +{ + if (job && job->user_paused && job->pause_count > 0) { + job->user_paused = false; + block_job_resume(job); + } +} + void block_job_enter(BlockJob *job) { if (job->co && !job->busy) { - qemu_coroutine_enter(job->co, NULL); + qemu_coroutine_enter(job->co); } } void block_job_cancel(BlockJob *job) { - job->cancelled = true; - block_job_iostatus_reset(job); - block_job_enter(job); + if (block_job_started(job)) { + job->cancelled = true; + block_job_iostatus_reset(job); + block_job_enter(job); + } else { + block_job_completed(job, -ECANCELED); + } } bool block_job_is_cancelled(BlockJob *job) @@ -380,14 +504,21 @@ static int block_job_finish_sync(BlockJob *job, assert(blk_bs(job->blk)->job == job); block_job_ref(job); + finish(job, &local_err); if (local_err) { error_propagate(errp, local_err); block_job_unref(job); return -EBUSY; } + /* block_job_drain calls block_job_enter, and it should be enough to + * induce progress until the job completes or moves to the main thread. + */ + while (!job->deferred_to_main_loop && !job->completed) { + block_job_drain(job); + } while (!job->completed) { - aio_poll(block_job_get_aio_context(job), true); + aio_poll(qemu_get_aio_context(), true); } ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret; block_job_unref(job); @@ -461,9 +592,15 @@ void block_job_yield(BlockJob *job) block_job_pause_point(job); } -BlockJobInfo *block_job_query(BlockJob *job) +BlockJobInfo *block_job_query(BlockJob *job, Error **errp) { - BlockJobInfo *info = g_new0(BlockJobInfo, 1); + BlockJobInfo *info; + + if (block_job_is_internal(job)) { + error_setg(errp, "Cannot query QEMU internal jobs"); + return NULL; + } + info = g_new0(BlockJobInfo, 1); info->type = g_strdup(BlockJobType_lookup[job->driver->job_type]); info->device = g_strdup(job->id); info->len = job->len; @@ -484,8 +621,12 @@ static void block_job_iostatus_set_err(BlockJob *job, int error) } } -void block_job_event_cancelled(BlockJob *job) +static void block_job_event_cancelled(BlockJob *job) { + if (block_job_is_internal(job)) { + return; + } + qapi_event_send_block_job_cancelled(job->driver->job_type, job->id, job->len, @@ -494,8 +635,12 @@ void block_job_event_cancelled(BlockJob *job) &error_abort); } -void block_job_event_completed(BlockJob *job, const char *msg) +static void block_job_event_completed(BlockJob *job, const char *msg) { + if (block_job_is_internal(job)) { + return; + } + qapi_event_send_block_job_completed(job->driver->job_type, job->id, job->len, @@ -510,6 +655,10 @@ void block_job_event_ready(BlockJob *job) { job->ready = true; + if (block_job_is_internal(job)) { + return; + } + qapi_event_send_block_job_ready(job->driver->job_type, job->id, job->len, @@ -524,6 +673,7 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err, switch (on_err) { case BLOCKDEV_ON_ERROR_ENOSPC: + case BLOCKDEV_ON_ERROR_AUTO: action = (error == ENOSPC) ? BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT; break; @@ -539,14 +689,15 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err, default: abort(); } - qapi_event_send_block_job_error(job->id, - is_read ? IO_OPERATION_TYPE_READ : - IO_OPERATION_TYPE_WRITE, - action, &error_abort); + if (!block_job_is_internal(job)) { + qapi_event_send_block_job_error(job->id, + is_read ? IO_OPERATION_TYPE_READ : + IO_OPERATION_TYPE_WRITE, + action, &error_abort); + } if (action == BLOCK_ERROR_ACTION_STOP) { /* make the pause user visible, which will be resumed from QMP. */ - job->user_paused = true; - block_job_pause(job); + block_job_user_pause(job); block_job_iostatus_set_err(job, error); } return action; @@ -554,7 +705,6 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err, typedef struct { BlockJob *job; - QEMUBH *bh; AioContext *aio_context; BlockJobDeferToMainLoopFn *fn; void *opaque; @@ -565,8 +715,6 @@ static void block_job_defer_to_main_loop_bh(void *opaque) BlockJobDeferToMainLoopData *data = opaque; AioContext *aio_context; - qemu_bh_delete(data->bh); - /* Prevent race with block_job_defer_to_main_loop() */ aio_context_acquire(data->aio_context); @@ -590,13 +738,13 @@ void block_job_defer_to_main_loop(BlockJob *job, { BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data)); data->job = job; - data->bh = qemu_bh_new(block_job_defer_to_main_loop_bh, data); data->aio_context = blk_get_aio_context(job->blk); data->fn = fn; data->opaque = opaque; job->deferred_to_main_loop = true; - qemu_bh_schedule(data->bh); + aio_bh_schedule_oneshot(qemu_get_aio_context(), + block_job_defer_to_main_loop_bh, data); } BlockJobTxn *block_job_txn_new(void)