X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=async.c;h=90fe906539f70815a5cfbaa95b42fab9f11ce228;hb=8593e050871c632e245190725b11f1e10c629ff2;hp=fd313dffb7acc40bd79acfab3dc28e06c49f8596;hpb=71f34ad05359d7fa97996562d904979281ddc7f5;p=qemu.git diff --git a/async.c b/async.c index fd313dffb..90fe90653 100644 --- a/async.c +++ b/async.c @@ -23,125 +23,44 @@ */ #include "qemu-common.h" -#include "qemu-aio.h" - -/* - * An AsyncContext protects the callbacks of AIO requests and Bottom Halves - * against interfering with each other. A typical example is qcow2 that accepts - * asynchronous requests, but relies for manipulation of its metadata on - * synchronous bdrv_read/write that doesn't trigger any callbacks. - * - * However, these functions are often emulated using AIO which means that AIO - * callbacks must be run - but at the same time we must not run callbacks of - * other requests as they might start to modify metadata and corrupt the - * internal state of the caller of bdrv_read/write. - * - * To achieve the desired semantics we switch into a new AsyncContext. - * Callbacks must only be run if they belong to the current AsyncContext. - * Otherwise they need to be queued until their own context is active again. - * This is how you can make qemu_aio_wait() wait only for your own callbacks. - * - * The AsyncContexts form a stack. When you leave a AsyncContexts, you always - * return to the old ("parent") context. - */ -struct AsyncContext { - /* Consecutive number of the AsyncContext (position in the stack) */ - int id; - - /* Anchor of the list of Bottom Halves belonging to the context */ - struct QEMUBH *first_bh; - - /* Link to parent context */ - struct AsyncContext *parent; -}; - -/* The currently active AsyncContext */ -static struct AsyncContext *async_context = &(struct AsyncContext) { 0 }; - -/* - * Enter a new AsyncContext. Already scheduled Bottom Halves and AIO callbacks - * won't be called until this context is left again. - */ -void async_context_push(void) -{ - struct AsyncContext *new = qemu_mallocz(sizeof(*new)); - new->parent = async_context; - new->id = async_context->id + 1; - async_context = new; -} - -/* Run queued AIO completions and destroy Bottom Half */ -static void bh_run_aio_completions(void *opaque) -{ - QEMUBH **bh = opaque; - qemu_bh_delete(*bh); - qemu_free(bh); - qemu_aio_process_queue(); -} -/* - * Leave the currently active AsyncContext. All Bottom Halves belonging to the - * old context are executed before changing the context. 
- */ -void async_context_pop(void) -{ - struct AsyncContext *old = async_context; - QEMUBH **bh; - - /* Flush the bottom halves, we don't want to lose them */ - while (qemu_bh_poll()); - - /* Switch back to the parent context */ - async_context = async_context->parent; - qemu_free(old); - - if (async_context == NULL) { - abort(); - } - - /* Schedule BH to run any queued AIO completions as soon as possible */ - bh = qemu_malloc(sizeof(*bh)); - *bh = qemu_bh_new(bh_run_aio_completions, bh); - qemu_bh_schedule(*bh); -} - -/* - * Returns the ID of the currently active AsyncContext - */ -int get_async_context_id(void) -{ - return async_context->id; -} +#include "block/aio.h" +#include "block/thread-pool.h" +#include "qemu/main-loop.h" /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ struct QEMUBH { + AioContext *ctx; QEMUBHFunc *cb; void *opaque; - int scheduled; - int idle; - int deleted; QEMUBH *next; + bool scheduled; + bool idle; + bool deleted; }; -QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque) +QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) { QEMUBH *bh; - bh = qemu_mallocz(sizeof(QEMUBH)); + bh = g_malloc0(sizeof(QEMUBH)); + bh->ctx = ctx; bh->cb = cb; bh->opaque = opaque; - bh->next = async_context->first_bh; - async_context->first_bh = bh; + bh->next = ctx->first_bh; + ctx->first_bh = bh; return bh; } -int qemu_bh_poll(void) +int aio_bh_poll(AioContext *ctx) { QEMUBH *bh, **bhp, *next; int ret; + ctx->walking_bh++; + ret = 0; - for (bh = async_context->first_bh; bh; bh = next) { + for (bh = ctx->first_bh; bh; bh = next) { next = bh->next; if (!bh->deleted && bh->scheduled) { bh->scheduled = 0; @@ -152,15 +71,20 @@ int qemu_bh_poll(void) } } + ctx->walking_bh--; + /* remove deleted bhs */ - bhp = &async_context->first_bh; - while (*bhp) { - bh = *bhp; - if (bh->deleted) { - *bhp = bh->next; - qemu_free(bh); - } else - bhp = &bh->next; + if (!ctx->walking_bh) { + bhp = &ctx->first_bh; + while (*bhp) { + bh = *bhp; + if (bh->deleted) { + *bhp = bh->next; + g_free(bh); + } else { + bhp = &bh->next; + } + } } return ret; @@ -180,8 +104,7 @@ void qemu_bh_schedule(QEMUBH *bh) return; bh->scheduled = 1; bh->idle = 0; - /* stop the currently executing CPU to execute the BH ASAP */ - qemu_notify_event(); + aio_notify(bh->ctx); } void qemu_bh_cancel(QEMUBH *bh) @@ -195,23 +118,113 @@ void qemu_bh_delete(QEMUBH *bh) bh->deleted = 1; } -void qemu_bh_update_timeout(int *timeout) +static gboolean +aio_ctx_prepare(GSource *source, gint *timeout) { + AioContext *ctx = (AioContext *) source; QEMUBH *bh; - for (bh = async_context->first_bh; bh; bh = bh->next) { + for (bh = ctx->first_bh; bh; bh = bh->next) { if (!bh->deleted && bh->scheduled) { if (bh->idle) { /* idle bottom halves will be polled at least * every 10ms */ - *timeout = MIN(10, *timeout); + *timeout = 10; } else { /* non-idle bottom halves will be executed * immediately */ *timeout = 0; - break; + return true; } } } + + return false; +} + +static gboolean +aio_ctx_check(GSource *source) +{ + AioContext *ctx = (AioContext *) source; + QEMUBH *bh; + + for (bh = ctx->first_bh; bh; bh = bh->next) { + if (!bh->deleted && bh->scheduled) { + return true; + } + } + return aio_pending(ctx); +} + +static gboolean +aio_ctx_dispatch(GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + AioContext *ctx = (AioContext *) source; + + assert(callback == NULL); + aio_poll(ctx, false); + return true; +} + +static void +aio_ctx_finalize(GSource 
*source) +{ + AioContext *ctx = (AioContext *) source; + + thread_pool_free(ctx->thread_pool); + aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL); + event_notifier_cleanup(&ctx->notifier); + g_array_free(ctx->pollfds, TRUE); +} + +static GSourceFuncs aio_source_funcs = { + aio_ctx_prepare, + aio_ctx_check, + aio_ctx_dispatch, + aio_ctx_finalize +}; + +GSource *aio_get_g_source(AioContext *ctx) +{ + g_source_ref(&ctx->source); + return &ctx->source; +} + +ThreadPool *aio_get_thread_pool(AioContext *ctx) +{ + if (!ctx->thread_pool) { + ctx->thread_pool = thread_pool_new(ctx); + } + return ctx->thread_pool; +} + +void aio_notify(AioContext *ctx) +{ + event_notifier_set(&ctx->notifier); } +AioContext *aio_context_new(void) +{ + AioContext *ctx; + ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); + ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); + ctx->thread_pool = NULL; + event_notifier_init(&ctx->notifier, false); + aio_set_event_notifier(ctx, &ctx->notifier, + (EventNotifierHandler *) + event_notifier_test_and_clear, NULL); + + return ctx; +} + +void aio_context_ref(AioContext *ctx) +{ + g_source_ref(&ctx->source); +} + +void aio_context_unref(AioContext *ctx) +{ + g_source_unref(&ctx->source); +}
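
Usage note (not part of the diff): after this change, bottom halves hang off an explicit AioContext instead of the removed AsyncContext stack. The sketch below shows the new calling convention using only functions introduced above (aio_context_new, aio_bh_new, qemu_bh_schedule, aio_bh_poll, qemu_bh_delete, aio_context_unref); it assumes QEMU's internal headers and build tree, and the callback and variable names are illustrative only.

/* Minimal sketch of the post-patch bottom-half API; hypothetical caller. */
#include "qemu-common.h"
#include "block/aio.h"

static void example_bh_cb(void *opaque)
{
    bool *done = opaque;
    *done = true;
}

static void example_run_bh(void)
{
    AioContext *ctx = aio_context_new();   /* one reference, held via its embedded GSource */
    bool done = false;

    /* BHs are now created against an explicit context, not a global stack. */
    QEMUBH *bh = aio_bh_new(ctx, example_bh_cb, &done);

    qemu_bh_schedule(bh);   /* marks the BH runnable and wakes the context via aio_notify() */
    aio_bh_poll(ctx);       /* runs scheduled, non-deleted BHs; returns nonzero if a non-idle BH ran */

    qemu_bh_delete(bh);     /* marks it deleted; it is freed by a later aio_bh_poll()
                             * once no poll is walking the list (walking_bh == 0) */
    aio_bh_poll(ctx);

    aio_context_unref(ctx); /* drops the GSource reference */
    (void)done;
}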
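
Similarly, the new GSource plumbing (aio_ctx_prepare/check/dispatch above) lets a GLib main loop drive an AioContext. A hypothetical glue function, assuming the same QEMU headers plus GLib:

/* Attach an AioContext to the default GLib main context (illustrative only). */
#include "qemu-common.h"
#include "block/aio.h"
#include <glib.h>

static void example_attach(AioContext *ctx)
{
    GSource *src = aio_get_g_source(ctx);            /* returns the context's GSource with a new reference */
    g_source_attach(src, g_main_context_default());  /* prepare/check/dispatch now run from the main loop */
    g_source_unref(src);                             /* the main context keeps its own reference */
}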