]> git.proxmox.com Git - mirror_qemu.git/blobdiff - util/aio-posix.c
aio-posix: remove idle poll handlers to improve scalability
[mirror_qemu.git] / util / aio-posix.c
index bc0b86547c24d6ab25b15be95cd32191cddd9eca..cd6cf0a4a97d97e3e8242f9ab59769034444803f 100644 (file)
 #include "trace.h"
 #include "aio-posix.h"
 
+/* Stop userspace polling on a handler if it isn't active for some time */
+#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
+
+bool aio_poll_disabled(AioContext *ctx)
+{
+    return atomic_read(&ctx->poll_disable_cnt);
+}
+
 void aio_add_ready_handler(AioHandlerList *ready_list,
                            AioHandler *node,
                            int revents)
@@ -57,16 +65,23 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
         g_source_remove_poll(&ctx->source, &node->pfd);
     }
 
+    node->pfd.revents = 0;
+
+    /* If the fd monitor has already marked it deleted, leave it alone */
+    if (QLIST_IS_INSERTED(node, node_deleted)) {
+        return false;
+    }
+
     /* If a read is in progress, just mark the node as deleted */
     if (qemu_lockcnt_count(&ctx->list_lock)) {
         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
-        node->pfd.revents = 0;
         return false;
     }
     /* Otherwise, delete it for real.  We can't just mark it as
      * deleted because deleted nodes are only cleaned up while
      * no one is walking the handlers list.
      */
+    QLIST_SAFE_REMOVE(node, node_poll);
     QLIST_REMOVE(node, node);
     return true;
 }
@@ -126,9 +141,6 @@ void aio_set_fd_handler(AioContext *ctx,
 
         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
     }
-    if (node) {
-        deleted = aio_remove_fd_handler(ctx, node);
-    }
 
     /* No need to order poll_disable_cnt writes against other updates;
      * the counter is only used to avoid wasting time and latency on
@@ -139,11 +151,9 @@ void aio_set_fd_handler(AioContext *ctx,
     atomic_set(&ctx->poll_disable_cnt,
                atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
 
-    if (new_node) {
-        ctx->fdmon_ops->update(ctx, new_node, is_new);
-    } else if (node) {
-        /* Unregister deleted fd_handler */
-        ctx->fdmon_ops->update(ctx, node, false);
+    ctx->fdmon_ops->update(ctx, node, new_node);
+    if (node) {
+        deleted = aio_remove_fd_handler(ctx, node);
     }
     qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
@@ -199,7 +209,7 @@ static bool poll_set_started(AioContext *ctx, bool started)
     ctx->poll_started = started;
 
     qemu_lockcnt_inc(&ctx->list_lock);
-    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
         IOHandler *fn;
 
         if (QLIST_IS_INSERTED(node, node_deleted)) {
@@ -280,6 +290,7 @@ static void aio_free_deleted_handlers(AioContext *ctx)
     while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
         QLIST_REMOVE(node, node);
         QLIST_REMOVE(node, node_deleted);
+        QLIST_SAFE_REMOVE(node, node_poll);
         g_free(node);
     }
 
@@ -294,6 +305,22 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
     revents = node->pfd.revents & node->pfd.events;
     node->pfd.revents = 0;
 
+    /*
+     * Start polling AioHandlers when they become ready because activity is
+     * likely to continue.  Note that starvation is theoretically possible when
+     * fdmon_supports_polling(), but only until the fd fires for the first
+     * time.
+     */
+    if (!QLIST_IS_INSERTED(node, node_deleted) &&
+        !QLIST_IS_INSERTED(node, node_poll) &&
+        node->io_poll) {
+        trace_poll_add(ctx, node, node->pfd.fd, revents);
+        if (ctx->poll_started && node->io_poll_begin) {
+            node->io_poll_begin(node->opaque);
+        }
+        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
+    }
+
     if (!QLIST_IS_INSERTED(node, node_deleted) &&
         (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
         aio_node_check(ctx, node->is_external) &&
@@ -358,15 +385,19 @@ void aio_dispatch(AioContext *ctx)
     timerlistgroup_run_timers(&ctx->tlg);
 }
 
-static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
+static bool run_poll_handlers_once(AioContext *ctx,
+                                   int64_t now,
+                                   int64_t *timeout)
 {
     bool progress = false;
     AioHandler *node;
+    AioHandler *tmp;
 
-    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
-        if (!QLIST_IS_INSERTED(node, node_deleted) && node->io_poll &&
-            aio_node_check(ctx, node->is_external) &&
+    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
+        if (aio_node_check(ctx, node->is_external) &&
             node->io_poll(node->opaque)) {
+            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
+
             /*
              * Polling was successful, exit try_poll_mode immediately
              * to adjust the next polling time.
@@ -383,6 +414,50 @@ static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
     return progress;
 }
 
+static bool fdmon_supports_polling(AioContext *ctx)
+{
+    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
+}
+
+static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
+{
+    AioHandler *node;
+    AioHandler *tmp;
+    bool progress = false;
+
+    /*
+     * File descriptor monitoring implementations without userspace polling
+     * support suffer from starvation when a subset of handlers is polled
+     * because fds will not be processed in a timely fashion.  Don't remove
+     * idle poll handlers.
+     */
+    if (!fdmon_supports_polling(ctx)) {
+        return false;
+    }
+
+    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
+        if (node->poll_idle_timeout == 0LL) {
+            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
+        } else if (now >= node->poll_idle_timeout) {
+            trace_poll_remove(ctx, node, node->pfd.fd);
+            node->poll_idle_timeout = 0LL;
+            QLIST_SAFE_REMOVE(node, node_poll);
+            if (ctx->poll_started && node->io_poll_end) {
+                node->io_poll_end(node->opaque);
+
+                /*
+                 * Final poll in case ->io_poll_end() races with an event.
+                 * Nevermind about re-adding the handler in the rare case where
+                 * this causes progress.
+                 */
+                progress = node->io_poll(node->opaque) || progress;
+            }
+        }
+    }
+
+    return progress;
+}
+
 /* run_poll_handlers:
  * @ctx: the AioContext
  * @max_ns: maximum time to poll for, in nanoseconds
@@ -418,11 +493,16 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
 
     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     do {
-        progress = run_poll_handlers_once(ctx, timeout);
+        progress = run_poll_handlers_once(ctx, start_time, timeout);
         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
         max_ns = qemu_soonest_timeout(*timeout, max_ns);
         assert(!(max_ns && progress));
-    } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
+    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
+
+    if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
+        *timeout = 0;
+        progress = true;
+    }
 
     /* If time has passed with no successful polling, adjust *timeout to
      * keep the same ending time.
@@ -448,9 +528,14 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
  */
 static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
 {
-    int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
+    int64_t max_ns;
 
-    if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
+    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
+        return false;
+    }
+
+    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
+    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
         poll_set_started(ctx, true);
 
         if (run_poll_handlers(ctx, max_ns, timeout)) {
@@ -500,7 +585,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
     /* If polling is allowed, non-blocking aio_poll does not need the
      * system call---a single round of run_poll_handlers_once suffices.
      */
-    if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
+    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
         ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
     }
 
@@ -570,11 +655,17 @@ void aio_context_setup(AioContext *ctx)
     ctx->fdmon_ops = &fdmon_poll_ops;
     ctx->epollfd = -1;
 
+    /* Use the fastest fd monitoring implementation if available */
+    if (fdmon_io_uring_setup(ctx)) {
+        return;
+    }
+
     fdmon_epoll_setup(ctx);
 }
 
 void aio_context_destroy(AioContext *ctx)
 {
+    fdmon_io_uring_destroy(ctx);
     fdmon_epoll_disable(ctx);
 }