+void job_drain(Job *job)
+{
+ /* If job is !busy this kicks it into the next pause point. */
+ job_enter(job);
+
+ if (job->driver->drain) {
+ job->driver->drain(job);
+ }
+}
+
+/* Assumes the block_job_mutex is held */
+static bool job_timer_not_pending(Job *job)
+{
+ return !timer_pending(&job->sleep_timer);
+}
+
+void job_pause(Job *job)
+{
+ job->pause_count++;
+}
+
+void job_resume(Job *job)
+{
+ assert(job->pause_count > 0);
+ job->pause_count--;
+ if (job->pause_count) {
+ return;
+ }
+
+ /* kick only if no timer is pending */
+ job_enter_cond(job, job_timer_not_pending);
+}
+
+void job_user_pause(Job *job, Error **errp)
+{
+ if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
+ return;
+ }
+ if (job->user_paused) {
+ error_setg(errp, "Job is already paused");
+ return;
+ }
+ job->user_paused = true;
+ job_pause(job);
+}
+
+bool job_user_paused(Job *job)
+{
+ return job->user_paused;
+}
+
+void job_user_resume(Job *job, Error **errp)
+{
+ assert(job);
+ if (!job->user_paused || job->pause_count <= 0) {
+ error_setg(errp, "Can't resume a job that was not paused");
+ return;
+ }
+ if (job_apply_verb(job, JOB_VERB_RESUME, errp)) {
+ return;
+ }
+ if (job->driver->user_resume) {
+ job->driver->user_resume(job);
+ }
+ job->user_paused = false;
+ job_resume(job);
+}
+
+static void job_do_dismiss(Job *job)
+{
+ assert(job);
+ job->busy = false;
+ job->paused = false;
+ job->deferred_to_main_loop = true;
+
+ job_txn_del_job(job);
+
+ job_state_transition(job, JOB_STATUS_NULL);
+ job_unref(job);
+}
+
+void job_dismiss(Job **jobptr, Error **errp)
+{
+ Job *job = *jobptr;
+ /* similarly to _complete, this is QMP-interface only. */
+ assert(job->id);
+ if (job_apply_verb(job, JOB_VERB_DISMISS, errp)) {
+ return;
+ }
+
+ job_do_dismiss(job);
+ *jobptr = NULL;
+}
+
+void job_early_fail(Job *job)
+{
+ assert(job->status == JOB_STATUS_CREATED);
+ job_do_dismiss(job);
+}
+
+static void job_conclude(Job *job)
+{
+ job_state_transition(job, JOB_STATUS_CONCLUDED);
+ if (job->auto_dismiss || !job_started(job)) {
+ job_do_dismiss(job);
+ }
+}
+
+static void job_update_rc(Job *job)
+{
+ if (!job->ret && job_is_cancelled(job)) {
+ job->ret = -ECANCELED;
+ }
+ if (job->ret) {
+ if (!job->err) {
+ error_setg(&job->err, "%s", strerror(-job->ret));
+ }
+ job_state_transition(job, JOB_STATUS_ABORTING);
+ }
+}
+
+static void job_commit(Job *job)
+{
+ assert(!job->ret);
+ if (job->driver->commit) {
+ job->driver->commit(job);
+ }
+}
+
+static void job_abort(Job *job)
+{
+ assert(job->ret);
+ if (job->driver->abort) {
+ job->driver->abort(job);
+ }
+}
+
+static void job_clean(Job *job)
+{
+ if (job->driver->clean) {
+ job->driver->clean(job);
+ }
+}
+
+static int job_finalize_single(Job *job)
+{
+ assert(job_is_completed(job));
+
+ /* Ensure abort is called for late-transactional failures */
+ job_update_rc(job);
+
+ if (!job->ret) {
+ job_commit(job);
+ } else {
+ job_abort(job);
+ }
+ job_clean(job);
+
+ if (job->cb) {
+ job->cb(job->opaque, job->ret);
+ }
+
+ /* Emit events only if we actually started */
+ if (job_started(job)) {
+ if (job_is_cancelled(job)) {
+ job_event_cancelled(job);
+ } else {
+ job_event_completed(job);
+ }
+ }
+
+ job_txn_del_job(job);
+ job_conclude(job);
+ return 0;
+}
+
+static void job_cancel_async(Job *job, bool force)
+{
+ if (job->user_paused) {
+ /* Do not call job_enter here, the caller will handle it. */
+ if (job->driver->user_resume) {
+ job->driver->user_resume(job);
+ }
+ job->user_paused = false;
+ assert(job->pause_count > 0);
+ job->pause_count--;
+ }
+ job->cancelled = true;
+ /* To prevent 'force == false' overriding a previous 'force == true' */
+ job->force_cancel |= force;
+}
+
+static void job_completed_txn_abort(Job *job)
+{
+ AioContext *outer_ctx = job->aio_context;
+ AioContext *ctx;
+ JobTxn *txn = job->txn;
+ Job *other_job;
+
+ if (txn->aborting) {
+ /*
+ * We are cancelled by another job, which will handle everything.
+ */
+ return;
+ }
+ txn->aborting = true;
+ job_txn_ref(txn);
+
+ /* We can only hold the single job's AioContext lock while calling
+ * job_finalize_single() because the finalization callbacks can involve
+ * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
+ aio_context_release(outer_ctx);
+
+ /* Other jobs are effectively cancelled by us, set the status for
+ * them; this job, however, may or may not be cancelled, depending
+ * on the caller, so leave it. */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (other_job != job) {
+ ctx = other_job->aio_context;
+ aio_context_acquire(ctx);
+ job_cancel_async(other_job, false);
+ aio_context_release(ctx);
+ }
+ }
+ while (!QLIST_EMPTY(&txn->jobs)) {
+ other_job = QLIST_FIRST(&txn->jobs);
+ ctx = other_job->aio_context;
+ aio_context_acquire(ctx);
+ if (!job_is_completed(other_job)) {
+ assert(job_is_cancelled(other_job));
+ job_finish_sync(other_job, NULL, NULL);
+ }
+ job_finalize_single(other_job);
+ aio_context_release(ctx);
+ }
+
+ aio_context_acquire(outer_ctx);
+
+ job_txn_unref(txn);
+}
+
+static int job_prepare(Job *job)
+{
+ if (job->ret == 0 && job->driver->prepare) {
+ job->ret = job->driver->prepare(job);
+ job_update_rc(job);
+ }
+ return job->ret;
+}
+
+static int job_needs_finalize(Job *job)
+{
+ return !job->auto_finalize;
+}
+
+static void job_do_finalize(Job *job)
+{
+ int rc;
+ assert(job && job->txn);
+
+ /* prepare the transaction to complete */
+ rc = job_txn_apply(job->txn, job_prepare);
+ if (rc) {
+ job_completed_txn_abort(job);
+ } else {
+ job_txn_apply(job->txn, job_finalize_single);
+ }
+}
+
+void job_finalize(Job *job, Error **errp)
+{
+ assert(job && job->id);
+ if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+ return;
+ }
+ job_do_finalize(job);
+}
+
+static int job_transition_to_pending(Job *job)
+{
+ job_state_transition(job, JOB_STATUS_PENDING);
+ if (!job->auto_finalize) {
+ job_event_pending(job);
+ }
+ return 0;
+}
+
+void job_transition_to_ready(Job *job)
+{
+ job_state_transition(job, JOB_STATUS_READY);
+ job_event_ready(job);
+}
+
+static void job_completed_txn_success(Job *job)
+{
+ JobTxn *txn = job->txn;
+ Job *other_job;
+
+ job_state_transition(job, JOB_STATUS_WAITING);
+
+ /*
+ * Successful completion, see if there are other running jobs in this
+ * txn.
+ */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (!job_is_completed(other_job)) {
+ return;
+ }
+ assert(other_job->ret == 0);
+ }
+
+ job_txn_apply(txn, job_transition_to_pending);
+
+ /* If no jobs need manual finalization, automatically do so */
+ if (job_txn_apply(txn, job_needs_finalize) == 0) {
+ job_do_finalize(job);
+ }
+}
+
+static void job_completed(Job *job)
+{
+ assert(job && job->txn && !job_is_completed(job));
+
+ job_update_rc(job);
+ trace_job_completed(job, job->ret);
+ if (job->ret) {
+ job_completed_txn_abort(job);
+ } else {
+ job_completed_txn_success(job);
+ }
+}
+
+/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+static void job_exit(void *opaque)
+{
+ Job *job = (Job *)opaque;
+ AioContext *ctx = job->aio_context;
+
+ aio_context_acquire(ctx);
+
+ /* This is a lie, we're not quiescent, but still doing the completion
+ * callbacks. However, completion callbacks tend to involve operations that
+ * drain block nodes, and if .drained_poll still returned true, we would
+ * deadlock. */
+ job->busy = false;
+ job_event_idle(job);
+
+ job_completed(job);
+
+ aio_context_release(ctx);
+}
+