X-Git-Url: https://git.proxmox.com/?p=mirror_qemu.git;a=blobdiff_plain;f=job.c;h=28dd48f8a59c58c74ad65056f0be4a9345abde9e;hp=b9ebd1c091436521ea4c1ce255ba28bfba597f16;hb=c4107e8208d0222f9b328691b519aaee4101db87;hpb=2fde22f8ae1a69253dff5c7a73c39d3a72fa94a1 diff --git a/job.c b/job.c index b9ebd1c091..28dd48f8a5 100644 --- a/job.c +++ b/job.c @@ -24,11 +24,11 @@ */ #include "qemu/osdep.h" -#include "qemu-common.h" #include "qapi/error.h" #include "qemu/job.h" #include "qemu/id.h" #include "qemu/main-loop.h" +#include "block/aio-wait.h" #include "trace-root.h" #include "qapi/qapi-events-job.h" @@ -136,21 +136,13 @@ static void job_txn_del_job(Job *job) } } -static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock) +static int job_txn_apply(JobTxn *txn, int fn(Job *)) { - AioContext *ctx; Job *job, *next; int rc = 0; QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { - if (lock) { - ctx = job->aio_context; - aio_context_acquire(ctx); - } rc = fn(job); - if (lock) { - aio_context_release(ctx); - } if (rc) { break; } @@ -166,7 +158,7 @@ bool job_is_internal(Job *job) static void job_state_transition(Job *job, JobStatus s1) { JobStatus s0 = job->status; - assert(s1 >= 0 && s1 <= JOB_STATUS__MAX); + assert(s1 >= 0 && s1 < JOB_STATUS__MAX); trace_job_state_transition(job, job->ret, JobSTT[s0][s1] ? "allowed" : "disallowed", JobStatus_str(s0), JobStatus_str(s1)); @@ -181,7 +173,7 @@ static void job_state_transition(Job *job, JobStatus s1) int job_apply_verb(Job *job, JobVerb verb, Error **errp) { JobStatus s0 = job->status; - assert(verb >= 0 && verb <= JOB_VERB__MAX); + assert(verb >= 0 && verb < JOB_VERB__MAX); trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb), JobVerbTable[verb][s0] ? "allowed" : "prohibited"); if (JobVerbTable[verb][s0]) { @@ -369,7 +361,7 @@ void job_unref(Job *job) QLIST_REMOVE(job, job_list); - g_free(job->error); + error_free(job->err); g_free(job->id); g_free(job); } @@ -410,6 +402,11 @@ static void job_event_ready(Job *job) notifier_list_notify(&job->on_ready, job); } +static void job_event_idle(Job *job) +{ + notifier_list_notify(&job->on_idle, job); +} + void job_enter_cond(Job *job, bool(*fn)(Job *job)) { if (!job_started(job)) { @@ -434,7 +431,7 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job)) timer_del(&job->sleep_timer); job->busy = true; job_unlock(); - aio_co_wake(job->co); + aio_co_enter(job->aio_context, job->co); } void job_enter(Job *job) @@ -455,6 +452,7 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns) timer_mod(&job->sleep_timer, ns); } job->busy = false; + job_event_idle(job); job_unlock(); qemu_coroutine_yield(); @@ -535,33 +533,6 @@ void job_drain(Job *job) } } - -/** - * All jobs must allow a pause point before entering their job proper. This - * ensures that jobs can be paused prior to being started, then resumed later. - */ -static void coroutine_fn job_co_entry(void *opaque) -{ - Job *job = opaque; - - assert(job && job->driver && job->driver->start); - job_pause_point(job); - job->driver->start(job); -} - - -void job_start(Job *job) -{ - assert(job && !job_started(job) && job->paused && - job->driver && job->driver->start); - job->co = qemu_coroutine_create(job_co_entry, job); - job->pause_count--; - job->busy = true; - job->paused = false; - job_state_transition(job, JOB_STATUS_RUNNING); - aio_co_enter(job->aio_context, job->co); -} - /* Assumes the block_job_mutex is held */ static bool job_timer_not_pending(Job *job) { @@ -666,8 +637,8 @@ static void job_update_rc(Job *job) job->ret = -ECANCELED; } if (job->ret) { - if (!job->error) { - job->error = g_strdup(strerror(-job->ret)); + if (!job->err) { + error_setg(&job->err, "%s", strerror(-job->ret)); } job_state_transition(job, JOB_STATUS_ABORTING); } @@ -746,6 +717,7 @@ static void job_cancel_async(Job *job, bool force) static void job_completed_txn_abort(Job *job) { + AioContext *outer_ctx = job->aio_context; AioContext *ctx; JobTxn *txn = job->txn; Job *other_job; @@ -759,23 +731,26 @@ static void job_completed_txn_abort(Job *job) txn->aborting = true; job_txn_ref(txn); - /* We are the first failed job. Cancel other jobs. */ - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { - ctx = other_job->aio_context; - aio_context_acquire(ctx); - } + /* We can only hold the single job's AioContext lock while calling + * job_finalize_single() because the finalization callbacks can involve + * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */ + aio_context_release(outer_ctx); /* Other jobs are effectively cancelled by us, set the status for * them; this job, however, may or may not be cancelled, depending * on the caller, so leave it. */ QLIST_FOREACH(other_job, &txn->jobs, txn_list) { if (other_job != job) { + ctx = other_job->aio_context; + aio_context_acquire(ctx); job_cancel_async(other_job, false); + aio_context_release(ctx); } } while (!QLIST_EMPTY(&txn->jobs)) { other_job = QLIST_FIRST(&txn->jobs); ctx = other_job->aio_context; + aio_context_acquire(ctx); if (!job_is_completed(other_job)) { assert(job_is_cancelled(other_job)); job_finish_sync(other_job, NULL, NULL); @@ -784,6 +759,8 @@ static void job_completed_txn_abort(Job *job) aio_context_release(ctx); } + aio_context_acquire(outer_ctx); + job_txn_unref(txn); } @@ -807,11 +784,11 @@ static void job_do_finalize(Job *job) assert(job && job->txn); /* prepare the transaction to complete */ - rc = job_txn_apply(job->txn, job_prepare, true); + rc = job_txn_apply(job->txn, job_prepare); if (rc) { job_completed_txn_abort(job); } else { - job_txn_apply(job->txn, job_finalize_single, true); + job_txn_apply(job->txn, job_finalize_single); } } @@ -857,27 +834,20 @@ static void job_completed_txn_success(Job *job) assert(other_job->ret == 0); } - job_txn_apply(txn, job_transition_to_pending, false); + job_txn_apply(txn, job_transition_to_pending); /* If no jobs need manual finalization, automatically do so */ - if (job_txn_apply(txn, job_needs_finalize, false) == 0) { + if (job_txn_apply(txn, job_needs_finalize) == 0) { job_do_finalize(job); } } -void job_completed(Job *job, int ret, Error *error) +static void job_completed(Job *job) { assert(job && job->txn && !job_is_completed(job)); - job->ret = ret; - if (error) { - assert(job->ret < 0); - job->error = g_strdup(error_get_pretty(error)); - error_free(error); - } - job_update_rc(job); - trace_job_completed(job, ret, job->ret); + trace_job_completed(job, job->ret); if (job->ret) { job_completed_txn_abort(job); } else { @@ -885,6 +855,54 @@ void job_completed(Job *job, int ret, Error *error) } } +/** Useful only as a type shim for aio_bh_schedule_oneshot. */ +static void job_exit(void *opaque) +{ + Job *job = (Job *)opaque; + AioContext *ctx = job->aio_context; + + aio_context_acquire(ctx); + + /* This is a lie, we're not quiescent, but still doing the completion + * callbacks. However, completion callbacks tend to involve operations that + * drain block nodes, and if .drained_poll still returned true, we would + * deadlock. */ + job->busy = false; + job_event_idle(job); + + job_completed(job); + + aio_context_release(ctx); +} + +/** + * All jobs must allow a pause point before entering their job proper. This + * ensures that jobs can be paused prior to being started, then resumed later. + */ +static void coroutine_fn job_co_entry(void *opaque) +{ + Job *job = opaque; + + assert(job && job->driver && job->driver->run); + job_pause_point(job); + job->ret = job->driver->run(job, &job->err); + job->deferred_to_main_loop = true; + job->busy = true; + aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job); +} + +void job_start(Job *job) +{ + assert(job && !job_started(job) && job->paused && + job->driver && job->driver->run); + job->co = qemu_coroutine_create(job_co_entry, job); + job->pause_count--; + job->busy = true; + job->paused = false; + job_state_transition(job, JOB_STATUS_RUNNING); + aio_co_enter(job->aio_context, job->co); +} + void job_cancel(Job *job, bool force) { if (job->status == JOB_STATUS_CONCLUDED) { @@ -893,7 +911,7 @@ void job_cancel(Job *job, bool force) } job_cancel_async(job, force); if (!job_started(job)) { - job_completed(job, -ECANCELED, NULL); + job_completed(job); } else if (job->deferred_to_main_loop) { job_completed_txn_abort(job); } else { @@ -956,38 +974,6 @@ void job_complete(Job *job, Error **errp) job->driver->complete(job, errp); } - -typedef struct { - Job *job; - JobDeferToMainLoopFn *fn; - void *opaque; -} JobDeferToMainLoopData; - -static void job_defer_to_main_loop_bh(void *opaque) -{ - JobDeferToMainLoopData *data = opaque; - Job *job = data->job; - AioContext *aio_context = job->aio_context; - - aio_context_acquire(aio_context); - data->fn(data->job, data->opaque); - aio_context_release(aio_context); - - g_free(data); -} - -void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque) -{ - JobDeferToMainLoopData *data = g_malloc(sizeof(*data)); - data->job = job; - data->fn = fn; - data->opaque = opaque; - job->deferred_to_main_loop = true; - - aio_bh_schedule_oneshot(qemu_get_aio_context(), - job_defer_to_main_loop_bh, data); -} - int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp) { Error *local_err = NULL; @@ -1003,14 +989,10 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp) job_unref(job); return -EBUSY; } - /* job_drain calls job_enter, and it should be enough to induce progress - * until the job completes or moves to the main thread. */ - while (!job->deferred_to_main_loop && !job_is_completed(job)) { - job_drain(job); - } - while (!job_is_completed(job)) { - aio_poll(qemu_get_aio_context(), true); - } + + AIO_WAIT_WHILE(job->aio_context, + (job_drain(job), !job_is_completed(job))); + ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret; job_unref(job); return ret;