#include "trace.h"
#include "aio-posix.h"
+/* Stop userspace polling on a handler if it isn't active for some time */
+#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
+
+bool aio_poll_disabled(AioContext *ctx)
+{
+ return atomic_read(&ctx->poll_disable_cnt);
+}
+
void aio_add_ready_handler(AioHandlerList *ready_list,
AioHandler *node,
int revents)
g_source_remove_poll(&ctx->source, &node->pfd);
}
+ node->pfd.revents = 0;
+
+ /* If the fd monitor has already marked it deleted, leave it alone */
+ if (QLIST_IS_INSERTED(node, node_deleted)) {
+ return false;
+ }
+
/* If a read is in progress, just mark the node as deleted */
if (qemu_lockcnt_count(&ctx->list_lock)) {
QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
- node->pfd.revents = 0;
return false;
}
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up while
* no one is walking the handlers list.
*/
+ QLIST_SAFE_REMOVE(node, node_poll);
QLIST_REMOVE(node, node);
return true;
}
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
}
- if (node) {
- deleted = aio_remove_fd_handler(ctx, node);
- }
/* No need to order poll_disable_cnt writes against other updates;
* the counter is only used to avoid wasting time and latency on
atomic_set(&ctx->poll_disable_cnt,
atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
- if (new_node) {
- ctx->fdmon_ops->update(ctx, new_node, is_new);
- } else if (node) {
- /* Unregister deleted fd_handler */
- ctx->fdmon_ops->update(ctx, node, false);
+ ctx->fdmon_ops->update(ctx, node, new_node);
+ if (node) {
+ deleted = aio_remove_fd_handler(ctx, node);
}
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
ctx->poll_started = started;
qemu_lockcnt_inc(&ctx->list_lock);
- QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
+ QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
IOHandler *fn;
if (QLIST_IS_INSERTED(node, node_deleted)) {
while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
QLIST_REMOVE(node, node);
QLIST_REMOVE(node, node_deleted);
+ QLIST_SAFE_REMOVE(node, node_poll);
g_free(node);
}
revents = node->pfd.revents & node->pfd.events;
node->pfd.revents = 0;
+ /*
+ * Start polling AioHandlers when they become ready because activity is
+ * likely to continue. Note that starvation is theoretically possible when
+ * fdmon_supports_polling(), but only until the fd fires for the first
+ * time.
+ */
+ if (!QLIST_IS_INSERTED(node, node_deleted) &&
+ !QLIST_IS_INSERTED(node, node_poll) &&
+ node->io_poll) {
+ trace_poll_add(ctx, node, node->pfd.fd, revents);
+ if (ctx->poll_started && node->io_poll_begin) {
+ node->io_poll_begin(node->opaque);
+ }
+ QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
+ }
+
if (!QLIST_IS_INSERTED(node, node_deleted) &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
timerlistgroup_run_timers(&ctx->tlg);
}
-static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
+static bool run_poll_handlers_once(AioContext *ctx,
+ int64_t now,
+ int64_t *timeout)
{
bool progress = false;
AioHandler *node;
+ AioHandler *tmp;
- QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
- if (!QLIST_IS_INSERTED(node, node_deleted) && node->io_poll &&
- aio_node_check(ctx, node->is_external) &&
+ QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
+ if (aio_node_check(ctx, node->is_external) &&
node->io_poll(node->opaque)) {
+ node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
+
/*
* Polling was successful, exit try_poll_mode immediately
* to adjust the next polling time.
return progress;
}
+static bool fdmon_supports_polling(AioContext *ctx)
+{
+ return ctx->fdmon_ops->need_wait != aio_poll_disabled;
+}
+
+static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
+{
+ AioHandler *node;
+ AioHandler *tmp;
+ bool progress = false;
+
+ /*
+ * File descriptor monitoring implementations without userspace polling
+ * support suffer from starvation when a subset of handlers is polled
+ * because fds will not be processed in a timely fashion. Don't remove
+ * idle poll handlers.
+ */
+ if (!fdmon_supports_polling(ctx)) {
+ return false;
+ }
+
+ QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
+ if (node->poll_idle_timeout == 0LL) {
+ node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
+ } else if (now >= node->poll_idle_timeout) {
+ trace_poll_remove(ctx, node, node->pfd.fd);
+ node->poll_idle_timeout = 0LL;
+ QLIST_SAFE_REMOVE(node, node_poll);
+ if (ctx->poll_started && node->io_poll_end) {
+ node->io_poll_end(node->opaque);
+
+ /*
+ * Final poll in case ->io_poll_end() races with an event.
+ * Nevermind about re-adding the handler in the rare case where
+ * this causes progress.
+ */
+ progress = node->io_poll(node->opaque) || progress;
+ }
+ }
+ }
+
+ return progress;
+}
+
/* run_poll_handlers:
* @ctx: the AioContext
* @max_ns: maximum time to poll for, in nanoseconds
start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
do {
- progress = run_poll_handlers_once(ctx, timeout);
+ progress = run_poll_handlers_once(ctx, start_time, timeout);
elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
max_ns = qemu_soonest_timeout(*timeout, max_ns);
assert(!(max_ns && progress));
- } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
+ } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
+
+ if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
+ *timeout = 0;
+ progress = true;
+ }
/* If time has passed with no successful polling, adjust *timeout to
* keep the same ending time.
*/
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
- int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
+ int64_t max_ns;
- if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
+ if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
+ return false;
+ }
+
+ max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
+ if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
poll_set_started(ctx, true);
if (run_poll_handlers(ctx, max_ns, timeout)) {
/* If polling is allowed, non-blocking aio_poll does not need the
* system call---a single round of run_poll_handlers_once suffices.
*/
- if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
+ if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
}
ctx->fdmon_ops = &fdmon_poll_ops;
ctx->epollfd = -1;
+ /* Use the fastest fd monitoring implementation if available */
+ if (fdmon_io_uring_setup(ctx)) {
+ return;
+ }
+
fdmon_epoll_setup(ctx);
}
void aio_context_destroy(AioContext *ctx)
{
+ fdmon_io_uring_destroy(ctx);
fdmon_epoll_disable(ctx);
}