X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=async.c;h=5fb3fa61dff7e7cd6c103fc5a2b5545b2d989ff0;hb=032ac6f8bfb68746cb0eea407b1cd2b22a78898f;hp=fd313dffb7acc40bd79acfab3dc28e06c49f8596;hpb=f897235e0a0840c44645b609dc52a4c47ddfc3d5;p=qemu.git

diff --git a/async.c b/async.c
index fd313dffb..5fb3fa61d 100644
--- a/async.c
+++ b/async.c
@@ -23,128 +23,58 @@
  */
 
 #include "qemu-common.h"
-#include "qemu-aio.h"
-
-/*
- * An AsyncContext protects the callbacks of AIO requests and Bottom Halves
- * against interfering with each other. A typical example is qcow2 that accepts
- * asynchronous requests, but relies for manipulation of its metadata on
- * synchronous bdrv_read/write that doesn't trigger any callbacks.
- *
- * However, these functions are often emulated using AIO which means that AIO
- * callbacks must be run - but at the same time we must not run callbacks of
- * other requests as they might start to modify metadata and corrupt the
- * internal state of the caller of bdrv_read/write.
- *
- * To achieve the desired semantics we switch into a new AsyncContext.
- * Callbacks must only be run if they belong to the current AsyncContext.
- * Otherwise they need to be queued until their own context is active again.
- * This is how you can make qemu_aio_wait() wait only for your own callbacks.
- *
- * The AsyncContexts form a stack. When you leave a AsyncContexts, you always
- * return to the old ("parent") context.
- */
-struct AsyncContext {
-    /* Consecutive number of the AsyncContext (position in the stack) */
-    int id;
-
-    /* Anchor of the list of Bottom Halves belonging to the context */
-    struct QEMUBH *first_bh;
-
-    /* Link to parent context */
-    struct AsyncContext *parent;
-};
-
-/* The currently active AsyncContext */
-static struct AsyncContext *async_context = &(struct AsyncContext) { 0 };
-
-/*
- * Enter a new AsyncContext. Already scheduled Bottom Halves and AIO callbacks
- * won't be called until this context is left again.
- */
-void async_context_push(void)
-{
-    struct AsyncContext *new = qemu_mallocz(sizeof(*new));
-    new->parent = async_context;
-    new->id = async_context->id + 1;
-    async_context = new;
-}
-
-/* Run queued AIO completions and destroy Bottom Half */
-static void bh_run_aio_completions(void *opaque)
-{
-    QEMUBH **bh = opaque;
-    qemu_bh_delete(*bh);
-    qemu_free(bh);
-    qemu_aio_process_queue();
-}
-/*
- * Leave the currently active AsyncContext. All Bottom Halves belonging to the
- * old context are executed before changing the context.
- */
-void async_context_pop(void)
-{
-    struct AsyncContext *old = async_context;
-    QEMUBH **bh;
-
-    /* Flush the bottom halves, we don't want to lose them */
-    while (qemu_bh_poll());
-
-    /* Switch back to the parent context */
-    async_context = async_context->parent;
-    qemu_free(old);
-
-    if (async_context == NULL) {
-        abort();
-    }
-
-    /* Schedule BH to run any queued AIO completions as soon as possible */
-    bh = qemu_malloc(sizeof(*bh));
-    *bh = qemu_bh_new(bh_run_aio_completions, bh);
-    qemu_bh_schedule(*bh);
-}
-
-/*
- * Returns the ID of the currently active AsyncContext
- */
-int get_async_context_id(void)
-{
-    return async_context->id;
-}
+#include "block/aio.h"
+#include "block/thread-pool.h"
+#include "qemu/main-loop.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
 
 struct QEMUBH {
+    AioContext *ctx;
     QEMUBHFunc *cb;
     void *opaque;
-    int scheduled;
-    int idle;
-    int deleted;
     QEMUBH *next;
+    bool scheduled;
+    bool idle;
+    bool deleted;
 };
 
-QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
+QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
 {
     QEMUBH *bh;
-    bh = qemu_mallocz(sizeof(QEMUBH));
+    bh = g_malloc0(sizeof(QEMUBH));
+    bh->ctx = ctx;
     bh->cb = cb;
     bh->opaque = opaque;
-    bh->next = async_context->first_bh;
-    async_context->first_bh = bh;
+    qemu_mutex_lock(&ctx->bh_lock);
+    bh->next = ctx->first_bh;
+    /* Make sure that the members are ready before putting bh into list */
+    smp_wmb();
+    ctx->first_bh = bh;
+    qemu_mutex_unlock(&ctx->bh_lock);
     return bh;
 }
 
-int qemu_bh_poll(void)
+/* Multiple occurrences of aio_bh_poll cannot be called concurrently */
+int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
 
+    ctx->walking_bh++;
+
     ret = 0;
-    for (bh = async_context->first_bh; bh; bh = next) {
+    for (bh = ctx->first_bh; bh; bh = next) {
+        /* Make sure that fetching bh happens before accessing its members */
+        smp_read_barrier_depends();
         next = bh->next;
         if (!bh->deleted && bh->scheduled) {
             bh->scheduled = 0;
+            /* Paired with write barrier in bh schedule to ensure reading for
+             * idle & callbacks coming after bh's scheduling.
+             */
+            smp_rmb();
             if (!bh->idle)
                 ret = 1;
             bh->idle = 0;
@@ -152,15 +82,22 @@ int qemu_bh_poll(void)
         }
     }
 
+    ctx->walking_bh--;
+
     /* remove deleted bhs */
-    bhp = &async_context->first_bh;
-    while (*bhp) {
-        bh = *bhp;
-        if (bh->deleted) {
-            *bhp = bh->next;
-            qemu_free(bh);
-        } else
-            bhp = &bh->next;
+    if (!ctx->walking_bh) {
+        qemu_mutex_lock(&ctx->bh_lock);
+        bhp = &ctx->first_bh;
+        while (*bhp) {
+            bh = *bhp;
+            if (bh->deleted) {
+                *bhp = bh->next;
+                g_free(bh);
+            } else {
+                bhp = &bh->next;
+            }
+        }
+        qemu_mutex_unlock(&ctx->bh_lock);
     }
 
     return ret;
@@ -170,48 +107,171 @@ void qemu_bh_schedule_idle(QEMUBH *bh)
 {
     if (bh->scheduled)
         return;
-    bh->scheduled = 1;
     bh->idle = 1;
+    /* Make sure that idle & any writes needed by the callback are done
+     * before the locations are read in the aio_bh_poll.
+     */
+    smp_wmb();
+    bh->scheduled = 1;
 }
 
 void qemu_bh_schedule(QEMUBH *bh)
 {
     if (bh->scheduled)
         return;
-    bh->scheduled = 1;
     bh->idle = 0;
-    /* stop the currently executing CPU to execute the BH ASAP */
-    qemu_notify_event();
+    /* Make sure that idle & any writes needed by the callback are done
+     * before the locations are read in the aio_bh_poll.
+     */
+    smp_wmb();
+    bh->scheduled = 1;
+    aio_notify(bh->ctx);
 }
 
+
+/* This func is async.
+ */
 void qemu_bh_cancel(QEMUBH *bh)
 {
     bh->scheduled = 0;
 }
 
+/* This func is async. The bottom half will only be freed at the end of a
+ * later aio_bh_poll pass, once it is safe to do so.
+ */
 void qemu_bh_delete(QEMUBH *bh)
 {
     bh->scheduled = 0;
     bh->deleted = 1;
 }
 
-void qemu_bh_update_timeout(int *timeout)
+static gboolean
+aio_ctx_prepare(GSource *source, gint *timeout)
 {
+    AioContext *ctx = (AioContext *) source;
     QEMUBH *bh;
+    int deadline;
 
-    for (bh = async_context->first_bh; bh; bh = bh->next) {
+    /* We assume there is no timeout already supplied */
+    *timeout = -1;
+    for (bh = ctx->first_bh; bh; bh = bh->next) {
         if (!bh->deleted && bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
                  * every 10ms */
-                *timeout = MIN(10, *timeout);
+                *timeout = 10;
             } else {
                 /* non-idle bottom halves will be executed
                  * immediately */
                 *timeout = 0;
-                break;
+                return true;
             }
         }
     }
+
+    deadline = qemu_timeout_ns_to_ms(timerlistgroup_deadline_ns(&ctx->tlg));
+    if (deadline == 0) {
+        *timeout = 0;
+        return true;
+    } else {
+        *timeout = qemu_soonest_timeout(*timeout, deadline);
+    }
+
+    return false;
+}
+
+static gboolean
+aio_ctx_check(GSource *source)
+{
+    AioContext *ctx = (AioContext *) source;
+    QEMUBH *bh;
+
+    for (bh = ctx->first_bh; bh; bh = bh->next) {
+        if (!bh->deleted && bh->scheduled) {
+            return true;
+        }
+    }
+    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
 }
 
+static gboolean
+aio_ctx_dispatch(GSource     *source,
+                 GSourceFunc  callback,
+                 gpointer     user_data)
+{
+    AioContext *ctx = (AioContext *) source;
+
+    assert(callback == NULL);
+    aio_poll(ctx, false);
+    return true;
+}
+
+static void
+aio_ctx_finalize(GSource     *source)
+{
+    AioContext *ctx = (AioContext *) source;
+
+    thread_pool_free(ctx->thread_pool);
+    aio_set_event_notifier(ctx, &ctx->notifier, NULL);
+    event_notifier_cleanup(&ctx->notifier);
+    qemu_mutex_destroy(&ctx->bh_lock);
+    g_array_free(ctx->pollfds, TRUE);
+    timerlistgroup_deinit(&ctx->tlg);
+}
+
+static GSourceFuncs aio_source_funcs = {
+    aio_ctx_prepare,
+    aio_ctx_check,
+    aio_ctx_dispatch,
+    aio_ctx_finalize
+};
+
+GSource *aio_get_g_source(AioContext *ctx)
+{
+    g_source_ref(&ctx->source);
+    return &ctx->source;
+}
+
+ThreadPool *aio_get_thread_pool(AioContext *ctx)
+{
+    if (!ctx->thread_pool) {
+        ctx->thread_pool = thread_pool_new(ctx);
+    }
+    return ctx->thread_pool;
+}
+
+void aio_notify(AioContext *ctx)
+{
+    event_notifier_set(&ctx->notifier);
+}
+
+static void aio_timerlist_notify(void *opaque)
+{
+    aio_notify(opaque);
+}
+
+AioContext *aio_context_new(void)
+{
+    AioContext *ctx;
+    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+    ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+    ctx->thread_pool = NULL;
+    qemu_mutex_init(&ctx->bh_lock);
+    event_notifier_init(&ctx->notifier, false);
+    aio_set_event_notifier(ctx, &ctx->notifier,
                           (EventNotifierHandler *)
                           event_notifier_test_and_clear);
+    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
+
+    return ctx;
+}
+
+void aio_context_ref(AioContext *ctx)
+{
+    g_source_ref(&ctx->source);
+}
+
+void aio_context_unref(AioContext *ctx)
+{
+    g_source_unref(&ctx->source);
+}
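
Usage sketch (editorial, not part of the commit): a minimal example of how a caller drives a bottom half through the per-context API this patch introduces, i.e. aio_bh_new()/qemu_bh_schedule() instead of the removed global qemu_bh_new()/qemu_bh_poll() path. It assumes the QEMU tree at this revision; the example_ names and the blocking aio_poll() loop are illustrative only.

#include "qemu-common.h"
#include "block/aio.h"

/* Illustrative callback: record that the bottom half has run. */
static void example_bh_cb(void *opaque)
{
    bool *done = opaque;

    *done = true;
}

static void example_run_bh(void)
{
    AioContext *ctx = aio_context_new();
    QEMUBH *bh;
    bool done = false;

    /* aio_bh_new() links the bh into ctx->first_bh under bh_lock;
     * qemu_bh_schedule() publishes bh->scheduled behind a write barrier
     * and wakes the context through aio_notify(). */
    bh = aio_bh_new(ctx, example_bh_cb, &done);
    qemu_bh_schedule(bh);

    /* aio_poll() runs aio_bh_poll() internally, which is where scheduled
     * bottom halves are actually dispatched. */
    while (!done) {
        aio_poll(ctx, true);
    }

    /* Deletion is deferred: the bh is only freed at the end of a later
     * aio_bh_poll() pass, once no walker is inside the list. */
    qemu_bh_delete(bh);
    aio_context_unref(ctx);
}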
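
The lock-free reader in aio_bh_poll() works by pairing the smp_wmb() in aio_bh_new() and qemu_bh_schedule() with smp_read_barrier_depends() on the consumer side: a bh (or its scheduled flag) must be fully written before it becomes visible. For readers unfamiliar with QEMU's barrier macros, here is a standalone sketch of the same publish/consume idiom in plain C11 atomics; the Node type and node_* names are invented for illustration and are not part of the patch.

#include <stdatomic.h>
#include <stdlib.h>

/* Simplified stand-in for QEMUBH: a payload plus an intrusive next link. */
typedef struct Node Node;
struct Node {
    void (*cb)(void *opaque);
    void *opaque;
    _Atomic(Node *) next;
};

static _Atomic(Node *) list_head;

/* Publisher, mirroring aio_bh_new(): initialise every field first, then make
 * the node reachable.  The release store plays the role of smp_wmb() followed
 * by the plain store to ctx->first_bh.  The real code also serialises
 * concurrent publishers with ctx->bh_lock; one publishing thread is assumed
 * here. */
static Node *node_publish(void (*cb)(void *), void *opaque)
{
    Node *n = calloc(1, sizeof(*n));

    n->cb = cb;
    n->opaque = opaque;
    atomic_store_explicit(&n->next,
                          atomic_load_explicit(&list_head, memory_order_relaxed),
                          memory_order_relaxed);
    atomic_store_explicit(&list_head, n, memory_order_release);
    return n;
}

/* Consumer, mirroring the walk in aio_bh_poll(): the acquire loads stand in
 * for smp_read_barrier_depends(), which QEMU can use instead because the
 * address dependency already orders the member reads on real hardware. */
static void node_walk(void)
{
    Node *n;

    for (n = atomic_load_explicit(&list_head, memory_order_acquire);
         n != NULL;
         n = atomic_load_explicit(&n->next, memory_order_acquire)) {
        n->cb(n->opaque);
    }
}

The same reasoning explains why qemu_bh_schedule() writes bh->idle before the smp_wmb() and bh->scheduled after it: the reader tests scheduled first and may then rely on idle and on anything the callback needs.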
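
Finally, because the AioContext now embeds a GSource driven by the aio_source_funcs table above, it can be serviced by a GLib main loop as well as by an explicit aio_poll() loop. A sketch of attaching it, built only on functions added in this patch plus standard GLib calls; the surrounding setup and teardown are illustrative:

#include <glib.h>
#include "qemu-common.h"
#include "block/aio.h"

static void example_attach_to_glib(void)
{
    AioContext *ctx = aio_context_new();
    GMainContext *gctx = g_main_context_new();
    GSource *src;

    /* aio_get_g_source() returns the embedded GSource with an extra
     * reference; g_source_attach() takes its own reference as well. */
    src = aio_get_g_source(ctx);
    g_source_attach(src, gctx);

    /* One main-loop iteration: aio_ctx_prepare() derives the poll timeout
     * from pending bottom halves and the timer deadline, aio_ctx_check()
     * reports readiness, and aio_ctx_dispatch() ends up in aio_poll(). */
    g_main_context_iteration(gctx, FALSE);

    /* Detach, then drop the references we hold. */
    g_source_destroy(src);
    g_source_unref(src);
    g_main_context_unref(gctx);
    aio_context_unref(ctx);
}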