diff --git a/util/aio-posix.c b/util/aio-posix.c
index 2d51239ec6..a4977f538e 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -14,7 +14,6 @@
  */
 
 #include "qemu/osdep.h"
-#include "qemu-common.h"
 #include "block/block.h"
 #include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
@@ -40,16 +39,16 @@ struct AioHandler
 
 #ifdef CONFIG_EPOLL_CREATE1
 
-/* The fd number threashold to switch to epoll */
+/* The fd number threshold to switch to epoll */
 #define EPOLL_ENABLE_THRESHOLD 64
 
 static void aio_epoll_disable(AioContext *ctx)
 {
-    ctx->epoll_available = false;
-    if (!ctx->epoll_enabled) {
+    ctx->epoll_enabled = false;
+    if (!ctx->epoll_available) {
         return;
     }
-    ctx->epoll_enabled = false;
+    ctx->epoll_available = false;
     close(ctx->epollfd);
 }
@@ -119,7 +118,7 @@ static int aio_epoll(AioContext *ctx, GPollFD *pfds,
     }
     if (timeout <= 0 || ret > 0) {
         ret = epoll_wait(ctx->epollfd, events,
-                         sizeof(events) / sizeof(events[0]),
+                         ARRAY_SIZE(events),
                          timeout);
         if (ret <= 0) {
             goto out;
@@ -200,6 +199,31 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
     return NULL;
 }
 
+static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+{
+    /* If the GSource is in the process of being destroyed then
+     * g_source_remove_poll() causes an assertion failure.  Skip
+     * removal in that case, because glib cleans up its state during
+     * destruction anyway.
+     */
+    if (!g_source_is_destroyed(&ctx->source)) {
+        g_source_remove_poll(&ctx->source, &node->pfd);
+    }
+
+    /* If a read is in progress, just mark the node as deleted */
+    if (qemu_lockcnt_count(&ctx->list_lock)) {
+        node->deleted = 1;
+        node->pfd.revents = 0;
+        return false;
+    }
+    /* Otherwise, delete it for real.  We can't just mark it as
+     * deleted because deleted nodes are only cleaned up while
+     * no one is walking the handlers list.
+     */
+    QLIST_REMOVE(node, node);
+    return true;
+}
+
 void aio_set_fd_handler(AioContext *ctx,
                         int fd,
                         bool is_external,
@@ -209,8 +233,10 @@ void aio_set_fd_handler(AioContext *ctx,
                         void *opaque)
 {
     AioHandler *node;
+    AioHandler *new_node = NULL;
     bool is_new = false;
     bool deleted = false;
+    int poll_disable_change;
 
     qemu_lockcnt_lock(&ctx->list_lock);
 
@@ -222,52 +248,56 @@ void aio_set_fd_handler(AioContext *ctx,
             qemu_lockcnt_unlock(&ctx->list_lock);
             return;
         }
+        /* Clean events in order to unregister fd from the ctx epoll. */
+        node->pfd.events = 0;
 
-        g_source_remove_poll(&ctx->source, &node->pfd);
-
-        /* If the lock is held, just mark the node as deleted */
-        if (qemu_lockcnt_count(&ctx->list_lock)) {
-            node->deleted = 1;
-            node->pfd.revents = 0;
-        } else {
-            /* Otherwise, delete it for real.  We can't just mark it as
-             * deleted because deleted nodes are only cleaned up while
-             * no one is walking the handlers list.
-             */
-            QLIST_REMOVE(node, node);
-            deleted = true;
-        }
-
-        if (!node->io_poll) {
-            ctx->poll_disable_cnt--;
-        }
+        poll_disable_change = -!node->io_poll;
     } else {
+        poll_disable_change = !io_poll - (node && !node->io_poll);
         if (node == NULL) {
-            /* Alloc and insert if it's not already there */
-            node = g_new0(AioHandler, 1);
-            node->pfd.fd = fd;
-            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
-
-            g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
+        }
+        /* Alloc and insert if it's not already there */
+        new_node = g_new0(AioHandler, 1);
 
-            ctx->poll_disable_cnt += !io_poll;
+        /* Update handler with latest information */
+        new_node->io_read = io_read;
+        new_node->io_write = io_write;
+        new_node->io_poll = io_poll;
+        new_node->opaque = opaque;
+        new_node->is_external = is_external;
+
+        if (is_new) {
+            new_node->pfd.fd = fd;
         } else {
-            ctx->poll_disable_cnt += !io_poll - !node->io_poll;
+            new_node->pfd = node->pfd;
         }
+        g_source_add_poll(&ctx->source, &new_node->pfd);
 
-        /* Update handler with latest information */
-        node->io_read = io_read;
-        node->io_write = io_write;
-        node->io_poll = io_poll;
-        node->opaque = opaque;
-        node->is_external = is_external;
+        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
+        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
 
-        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
-        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
+        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
+    }
+    if (node) {
+        deleted = aio_remove_fd_handler(ctx, node);
     }
 
-    aio_epoll_update(ctx, node, is_new);
+    /* No need to order poll_disable_cnt writes against other updates;
+     * the counter is only used to avoid wasting time and latency on
+     * iterated polling when the system call will be ultimately necessary.
+     * Changing handlers is a rare event, and a little wasted polling until
+     * the aio_notify below is not an issue.
+     */
+    atomic_set(&ctx->poll_disable_cnt,
+               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
+
+    if (new_node) {
+        aio_epoll_update(ctx, new_node, is_new);
+    } else if (node) {
+        /* Unregister deleted fd_handler */
+        aio_epoll_update(ctx, node, false);
+    }
     qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 
@@ -479,7 +509,7 @@ static void add_pollfd(AioHandler *node)
     npfd++;
 }
 
-static bool run_poll_handlers_once(AioContext *ctx)
+static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
 {
     bool progress = false;
     AioHandler *node;
 
@@ -488,7 +518,14 @@ static bool run_poll_handlers_once(AioContext *ctx)
         if (!node->deleted && node->io_poll &&
             aio_node_check(ctx, node->is_external) &&
             node->io_poll(node->opaque)) {
-            progress = true;
+            /*
+             * Polling was successful, exit try_poll_mode immediately
+             * to adjust the next polling time.
+             */
+            *timeout = 0;
+            if (node->opaque != &ctx->notifier) {
+                progress = true;
+            }
         }
 
         /* Caller handles freeing deleted nodes.  Don't do it here. */
@@ -510,31 +547,39 @@
  *
  * Returns: true if progress was made, false otherwise
  */
-static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
+static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
 {
     bool progress;
-    int64_t end_time;
+    int64_t start_time, elapsed_time;
 
     assert(ctx->notify_me);
     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
-    assert(ctx->poll_disable_cnt == 0);
-
-    trace_run_poll_handlers_begin(ctx, max_ns);
-
-    end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
 
+    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
+
+    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     do {
-        progress = run_poll_handlers_once(ctx);
-    } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
-
-    trace_run_poll_handlers_end(ctx, progress);
+        progress = run_poll_handlers_once(ctx, timeout);
+        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
+        max_ns = qemu_soonest_timeout(*timeout, max_ns);
+        assert(!(max_ns && progress));
+    } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
 
+    /* If time has passed with no successful polling, adjust *timeout to
+     * keep the same ending time.
+     */
+    if (*timeout != -1) {
+        *timeout -= MIN(*timeout, elapsed_time);
+    }
+
+    trace_run_poll_handlers_end(ctx, progress, *timeout);
     return progress;
 }
 
 /* try_poll_mode:
  * @ctx: the AioContext
- * @blocking: busy polling is only attempted when blocking is true
+ * @timeout: timeout for blocking wait, computed by the caller and updated if
+ *           polling succeeds.
  *
  * ctx->notify_me must be non-zero so this function can detect aio_notify().
  *
@@ -542,19 +587,15 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
  *
  * Returns: true if progress was made, false otherwise
  */
-static bool try_poll_mode(AioContext *ctx, bool blocking)
+static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
 {
-    if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
-        /* See qemu_soonest_timeout() uint64_t hack */
-        int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
-                             (uint64_t)ctx->poll_ns);
+    int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
 
-        if (max_ns) {
-            poll_set_started(ctx, true);
+    if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
+        poll_set_started(ctx, true);
 
-            if (run_poll_handlers(ctx, max_ns)) {
-                return true;
-            }
+        if (run_poll_handlers(ctx, max_ns, timeout)) {
+            return true;
         }
     }
 
@@ -563,7 +604,7 @@ static bool try_poll_mode(AioContext *ctx, bool blocking)
     /* Even if we don't run busy polling, try polling once in case it can make
      * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
      */
-    return run_poll_handlers_once(ctx);
+    return run_poll_handlers_once(ctx, timeout);
 }
 
 bool aio_poll(AioContext *ctx, bool blocking)
@@ -575,6 +616,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int64_t timeout;
     int64_t start = 0;
 
+    assert(in_aio_context_home_thread(ctx));
+
     /* aio_notify can avoid the expensive event_notifier_set if
      * everything (file descriptors, bottom halves, timers) will
      * be re-evaluated before the next blocking poll().  This is
@@ -592,8 +635,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     }
 
-    progress = try_poll_mode(ctx, blocking);
-    if (!progress) {
+    timeout = blocking ? aio_compute_timeout(ctx) : 0;
+    progress = try_poll_mode(ctx, &timeout);
+    assert(!(timeout && progress));
+
+    /* If polling is allowed, non-blocking aio_poll does not need the
+     * system call---a single round of run_poll_handlers_once suffices.
+     */
+    if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
         assert(npfd == 0);
 
         /* fill pollfds */
@@ -607,8 +656,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
             }
         }
 
-        timeout = blocking ? aio_compute_timeout(ctx) : 0;
-
         /* wait until next event */
         if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
             AioHandler epoll_handler;
@@ -625,6 +672,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     if (blocking) {
         atomic_sub(&ctx->notify_me, 2);
+        aio_notify_accept(ctx);
     }
 
     /* Adjust polling time */
@@ -668,8 +716,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         }
     }
 
-    aio_notify_accept(ctx);
-
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
@@ -694,13 +740,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
 void aio_context_setup(AioContext *ctx)
 {
-    /* TODO remove this in final patch submission */
-    if (getenv("QEMU_AIO_POLL_MAX_NS")) {
-        fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has "
-                "been replaced with -object iothread,poll-max-ns=NUM\n");
-        exit(1);
-    }
-
 #ifdef CONFIG_EPOLL_CREATE1
     assert(!ctx->epollfd);
     ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
@@ -713,6 +752,13 @@ void aio_context_setup(AioContext *ctx)
 #endif
 }
 
+void aio_context_destroy(AioContext *ctx)
+{
+#ifdef CONFIG_EPOLL_CREATE1
+    aio_epoll_disable(ctx);
+#endif
+}
+
 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                  int64_t grow, int64_t shrink, Error **errp)
 {
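
Two patterns in this patch are worth spelling out.

First, aio_remove_fd_handler() defers deletion whenever qemu_lockcnt_count() reports that a reader may still be walking the handlers list: the node is only flagged as deleted, and real unlinking happens later, once no one is iterating. The following is a minimal standalone sketch of that pattern, not QEMU code: the List/Node types, the plain `readers` counter, and list_remove() are illustrative stand-ins for ctx->aio_handlers and qemu_lockcnt, and no locking or RCU machinery is shown.

#include <stdbool.h>
#include <stddef.h>

typedef struct Node {
    struct Node *next;
    bool deleted;            /* flagged instead of freed while readers exist */
} Node;

typedef struct {
    Node *head;
    int readers;             /* stands in for qemu_lockcnt_count() */
} List;

/* Returns true if the node was really unlinked (caller may free it),
 * false if it was only marked deleted and must be reaped once the
 * reader count drops to zero. */
static bool list_remove(List *l, Node *n)
{
    if (l->readers > 0) {
        n->deleted = true;   /* a walker may still hold a pointer to n */
        return false;
    }
    for (Node **p = &l->head; *p; p = &(*p)->next) {
        if (*p == n) {
            *p = n->next;    /* safe: nobody is iterating the list */
            break;
        }
    }
    return true;
}

Second, run_poll_handlers() shrinks the caller's blocking timeout by the time already spent busy-polling, so the eventual ppoll()/epoll_wait() keeps the original deadline, while the -1 "block forever" sentinel is left untouched. A small self-contained illustration of that arithmetic follows; shrink_timeout() is an assumption-level sketch of the `*timeout -= MIN(*timeout, elapsed_time)` step, not the QEMU function itself.

#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Shrink a nanosecond timeout by elapsed polling time, clamping at 0
 * and preserving the -1 "block forever" sentinel. */
static int64_t shrink_timeout(int64_t timeout_ns, int64_t elapsed_ns)
{
    if (timeout_ns != -1) {
        timeout_ns -= MIN(timeout_ns, elapsed_ns);
    }
    return timeout_ns;
}

int main(void)
{
    printf("%lld\n", (long long)shrink_timeout(500, 200)); /* 300: deadline preserved */
    printf("%lld\n", (long long)shrink_timeout(500, 900)); /* 0: deadline already passed */
    printf("%lld\n", (long long)shrink_timeout(-1, 900));  /* -1: still block forever */
    return 0;
}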