#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
+#include "block/graph-lock.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "qemu/rcu_queue.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
+#include "qemu/coroutine-tls.h"
+#include "sysemu/cpu-timers.h"
#include "trace.h"
/***********************************************************/
struct QEMUBH {
AioContext *ctx;
+ const char *name;
QEMUBHFunc *cb;
void *opaque;
QSLIST_ENTRY(QEMUBH) next;
unsigned flags;
+ MemReentrancyGuard *reentrancy_guard;
};
/* Called concurrently from any thread */
unsigned old_flags;
/*
- * The memory barrier implicit in atomic_fetch_or makes sure that:
- * 1. idle & any writes needed by the callback are done before the
- * locations are read in the aio_bh_poll.
- * 2. ctx is loaded before the callback has a chance to execute and bh
- * could be freed.
+ * Synchronizes with qatomic_fetch_and() in aio_bh_dequeue(), ensuring that
+ * insertion starts after BH_PENDING is set.
*/
- old_flags = atomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
+ old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
+
if (!(old_flags & BH_PENDING)) {
+ /*
+ * At this point the bottom half becomes visible to aio_bh_poll().
+ * This insertion thus synchronizes with QSLIST_MOVE_ATOMIC in
+ * aio_bh_poll(), ensuring that:
+ * 1. any writes needed by the callback are visible from the callback
+ * after aio_bh_dequeue() returns bh.
+ * 2. ctx is loaded before the callback has a chance to execute and bh
+ * could be freed.
+ */
QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
}
aio_notify(ctx);
+ /*
+ * Workaround for record/replay.
+ * vCPU execution should be suspended when a new BH is scheduled.
+ * This is needed to avoid guest timeouts caused
+ * by overly long stretches of vCPU execution.
+ */
+ icount_notify_exit();
}
/* Only called from aio_bh_poll() and aio_ctx_finalize() */
QSLIST_REMOVE_HEAD(head, next);
/*
- * The atomic_and is paired with aio_bh_enqueue(). The implicit memory
- * barrier ensures that the callback sees all writes done by the scheduling
- * thread. It also ensures that the scheduling thread sees the cleared
- * flag before bh->cb has run, and thus will call aio_notify again if
- * necessary.
+ * Synchronizes with qatomic_fetch_or() in aio_bh_enqueue(), ensuring that
+ * the removal finishes before BH_PENDING is reset.
*/
- *flags = atomic_fetch_and(&bh->flags,
+ *flags = qatomic_fetch_and(&bh->flags,
~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
return bh;
}
-void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
+void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb,
+ void *opaque, const char *name)
{
QEMUBH *bh;
bh = g_new(QEMUBH, 1);
*bh = (QEMUBH){
.ctx = ctx,
.cb = cb,
.opaque = opaque,
+ .name = name,
};
aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}
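+/*
+ * Callers usually go through the aio_bh_schedule_oneshot() wrapper rather
+ * than passing a name by hand.  A sketch of that wrapper (assumed to live
+ * in include/block/aio.h; mycb/req below are hypothetical):
+ *
+ *   #define aio_bh_schedule_oneshot(ctx, cb, opaque) \
+ *       aio_bh_schedule_oneshot_full((ctx), (cb), (opaque), (stringify(cb)))
+ *
+ *   aio_bh_schedule_oneshot(ctx, mycb, req);    // BH named "mycb"
+ */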
-QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
+QEMUBH *aio_bh_new_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
+ const char *name, MemReentrancyGuard *reentrancy_guard)
{
QEMUBH *bh;
bh = g_new(QEMUBH, 1);
*bh = (QEMUBH){
.ctx = ctx,
.cb = cb,
.opaque = opaque,
+ .name = name,
+ .reentrancy_guard = reentrancy_guard,
};
return bh;
}
void aio_bh_call(QEMUBH *bh)
{
+ bool last_engaged_in_io = false;
+
+ /* Make a copy of the guard-pointer as cb may free the bh */
+ MemReentrancyGuard *reentrancy_guard = bh->reentrancy_guard;
+ if (reentrancy_guard) {
+ last_engaged_in_io = reentrancy_guard->engaged_in_io;
+ if (reentrancy_guard->engaged_in_io) {
+ trace_reentrant_aio(bh->ctx, bh->name);
+ }
+ reentrancy_guard->engaged_in_io = true;
+ }
+
bh->cb(bh->opaque);
+
+ if (reentrancy_guard) {
+ reentrancy_guard->engaged_in_io = last_engaged_in_io;
+ }
}
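+/*
+ * Sketch of how a device opts in to the reentrancy guard.  The
+ * qemu_bh_new_guarded() wrapper and DeviceState's mem_reentrancy_guard
+ * field are assumed here and not defined in this file; mydev_bh_cb and s
+ * are hypothetical:
+ *
+ *   s->bh = qemu_bh_new_guarded(mydev_bh_cb, s,
+ *                               &DEVICE(s)->mem_reentrancy_guard);
+ *
+ * A nested aio_bh_call() for the same guard while mydev_bh_cb is still
+ * running is then reported through the reentrant_aio tracepoint above.
+ */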
/* Multiple occurrences of aio_bh_poll cannot be called concurrently. */
BHListSlice *s;
int ret = 0;
+ /* Synchronizes with QSLIST_INSERT_HEAD_ATOMIC in aio_bh_enqueue(). */
QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
+
+ /*
+ * GCC13 [-Werror=dangling-pointer=] complains that the local variable
+ * 'slice' is being stored in the global 'ctx->bh_slice_list' but the
+ * list is emptied before this function returns.
+ */
+#if !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wpragmas"
+#pragma GCC diagnostic ignored "-Wdangling-pointer="
+#endif
QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
+#if !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
QEMUBH *bh;
*/
void qemu_bh_cancel(QEMUBH *bh)
{
- atomic_and(&bh->flags, ~BH_SCHEDULED);
+ qatomic_and(&bh->flags, ~BH_SCHEDULED);
}
/* This func is async. The bottom half will do the delete action at the final
{
AioContext *ctx = (AioContext *) source;
- atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) | 1);
+ qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);
/*
* Write ctx->notify_me before computing the timeout
BHListSlice *s;
/* Finish computing the timeout before clearing the flag. */
- atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) & ~1);
+ qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
aio_notify_accept(ctx);
QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));
while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
- /* qemu_bh_delete() must have been called on BHs in this AioContext */
- assert(flags & BH_DELETED);
+ /*
+ * qemu_bh_delete() must have been called on BHs in this AioContext. In
+ * many cases memory leaks, hangs, or inconsistent state occur when a
+ * BH is leaked because something still expects it to run.
+ *
+ * If you hit this, fix the lifecycle of the BH so that
+ * qemu_bh_delete() and any associated cleanup is called before the
+ * AioContext is finalized.
+ */
+ if (unlikely(!(flags & BH_DELETED))) {
+ fprintf(stderr, "%s: BH '%s' leaked, aborting...\n",
+ __func__, bh->name);
+ abort();
+ }
g_free(bh);
}
- aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
+ aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL, NULL);
event_notifier_cleanup(&ctx->notifier);
qemu_rec_mutex_destroy(&ctx->lock);
qemu_lockcnt_destroy(&ctx->list_lock);
timerlistgroup_deinit(&ctx->tlg);
+ unregister_aiocontext(ctx);
aio_context_destroy(ctx);
}
void aio_notify(AioContext *ctx)
{
- /* Write e.g. bh->scheduled before reading ctx->notify_me. Pairs
- * with smp_mb in aio_ctx_prepare or aio_poll.
+ /*
+ * Write e.g. ctx->bh_list before writing ctx->notified. Pairs with
+ * smp_mb() in aio_notify_accept().
+ */
+ smp_wmb();
+ qatomic_set(&ctx->notified, true);
+
+ /*
+ * Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me.
+ * Pairs with smp_mb() in aio_ctx_prepare or aio_poll.
*/
smp_mb();
- if (atomic_read(&ctx->notify_me)) {
+ if (qatomic_read(&ctx->notify_me)) {
event_notifier_set(&ctx->notifier);
- atomic_mb_set(&ctx->notified, true);
}
}
void aio_notify_accept(AioContext *ctx)
{
- if (atomic_xchg(&ctx->notified, false)
-#ifdef WIN32
- || true
-#endif
- ) {
- event_notifier_test_and_clear(&ctx->notifier);
- }
+ qatomic_set(&ctx->notified, false);
+
+ /*
+ * Order reads of ctx->notified (in aio_context_notifier_poll()) and the
+ * above clearing of ctx->notified before reads of e.g. bh->flags. Pairs
+ * with smp_wmb() in aio_notify.
+ */
+ smp_mb();
}
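+/*
+ * The resulting pairing, side by side (an illustration of the comments
+ * above, not new code):
+ *
+ *   aio_notify()                         event loop
+ *   write bh->flags / ctx->bh_list       qatomic_set(&ctx->notified, false)
+ *   smp_wmb()                            smp_mb()
+ *   qatomic_set(&ctx->notified, true)    read bh->flags / ctx->bh_list
+ *
+ * If the event loop observed ctx->notified == true, the reads after its
+ * smp_mb() are guaranteed to see the writes made before the smp_wmb() in
+ * aio_notify().
+ */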
static void aio_timerlist_notify(void *opaque, QEMUClockType type)
aio_notify(opaque);
}
-static void aio_context_notifier_dummy_cb(EventNotifier *e)
+static void aio_context_notifier_cb(EventNotifier *e)
{
+ AioContext *ctx = container_of(e, AioContext, notifier);
+
+ event_notifier_test_and_clear(&ctx->notifier);
}
/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
EventNotifier *e = opaque;
AioContext *ctx = container_of(e, AioContext, notifier);
- return atomic_read(&ctx->notified);
+ /*
+ * No need for load-acquire because we just want to kick the
+ * event loop. aio_notify_accept() takes care of synchronizing
+ * the event loop with the producers.
+ */
+ return qatomic_read(&ctx->notified);
+}
+
+static void aio_context_notifier_poll_ready(EventNotifier *e)
+{
+ /* Do nothing, we just wanted to kick the event loop */
}
static void co_schedule_bh_cb(void *opaque)
aio_context_acquire(ctx);
/* Protected by write barrier in qemu_aio_coroutine_enter */
- atomic_set(&co->scheduled, NULL);
+ qatomic_set(&co->scheduled, NULL);
qemu_aio_coroutine_enter(ctx, co);
aio_context_release(ctx);
}
QSLIST_INIT(&ctx->scheduled_coroutines);
aio_set_event_notifier(ctx, &ctx->notifier,
- false,
- aio_context_notifier_dummy_cb,
- aio_context_notifier_poll);
+ aio_context_notifier_cb,
+ aio_context_notifier_poll,
+ aio_context_notifier_poll_ready);
#ifdef CONFIG_LINUX_AIO
ctx->linux_aio = NULL;
#endif
ctx->poll_grow = 0;
ctx->poll_shrink = 0;
+ ctx->aio_max_batch = 0;
+
+ ctx->thread_pool_min = 0;
+ ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
+ register_aiocontext(ctx);
+
return ctx;
fail:
g_source_destroy(&ctx->source);
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
trace_aio_co_schedule(ctx, co);
- const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
+ const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
__func__);
if (scheduled) {
aio_context_unref(ctx);
}
-void aio_co_wake(struct Coroutine *co)
+typedef struct AioCoRescheduleSelf {
+ Coroutine *co;
+ AioContext *new_ctx;
+} AioCoRescheduleSelf;
+
+static void aio_co_reschedule_self_bh(void *opaque)
+{
+ AioCoRescheduleSelf *data = opaque;
+ aio_co_schedule(data->new_ctx, data->co);
+}
+
+void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
+{
+ AioContext *old_ctx = qemu_get_current_aio_context();
+
+ if (old_ctx != new_ctx) {
+ AioCoRescheduleSelf data = {
+ .co = qemu_coroutine_self(),
+ .new_ctx = new_ctx,
+ };
+ /*
+ * We can't directly schedule the coroutine in the target context
+ * because this would be racy: The other thread could try to enter the
+ * coroutine before it has yielded in this one.
+ */
+ aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
+ qemu_coroutine_yield();
+ }
+}
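+/*
+ * Typical use, as a sketch (the surrounding coroutine is hypothetical):
+ * hop to another AioContext for a while and then return.
+ *
+ *   AioContext *old_ctx = qemu_get_current_aio_context();
+ *
+ *   aio_co_reschedule_self(qemu_get_aio_context());
+ *   ... now running in the main loop AioContext ...
+ *   aio_co_reschedule_self(old_ctx);
+ *   ... back in the original AioContext ...
+ */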
+
+void aio_co_wake(Coroutine *co)
{
AioContext *ctx;
* qemu_coroutine_enter.
*/
smp_read_barrier_depends();
- ctx = atomic_read(&co->ctx);
+ ctx = qatomic_read(&co->ctx);
aio_co_enter(ctx, co);
}
-void aio_co_enter(AioContext *ctx, struct Coroutine *co)
+void aio_co_enter(AioContext *ctx, Coroutine *co)
{
if (ctx != qemu_get_current_aio_context()) {
aio_co_schedule(ctx, co);
void aio_context_acquire(AioContext *ctx)
{
- qemu_rec_mutex_lock(&ctx->lock);
+ /* TODO remove this function */
}
void aio_context_release(AioContext *ctx)
{
- qemu_rec_mutex_unlock(&ctx->lock);
+ /* TODO remove this function */
+}
+
+QEMU_DEFINE_STATIC_CO_TLS(AioContext *, my_aiocontext)
+
+AioContext *qemu_get_current_aio_context(void)
+{
+ AioContext *ctx = get_my_aiocontext();
+ if (ctx) {
+ return ctx;
+ }
+ if (qemu_mutex_iothread_locked()) {
+ /* Possibly in a vCPU thread. */
+ return qemu_get_aio_context();
+ }
+ return NULL;
+}
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+ assert(!get_my_aiocontext());
+ set_my_aiocontext(ctx);
+}
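+/*
+ * An event loop thread binds itself to its AioContext once, right after it
+ * starts.  A sketch modelled on iothread_run(); the exact structure and
+ * field names are assumptions:
+ *
+ *   static void *iothread_run(void *opaque)
+ *   {
+ *       IOThread *iothread = opaque;
+ *
+ *       qemu_set_current_aio_context(iothread->ctx);
+ *       ...
+ *   }
+ *
+ * From then on qemu_get_current_aio_context() in that thread returns
+ * iothread->ctx without falling back to the BQL check above.
+ */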
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+ int64_t max, Error **errp)
+{
+ if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+ error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+ return;
+ }
+
+ ctx->thread_pool_min = min;
+ ctx->thread_pool_max = max;
+
+ if (ctx->thread_pool) {
+ thread_pool_update_params(ctx->thread_pool, ctx);
+ }
}
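+
+/*
+ * Example caller, as a sketch (only aio_context_set_thread_pool_params()
+ * itself is defined here; the surrounding property code is hypothetical):
+ *
+ *   Error *local_err = NULL;
+ *
+ *   aio_context_set_thread_pool_params(iothread->ctx, 2, 16, &local_err);
+ *   if (local_err) {
+ *       error_propagate(errp, local_err);
+ *       return;
+ *   }
+ *
+ * min > max, max == 0 or values above INT_MAX are rejected and leave the
+ * previous limits untouched.
+ */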