X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=job.c;h=04409b40aab3545798414bda1a872b5fb8490965;hb=50276a79aa15713decfede5cab183fc371e3e57d;hp=fd10b1d26712a3663972dc2889443f1837c223b1;hpb=b15de82867975e0b4acf644b5ee36d84904b6612;p=mirror_qemu.git

diff --git a/job.c b/job.c
index fd10b1d267..04409b40aa 100644
--- a/job.c
+++ b/job.c
@@ -24,12 +24,13 @@
  */
 
 #include "qemu/osdep.h"
-#include "qemu-common.h"
 #include "qapi/error.h"
 #include "qemu/job.h"
 #include "qemu/id.h"
 #include "qemu/main-loop.h"
+#include "block/aio-wait.h"
 #include "trace-root.h"
+#include "qapi/qapi-events-job.h"
 
 static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
 
@@ -60,6 +61,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
     [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
 };
 
+/* Transactional group of jobs */
+struct JobTxn {
+
+    /* Is this txn being cancelled? */
+    bool aborting;
+
+    /* List of jobs */
+    QLIST_HEAD(, Job) jobs;
+
+    /* Reference count */
+    int refcnt;
+};
+
 /* Right now, this mutex is only needed to synchronize accesses to job->busy
  * and job->sleep_timer, such as concurrent calls to job_do_yield and
  * job_enter. */
@@ -80,22 +94,86 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
-/* TODO Make static once the whole state machine is in job.c */
-void job_state_transition(Job *job, JobStatus s1)
+JobTxn *job_txn_new(void)
+{
+    JobTxn *txn = g_new0(JobTxn, 1);
+    QLIST_INIT(&txn->jobs);
+    txn->refcnt = 1;
+    return txn;
+}
+
+static void job_txn_ref(JobTxn *txn)
+{
+    txn->refcnt++;
+}
+
+void job_txn_unref(JobTxn *txn)
+{
+    if (txn && --txn->refcnt == 0) {
+        g_free(txn);
+    }
+}
+
+void job_txn_add_job(JobTxn *txn, Job *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    job_txn_ref(txn);
+}
+
+static void job_txn_del_job(Job *job)
+{
+    if (job->txn) {
+        QLIST_REMOVE(job, txn_list);
+        job_txn_unref(job->txn);
+        job->txn = NULL;
+    }
+}
+
+static int job_txn_apply(JobTxn *txn, int fn(Job *))
+{
+    Job *job, *next;
+    int rc = 0;
+
+    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
+        rc = fn(job);
+        if (rc) {
+            break;
+        }
+    }
+    return rc;
+}
+
+bool job_is_internal(Job *job)
+{
+    return (job->id == NULL);
+}
+
+static void job_state_transition(Job *job, JobStatus s1)
 {
     JobStatus s0 = job->status;
-    assert(s1 >= 0 && s1 <= JOB_STATUS__MAX);
-    trace_job_state_transition(job, /* TODO re-enable: job->ret */ 0,
+    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
+    trace_job_state_transition(job, job->ret,
                                JobSTT[s0][s1] ? "allowed" : "disallowed",
                                JobStatus_str(s0), JobStatus_str(s1));
     assert(JobSTT[s0][s1]);
     job->status = s1;
+
+    if (!job_is_internal(job) && s1 != s0) {
+        qapi_event_send_job_status_change(job->id, job->status);
+    }
 }
 
 int job_apply_verb(Job *job, JobVerb verb, Error **errp)
 {
     JobStatus s0 = job->status;
-    assert(verb >= 0 && verb <= JOB_VERB__MAX);
+    assert(verb >= 0 && verb < JOB_VERB__MAX);
     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
     if (JobVerbTable[verb][s0]) {
@@ -121,12 +199,56 @@ bool job_is_cancelled(Job *job)
     return job->cancelled;
 }
 
-bool job_started(Job *job)
+bool job_is_ready(Job *job)
+{
+    switch (job->status) {
+    case JOB_STATUS_UNDEFINED:
+    case JOB_STATUS_CREATED:
+    case JOB_STATUS_RUNNING:
+    case JOB_STATUS_PAUSED:
+    case JOB_STATUS_WAITING:
+    case JOB_STATUS_PENDING:
+    case JOB_STATUS_ABORTING:
+    case JOB_STATUS_CONCLUDED:
+    case JOB_STATUS_NULL:
+        return false;
+    case JOB_STATUS_READY:
+    case JOB_STATUS_STANDBY:
+        return true;
+    default:
+        g_assert_not_reached();
+    }
+    return false;
+}
+
+bool job_is_completed(Job *job)
+{
+    switch (job->status) {
+    case JOB_STATUS_UNDEFINED:
+    case JOB_STATUS_CREATED:
+    case JOB_STATUS_RUNNING:
+    case JOB_STATUS_PAUSED:
+    case JOB_STATUS_READY:
+    case JOB_STATUS_STANDBY:
+        return false;
+    case JOB_STATUS_WAITING:
+    case JOB_STATUS_PENDING:
+    case JOB_STATUS_ABORTING:
+    case JOB_STATUS_CONCLUDED:
+    case JOB_STATUS_NULL:
+        return true;
+    default:
+        g_assert_not_reached();
+    }
+    return false;
+}
+
+static bool job_started(Job *job)
 {
     return job->co;
 }
 
-bool job_should_pause(Job *job)
+static bool job_should_pause(Job *job)
 {
     return job->pause_count > 0;
 }
@@ -159,12 +281,17 @@ static void job_sleep_timer_cb(void *opaque)
     job_enter(job);
 }
 
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
-                 Error **errp)
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
+                 void *opaque, Error **errp)
 {
     Job *job;
 
     if (job_id) {
+        if (flags & JOB_INTERNAL) {
+            error_setg(errp, "Cannot specify job ID for internal job");
+            return NULL;
+        }
         if (!id_wellformed(job_id)) {
             error_setg(errp, "Invalid job ID '%s'", job_id);
             return NULL;
@@ -173,6 +300,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
             error_setg(errp, "Job ID '%s' already in use", job_id);
             return NULL;
         }
+    } else if (!(flags & JOB_INTERNAL)) {
+        error_setg(errp, "An explicit job ID is required");
+        return NULL;
     }
 
     job = g_malloc0(driver->instance_size);
@@ -183,6 +313,15 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
     job->busy = false;
     job->paused = true;
     job->pause_count = 1;
+    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
+    job->auto_dismiss = !(flags & JOB_MANUAL_DISMISS);
+    job->cb = cb;
+    job->opaque = opaque;
+
+    notifier_list_init(&job->on_finalize_cancelled);
+    notifier_list_init(&job->on_finalize_completed);
+    notifier_list_init(&job->on_pending);
+    notifier_list_init(&job->on_ready);
 
     job_state_transition(job, JOB_STATUS_CREATED);
     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
@@ -191,6 +330,16 @@
 
     QLIST_INSERT_HEAD(&jobs, job, job_list);
 
+    /* Single jobs are modeled as single-job transactions for sake of
+     * consolidating the job management logic */
+    if (!txn) {
+        txn = job_txn_new();
+        job_txn_add_job(txn, job);
+        job_txn_unref(txn);
+    } else {
+        job_txn_add_job(txn, job);
+    }
+
     return job;
 }
 
@@ -204,6 +353,7 @@ void job_unref(Job *job)
     if (--job->refcnt == 0) {
         assert(job->status == JOB_STATUS_NULL);
         assert(!timer_pending(&job->sleep_timer));
+        assert(!job->txn);
 
         if (job->driver->free) {
             job->driver->free(job);
@@ -211,11 +361,52 @@ void job_unref(Job *job)
 
         QLIST_REMOVE(job, job_list);
 
+        error_free(job->err);
         g_free(job->id);
         g_free(job);
     }
 }
 
+void job_progress_update(Job *job, uint64_t done)
+{
+    job->progress_current += done;
+}
+
+void job_progress_set_remaining(Job *job, uint64_t remaining)
+{
+    job->progress_total = job->progress_current + remaining;
+}
+
+void job_progress_increase_remaining(Job *job, uint64_t delta)
+{
+    job->progress_total += delta;
+}
+
+void job_event_cancelled(Job *job)
+{
+    notifier_list_notify(&job->on_finalize_cancelled, job);
+}
+
+void job_event_completed(Job *job)
+{
+    notifier_list_notify(&job->on_finalize_completed, job);
+}
+
+static void job_event_pending(Job *job)
+{
+    notifier_list_notify(&job->on_pending, job);
+}
+
+static void job_event_ready(Job *job)
+{
+    notifier_list_notify(&job->on_ready, job);
+}
+
+static void job_event_idle(Job *job)
+{
+    notifier_list_notify(&job->on_idle, job);
+}
+
 void job_enter_cond(Job *job, bool(*fn)(Job *job))
 {
     if (!job_started(job)) {
@@ -240,7 +431,7 @@
     timer_del(&job->sleep_timer);
     job->busy = true;
     job_unlock();
-    aio_co_wake(job->co);
+    aio_co_enter(job->aio_context, job->co);
 }
 
 void job_enter(Job *job)
@@ -249,18 +440,19 @@
 }
 
 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
- * Reentering the job coroutine with block_job_enter() before the timer has
- * expired is allowed and cancels the timer.
+ * Reentering the job coroutine with job_enter() before the timer has expired
+ * is allowed and cancels the timer.
  *
- * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
  * called explicitly. */
-void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
 {
     job_lock();
     if (ns != -1) {
         timer_mod(&job->sleep_timer, ns);
     }
     job->busy = false;
+    job_event_idle(job);
     job_unlock();
 
     qemu_coroutine_yield();
@@ -299,7 +491,7 @@ void coroutine_fn job_pause_point(Job *job)
     }
 }
 
-void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
+void job_yield(Job *job)
 {
     assert(job->busy);
 
@@ -309,36 +501,26 @@ void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
     }
 
     if (!job_should_pause(job)) {
-        job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
+        job_do_yield(job, -1);
     }
 
     job_pause_point(job);
 }
 
-/**
- * All jobs must allow a pause point before entering their job proper. This
- * ensures that jobs can be paused prior to being started, then resumed later.
- */
-static void coroutine_fn job_co_entry(void *opaque)
+void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
 {
-    Job *job = opaque;
+    assert(job->busy);
 
-    assert(job && job->driver && job->driver->start);
-    job_pause_point(job);
-    job->driver->start(job);
-}
+    /* Check cancellation *before* setting busy = false, too! */
+    if (job_is_cancelled(job)) {
+        return;
+    }
+
+    if (!job_should_pause(job)) {
+        job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
+    }
 
-void job_start(Job *job)
-{
-    assert(job && !job_started(job) && job->paused &&
-           job->driver && job->driver->start);
-    job->co = qemu_coroutine_create(job_co_entry, job);
-    job->pause_count--;
-    job->busy = true;
-    job->paused = false;
-    job_state_transition(job, JOB_STATUS_RUNNING);
-    aio_co_enter(job->aio_context, job->co);
+    job_pause_point(job);
 }
 
 /* Assumes the block_job_mutex is held */
@@ -399,34 +581,409 @@ void job_user_resume(Job *job, Error **errp)
     job->user_paused = false;
     job_resume(job);
 }
 
+static void job_do_dismiss(Job *job)
+{
+    assert(job);
+    job->busy = false;
+    job->paused = false;
+    job->deferred_to_main_loop = true;
 
-typedef struct {
-    Job *job;
-    JobDeferToMainLoopFn *fn;
-    void *opaque;
-} JobDeferToMainLoopData;
+    job_txn_del_job(job);
+
+    job_state_transition(job, JOB_STATUS_NULL);
+    job_unref(job);
+}
+
+void job_dismiss(Job **jobptr, Error **errp)
+{
+    Job *job = *jobptr;
+    /* similarly to _complete, this is QMP-interface only. */
+    assert(job->id);
+    if (job_apply_verb(job, JOB_VERB_DISMISS, errp)) {
+        return;
+    }
+
+    job_do_dismiss(job);
+    *jobptr = NULL;
+}
+
+void job_early_fail(Job *job)
+{
+    assert(job->status == JOB_STATUS_CREATED);
+    job_do_dismiss(job);
+}
+
+static void job_conclude(Job *job)
+{
+    job_state_transition(job, JOB_STATUS_CONCLUDED);
+    if (job->auto_dismiss || !job_started(job)) {
+        job_do_dismiss(job);
+    }
+}
+
+static void job_update_rc(Job *job)
+{
+    if (!job->ret && job_is_cancelled(job)) {
+        job->ret = -ECANCELED;
+    }
+    if (job->ret) {
+        if (!job->err) {
+            error_setg(&job->err, "%s", strerror(-job->ret));
+        }
+        job_state_transition(job, JOB_STATUS_ABORTING);
+    }
+}
+
+static void job_commit(Job *job)
+{
+    assert(!job->ret);
+    if (job->driver->commit) {
+        job->driver->commit(job);
+    }
+}
+
+static void job_abort(Job *job)
+{
+    assert(job->ret);
+    if (job->driver->abort) {
+        job->driver->abort(job);
+    }
+}
 
-static void job_defer_to_main_loop_bh(void *opaque)
+static void job_clean(Job *job)
 {
-    JobDeferToMainLoopData *data = opaque;
-    Job *job = data->job;
-    AioContext *aio_context = job->aio_context;
+    if (job->driver->clean) {
+        job->driver->clean(job);
+    }
+}
+
+static int job_finalize_single(Job *job)
+{
+    assert(job_is_completed(job));
+
+    /* Ensure abort is called for late-transactional failures */
+    job_update_rc(job);
+
+    if (!job->ret) {
+        job_commit(job);
+    } else {
+        job_abort(job);
+    }
+    job_clean(job);
+
+    if (job->cb) {
+        job->cb(job->opaque, job->ret);
+    }
 
-    aio_context_acquire(aio_context);
-    data->fn(data->job, data->opaque);
-    aio_context_release(aio_context);
+    /* Emit events only if we actually started */
+    if (job_started(job)) {
+        if (job_is_cancelled(job)) {
+            job_event_cancelled(job);
+        } else {
+            job_event_completed(job);
+        }
+    }
 
-    g_free(data);
+    job_txn_del_job(job);
+    job_conclude(job);
+    return 0;
+}
+
+static void job_cancel_async(Job *job, bool force)
+{
+    if (job->user_paused) {
+        /* Do not call job_enter here, the caller will handle it. */
+        if (job->driver->user_resume) {
+            job->driver->user_resume(job);
+        }
+        job->user_paused = false;
+        assert(job->pause_count > 0);
+        job->pause_count--;
+    }
+    job->cancelled = true;
+    /* To prevent 'force == false' overriding a previous 'force == true' */
+    job->force_cancel |= force;
+}
+
+static void job_completed_txn_abort(Job *job)
+{
+    AioContext *outer_ctx = job->aio_context;
+    AioContext *ctx;
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    if (txn->aborting) {
+        /*
+         * We are cancelled by another job, which will handle everything.
+         */
+        return;
+    }
+    txn->aborting = true;
+    job_txn_ref(txn);
+
+    /* We can only hold the single job's AioContext lock while calling
+     * job_finalize_single() because the finalization callbacks can involve
+     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
+    aio_context_release(outer_ctx);
+
+    /* Other jobs are effectively cancelled by us, set the status for
+     * them; this job, however, may or may not be cancelled, depending
+     * on the caller, so leave it. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (other_job != job) {
+            ctx = other_job->aio_context;
+            aio_context_acquire(ctx);
+            job_cancel_async(other_job, false);
+            aio_context_release(ctx);
+        }
+    }
+    while (!QLIST_EMPTY(&txn->jobs)) {
+        other_job = QLIST_FIRST(&txn->jobs);
+        ctx = other_job->aio_context;
+        aio_context_acquire(ctx);
+        if (!job_is_completed(other_job)) {
+            assert(job_is_cancelled(other_job));
+            job_finish_sync(other_job, NULL, NULL);
+        }
+        job_finalize_single(other_job);
+        aio_context_release(ctx);
+    }
+
+    aio_context_acquire(outer_ctx);
+
+    job_txn_unref(txn);
+}
+
+static int job_prepare(Job *job)
+{
+    if (job->ret == 0 && job->driver->prepare) {
+        job->ret = job->driver->prepare(job);
+        job_update_rc(job);
+    }
+    return job->ret;
 }
 
-void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque)
+static int job_needs_finalize(Job *job)
 {
-    JobDeferToMainLoopData *data = g_malloc(sizeof(*data));
-    data->job = job;
-    data->fn = fn;
-    data->opaque = opaque;
+    return !job->auto_finalize;
+}
+
+static void job_do_finalize(Job *job)
+{
+    int rc;
+    assert(job && job->txn);
+
+    /* prepare the transaction to complete */
+    rc = job_txn_apply(job->txn, job_prepare);
+    if (rc) {
+        job_completed_txn_abort(job);
+    } else {
+        job_txn_apply(job->txn, job_finalize_single);
+    }
+}
+
+void job_finalize(Job *job, Error **errp)
+{
+    assert(job && job->id);
+    if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+        return;
+    }
+    job_do_finalize(job);
+}
+
+static int job_transition_to_pending(Job *job)
+{
+    job_state_transition(job, JOB_STATUS_PENDING);
+    if (!job->auto_finalize) {
+        job_event_pending(job);
+    }
+    return 0;
+}
+
+void job_transition_to_ready(Job *job)
+{
+    job_state_transition(job, JOB_STATUS_READY);
+    job_event_ready(job);
+}
+
+static void job_completed_txn_success(Job *job)
+{
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    job_state_transition(job, JOB_STATUS_WAITING);
+
+    /*
+     * Successful completion, see if there are other running jobs in this
+     * txn.
+     */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (!job_is_completed(other_job)) {
+            return;
+        }
+        assert(other_job->ret == 0);
+    }
+
+    job_txn_apply(txn, job_transition_to_pending);
+
+    /* If no jobs need manual finalization, automatically do so */
+    if (job_txn_apply(txn, job_needs_finalize) == 0) {
+        job_do_finalize(job);
+    }
+}
+
+static void job_completed(Job *job)
+{
+    assert(job && job->txn && !job_is_completed(job));
+
+    job_update_rc(job);
+    trace_job_completed(job, job->ret);
+    if (job->ret) {
+        job_completed_txn_abort(job);
+    } else {
+        job_completed_txn_success(job);
+    }
+}
+
+/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+static void job_exit(void *opaque)
+{
+    Job *job = (Job *)opaque;
+    AioContext *ctx = job->aio_context;
+
+    aio_context_acquire(ctx);
+
+    /* This is a lie, we're not quiescent, but still doing the completion
+     * callbacks. However, completion callbacks tend to involve operations that
+     * drain block nodes, and if .drained_poll still returned true, we would
+     * deadlock. */
+    job->busy = false;
+    job_event_idle(job);
+
+    job_completed(job);
+
+    aio_context_release(ctx);
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+    Job *job = opaque;
+
+    assert(job && job->driver && job->driver->run);
+    job_pause_point(job);
+    job->ret = job->driver->run(job, &job->err);
     job->deferred_to_main_loop = true;
+    job->busy = true;
+    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
+}
+
+void job_start(Job *job)
+{
+    assert(job && !job_started(job) && job->paused &&
+           job->driver && job->driver->run);
+    job->co = qemu_coroutine_create(job_co_entry, job);
+    job->pause_count--;
+    job->busy = true;
+    job->paused = false;
+    job_state_transition(job, JOB_STATUS_RUNNING);
+    aio_co_enter(job->aio_context, job->co);
+}
+
+void job_cancel(Job *job, bool force)
+{
+    if (job->status == JOB_STATUS_CONCLUDED) {
+        job_do_dismiss(job);
+        return;
+    }
+    job_cancel_async(job, force);
+    if (!job_started(job)) {
+        job_completed(job);
+    } else if (job->deferred_to_main_loop) {
+        job_completed_txn_abort(job);
+    } else {
+        job_enter(job);
+    }
+}
+
+void job_user_cancel(Job *job, bool force, Error **errp)
+{
+    if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
+        return;
+    }
+    job_cancel(job, force);
+}
+
+/* A wrapper around job_cancel() taking an Error ** parameter so it may be
+ * used with job_finish_sync() without the need for (rather nasty) function
+ * pointer casts there. */
+static void job_cancel_err(Job *job, Error **errp)
+{
+    job_cancel(job, false);
+}
+
+int job_cancel_sync(Job *job)
+{
+    return job_finish_sync(job, &job_cancel_err, NULL);
+}
+
+void job_cancel_sync_all(void)
+{
+    Job *job;
+    AioContext *aio_context;
+
+    while ((job = job_next(NULL))) {
+        aio_context = job->aio_context;
+        aio_context_acquire(aio_context);
+        job_cancel_sync(job);
+        aio_context_release(aio_context);
+    }
+}
+
+int job_complete_sync(Job *job, Error **errp)
+{
+    return job_finish_sync(job, job_complete, errp);
+}
+
+void job_complete(Job *job, Error **errp)
+{
+    /* Should not be reachable via external interface for internal jobs */
+    assert(job->id);
+    if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
+        return;
+    }
+    if (job->pause_count || job_is_cancelled(job) || !job->driver->complete) {
+        error_setg(errp, "The active block job '%s' cannot be completed",
+                   job->id);
+        return;
+    }
+
+    job->driver->complete(job, errp);
+}
+
+int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
+{
+    Error *local_err = NULL;
+    int ret;
+
+    job_ref(job);
+
+    if (finish) {
+        finish(job, &local_err);
+    }
+    if (local_err) {
+        error_propagate(errp, local_err);
+        job_unref(job);
+        return -EBUSY;
+    }
+
+    AIO_WAIT_WHILE(job->aio_context,
+                   (job_enter(job), !job_is_completed(job)));
 
-    aio_bh_schedule_oneshot(qemu_get_aio_context(),
-                            job_defer_to_main_loop_bh, data);
+    ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
+    job_unref(job);
+    return ret;
 }