#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"
+/*
+ * The job API is composed of two categories of functions.
+ *
+ * The first includes functions used by the monitor. The monitor is
+ * peculiar in that it accesses the job list with job_get, and
+ * therefore needs consistency across job_get and the actual operation
+ * (e.g. job_user_cancel). To achieve this consistency, the caller
+ * calls job_lock/job_unlock itself around the whole operation.
+ *
+ *
+ * The second includes functions used by the job drivers and sometimes
+ * by the core block layer. These delegate the locking to the callee instead.
+ */
+
/*
* job_mutex protects the jobs list, but also makes the
* struct job fields thread-safe.
};
void job_lock(void)
-{
- /* nop */
-}
-
-void job_unlock(void)
-{
- /* nop */
-}
-
-static void real_job_lock(void)
{
qemu_mutex_lock(&job_mutex);
}
-static void real_job_unlock(void)
+void job_unlock(void)
{
qemu_mutex_unlock(&job_mutex);
}
/* Called with job_mutex held, but releases it temporarily. */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
- AioContext *inner_ctx;
Job *other_job, *next;
JobTxn *txn = job->txn;
int rc = 0;
* break AIO_WAIT_WHILE from within fn.
*/
job_ref_locked(job);
- aio_context_release(job->aio_context);
QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
- inner_ctx = other_job->aio_context;
- aio_context_acquire(inner_ctx);
rc = fn(other_job);
- aio_context_release(inner_ctx);
if (rc) {
break;
}
}
- /*
- * Note that job->aio_context might have been changed by calling fn, so we
- * can't use a local variable to cache it.
- */
- aio_context_acquire(job->aio_context);
job_unref_locked(job);
return rc;
}
return -EPERM;
}
-int job_apply_verb(Job *job, JobVerb verb, Error **errp)
-{
- JOB_LOCK_GUARD();
- return job_apply_verb_locked(job, verb, errp);
-}
-
JobType job_type(const Job *job)
{
return job->driver->job_type;
return false;
}
-bool job_is_completed(Job *job)
+static bool job_is_completed(Job *job)
{
JOB_LOCK_GUARD();
return job_is_completed_locked(job);
return NULL;
}
-Job *job_get(const char *id)
+void job_set_aio_context(Job *job, AioContext *ctx)
{
+ /* protect against read in job_finish_sync_locked and job_start */
+ GLOBAL_STATE_CODE();
+ /* protect against read in job_do_yield_locked */
JOB_LOCK_GUARD();
- return job_get_locked(id);
+ /* ensure the job is quiescent while the AioContext is changed */
+ assert(job->paused || job_is_completed_locked(job));
+ job->aio_context = ctx;
}
/* Called with job_mutex *not* held. */
++job->refcnt;
}
-void job_ref(Job *job)
-{
- JOB_LOCK_GUARD();
- job_ref_locked(job);
-}
-
void job_unref_locked(Job *job)
{
GLOBAL_STATE_CODE();
assert(!job->txn);
if (job->driver->free) {
+ AioContext *aio_context = job->aio_context;
job_unlock();
+ /* FIXME: aiocontext lock is required because cb calls blk_unref */
+ aio_context_acquire(aio_context);
job->driver->free(job);
+ aio_context_release(aio_context);
job_lock();
}
}
}
-void job_unref(Job *job)
-{
- JOB_LOCK_GUARD();
- job_unref_locked(job);
-}
-
void job_progress_update(Job *job, uint64_t done)
{
progress_work_done(&job->progress, done);
return;
}
- real_job_lock();
if (job->busy) {
- real_job_unlock();
return;
}
if (fn && !fn(job)) {
- real_job_unlock();
return;
}
assert(!job->deferred_to_main_loop);
timer_del(&job->sleep_timer);
job->busy = true;
- real_job_unlock();
job_unlock();
- aio_co_enter(job->aio_context, job->co);
+ aio_co_wake(job->co);
job_lock();
}
-void job_enter_cond(Job *job, bool(*fn)(Job *job))
-{
- JOB_LOCK_GUARD();
- job_enter_cond_locked(job, fn);
-}
-
void job_enter(Job *job)
{
JOB_LOCK_GUARD();
*/
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
- real_job_lock();
+ AioContext *next_aio_context;
+
if (ns != -1) {
timer_mod(&job->sleep_timer, ns);
}
job->busy = false;
job_event_idle_locked(job);
- real_job_unlock();
job_unlock();
qemu_coroutine_yield();
job_lock();
- /* Set by job_enter_cond() before re-entering the coroutine. */
+ next_aio_context = job->aio_context;
+ /*
+ * Coroutine has resumed, but in the meanwhile the job AioContext
+ * might have changed via bdrv_try_change_aio_context(), so we need to move
+ * the coroutine too in the new aiocontext.
+ */
+ while (qemu_get_current_aio_context() != next_aio_context) {
+ job_unlock();
+ aio_co_reschedule_self(next_aio_context);
+ job_lock();
+ next_aio_context = job->aio_context;
+ }
+
+ /* Set by job_enter_cond_locked() before re-entering the coroutine. */
assert(job->busy);
}
job_pause_point_locked(job);
}
-static void coroutine_fn job_yield_locked(Job *job)
+void coroutine_fn job_yield(Job *job)
{
+ JOB_LOCK_GUARD();
assert(job->busy);
/* Check cancellation *before* setting busy = false, too! */
job_pause_point_locked(job);
}
-void coroutine_fn job_yield(Job *job)
-{
- JOB_LOCK_GUARD();
- job_yield_locked(job);
-}
-
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
JOB_LOCK_GUARD();
job_pause_locked(job);
}
-void job_user_pause(Job *job, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_user_pause_locked(job, errp);
-}
-
bool job_user_paused_locked(Job *job)
{
return job->user_paused;
}
-bool job_user_paused(Job *job)
-{
- JOB_LOCK_GUARD();
- return job_user_paused_locked(job);
-}
-
void job_user_resume_locked(Job *job, Error **errp)
{
assert(job);
job_resume_locked(job);
}
-void job_user_resume(Job *job, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_user_resume_locked(job, errp);
-}
-
/* Called with job_mutex held, but releases it temporarily. */
static void job_do_dismiss_locked(Job *job)
{
*jobptr = NULL;
}
-void job_dismiss(Job **jobptr, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_dismiss_locked(jobptr, errp);
-}
-
void job_early_fail(Job *job)
{
JOB_LOCK_GUARD();
}
}
-/* Called with job_mutex held, but releases it temporarily */
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
static int job_finalize_single_locked(Job *job)
{
int job_ret;
+ AioContext *ctx = job->aio_context;
assert(job_is_completed_locked(job));
job_ret = job->ret;
job_unlock();
+ aio_context_acquire(ctx);
if (!job_ret) {
job_commit(job);
}
job_clean(job);
- job_lock();
-
if (job->cb) {
- job_ret = job->ret;
- job_unlock();
job->cb(job->opaque, job_ret);
- job_lock();
}
+ aio_context_release(ctx);
+ job_lock();
+
/* Emit events only if we actually started */
if (job_started_locked(job)) {
if (job_is_cancelled_locked(job)) {
return 0;
}
-/* Called with job_mutex held, but releases it temporarily */
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
static void job_cancel_async_locked(Job *job, bool force)
{
+ AioContext *ctx = job->aio_context;
GLOBAL_STATE_CODE();
if (job->driver->cancel) {
job_unlock();
+ aio_context_acquire(ctx);
force = job->driver->cancel(job, force);
+ aio_context_release(ctx);
job_lock();
} else {
/* No .cancel() means the job will behave as if force-cancelled */
}
}
-/* Called with job_mutex held, but releases it temporarily. */
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
static void job_completed_txn_abort_locked(Job *job)
{
- AioContext *ctx;
JobTxn *txn = job->txn;
Job *other_job;
txn->aborting = true;
job_txn_ref_locked(txn);
- /*
- * We can only hold the single job's AioContext lock while calling
- * job_finalize_single() because the finalization callbacks can involve
- * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
- * Note that the job's AioContext may change when it is finalized.
- */
job_ref_locked(job);
- aio_context_release(job->aio_context);
/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job != job) {
- ctx = other_job->aio_context;
- aio_context_acquire(ctx);
/*
* This is a transaction: If one job failed, no result will matter.
* Therefore, pass force=true to terminate all other jobs as quickly
* as possible.
*/
job_cancel_async_locked(other_job, true);
- aio_context_release(ctx);
}
}
while (!QLIST_EMPTY(&txn->jobs)) {
other_job = QLIST_FIRST(&txn->jobs);
- /*
- * The job's AioContext may change, so store it in @ctx so we
- * release the same context that we have acquired before.
- */
- ctx = other_job->aio_context;
- aio_context_acquire(ctx);
if (!job_is_completed_locked(other_job)) {
assert(job_cancel_requested_locked(other_job));
job_finish_sync_locked(other_job, NULL, NULL);
}
job_finalize_single_locked(other_job);
- aio_context_release(ctx);
}
- /*
- * Use job_ref()/job_unref() so we can read the AioContext here
- * even if the job went away during job_finalize_single().
- */
- aio_context_acquire(job->aio_context);
job_unref_locked(job);
-
job_txn_unref_locked(txn);
}
static int job_prepare_locked(Job *job)
{
int ret;
+ AioContext *ctx = job->aio_context;
GLOBAL_STATE_CODE();
+
if (job->ret == 0 && job->driver->prepare) {
job_unlock();
+ aio_context_acquire(ctx);
ret = job->driver->prepare(job);
+ aio_context_release(ctx);
job_lock();
job->ret = ret;
job_update_rc_locked(job);
}
+
return job->ret;
}
job_do_finalize_locked(job);
}
-void job_finalize(Job *job, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_finalize_locked(job, errp);
-}
-
/* Called with job_mutex held. */
static int job_transition_to_pending_locked(Job *job)
{
static void job_exit(void *opaque)
{
Job *job = (Job *)opaque;
- AioContext *ctx;
JOB_LOCK_GUARD();
-
job_ref_locked(job);
- aio_context_acquire(job->aio_context);
/* This is a lie, we're not quiescent, but still doing the completion
* callbacks. However, completion callbacks tend to involve operations that
job_event_idle_locked(job);
job_completed_locked(job);
-
- /*
- * Note that calling job_completed can move the job to a different
- * aio_context, so we cannot cache from above. job_txn_apply takes care of
- * acquiring the new lock, and we ref/unref to avoid job_completed freeing
- * the job underneath us.
- */
- ctx = job->aio_context;
job_unref_locked(job);
- aio_context_release(ctx);
}
/**
}
}
-void job_cancel(Job *job, bool force)
-{
- JOB_LOCK_GUARD();
- job_cancel_locked(job, force);
-}
-
void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
job_cancel_locked(job, force);
}
-void job_user_cancel(Job *job, bool force, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_user_cancel_locked(job, force, errp);
-}
-
-/* A wrapper around job_cancel() taking an Error ** parameter so it may be
- * used with job_finish_sync() without the need for (rather nasty) function
- * pointer casts there.
+/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
+ * be used with job_finish_sync_locked() without the need for (rather nasty)
+ * function pointer casts there.
*
* Called with job_mutex held.
*/
void job_cancel_sync_all(void)
{
Job *job;
- AioContext *aio_context;
JOB_LOCK_GUARD();
while ((job = job_next_locked(NULL))) {
- aio_context = job->aio_context;
- aio_context_acquire(aio_context);
job_cancel_sync_locked(job, true);
- aio_context_release(aio_context);
}
}
return job_finish_sync_locked(job, job_complete_locked, errp);
}
-int job_complete_sync(Job *job, Error **errp)
-{
- JOB_LOCK_GUARD();
- return job_complete_sync_locked(job, errp);
-}
-
void job_complete_locked(Job *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */
job_lock();
}
-void job_complete(Job *job, Error **errp)
-{
- JOB_LOCK_GUARD();
- job_complete_locked(job, errp);
-}
-
int job_finish_sync_locked(Job *job,
void (*finish)(Job *, Error **errp),
Error **errp)
{
Error *local_err = NULL;
int ret;
+ GLOBAL_STATE_CODE();
job_ref_locked(job);
}
job_unlock();
- AIO_WAIT_WHILE(job->aio_context,
- (job_enter(job), !job_is_completed(job)));
+ AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
+ (job_enter(job), !job_is_completed(job)));
job_lock();
ret = (job_is_cancelled_locked(job) && job->ret == 0)
job_unref_locked(job);
return ret;
}
-
-int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
-{
- JOB_LOCK_GUARD();
- return job_finish_sync_locked(job, finish, errp);
-}