]> git.proxmox.com Git - mirror_qemu.git/blobdiff - util/aio-posix.c
vl: make sure char-pty message displayed by moving setbuf to the beginning
[mirror_qemu.git] / util / aio-posix.c
index 1427f49b4a98dbbdc5c3d626e84fea62cb348952..a4977f538ef28d56178267a1795c998c1815ccbf 100644 (file)
@@ -14,7 +14,6 @@
  */
 
 #include "qemu/osdep.h"
-#include "qemu-common.h"
 #include "block/block.h"
 #include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
@@ -40,16 +39,16 @@ struct AioHandler
 
 #ifdef CONFIG_EPOLL_CREATE1
 
-/* The fd number threashold to switch to epoll */
+/* The fd number threshold to switch to epoll */
 #define EPOLL_ENABLE_THRESHOLD 64
 
 static void aio_epoll_disable(AioContext *ctx)
 {
-    ctx->epoll_available = false;
-    if (!ctx->epoll_enabled) {
+    ctx->epoll_enabled = false;
+    if (!ctx->epoll_available) {
         return;
     }
-    ctx->epoll_enabled = false;
+    ctx->epoll_available = false;
     close(ctx->epollfd);
 }
 
@@ -119,7 +118,7 @@ static int aio_epoll(AioContext *ctx, GPollFD *pfds,
     }
     if (timeout <= 0 || ret > 0) {
         ret = epoll_wait(ctx->epollfd, events,
-                         sizeof(events) / sizeof(events[0]),
+                         ARRAY_SIZE(events),
                          timeout);
         if (ret <= 0) {
             goto out;
@@ -200,6 +199,31 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
     return NULL;
 }
 
+static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+{
+    /* If the GSource is in the process of being destroyed then
+     * g_source_remove_poll() causes an assertion failure.  Skip
+     * removal in that case, because glib cleans up its state during
+     * destruction anyway.
+     */
+    if (!g_source_is_destroyed(&ctx->source)) {
+        g_source_remove_poll(&ctx->source, &node->pfd);
+    }
+
+    /* If a read is in progress, just mark the node as deleted */
+    if (qemu_lockcnt_count(&ctx->list_lock)) {
+        node->deleted = 1;
+        node->pfd.revents = 0;
+        return false;
+    }
+    /* Otherwise, delete it for real.  We can't just mark it as
+     * deleted because deleted nodes are only cleaned up while
+     * no one is walking the handlers list.
+     */
+    QLIST_REMOVE(node, node);
+    return true;
+}
+
 void aio_set_fd_handler(AioContext *ctx,
                         int fd,
                         bool is_external,
@@ -209,8 +233,10 @@ void aio_set_fd_handler(AioContext *ctx,
                         void *opaque)
 {
     AioHandler *node;
+    AioHandler *new_node = NULL;
     bool is_new = false;
     bool deleted = false;
+    int poll_disable_change;
 
     qemu_lockcnt_lock(&ctx->list_lock);
 
@@ -222,59 +248,56 @@ void aio_set_fd_handler(AioContext *ctx,
             qemu_lockcnt_unlock(&ctx->list_lock);
             return;
         }
+        /* Clean events in order to unregister fd from the ctx epoll. */
+        node->pfd.events = 0;
 
-        /* If the GSource is in the process of being destroyed then
-         * g_source_remove_poll() causes an assertion failure.  Skip
-         * removal in that case, because glib cleans up its state during
-         * destruction anyway.
-         */
-        if (!g_source_is_destroyed(&ctx->source)) {
-            g_source_remove_poll(&ctx->source, &node->pfd);
-        }
-
-        /* If the lock is held, just mark the node as deleted */
-        if (qemu_lockcnt_count(&ctx->list_lock)) {
-            node->deleted = 1;
-            node->pfd.revents = 0;
-        } else {
-            /* Otherwise, delete it for real.  We can't just mark it as
-             * deleted because deleted nodes are only cleaned up while
-             * no one is walking the handlers list.
-             */
-            QLIST_REMOVE(node, node);
-            deleted = true;
-        }
-
-        if (!node->io_poll) {
-            ctx->poll_disable_cnt--;
-        }
+        poll_disable_change = -!node->io_poll;
     } else {
+        poll_disable_change = !io_poll - (node && !node->io_poll);
         if (node == NULL) {
-            /* Alloc and insert if it's not already there */
-            node = g_new0(AioHandler, 1);
-            node->pfd.fd = fd;
-            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
-
-            g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
+        }
+        /* Alloc and insert if it's not already there */
+        new_node = g_new0(AioHandler, 1);
 
-            ctx->poll_disable_cnt += !io_poll;
+        /* Update handler with latest information */
+        new_node->io_read = io_read;
+        new_node->io_write = io_write;
+        new_node->io_poll = io_poll;
+        new_node->opaque = opaque;
+        new_node->is_external = is_external;
+
+        if (is_new) {
+            new_node->pfd.fd = fd;
         } else {
-            ctx->poll_disable_cnt += !io_poll - !node->io_poll;
+            new_node->pfd = node->pfd;
         }
+        g_source_add_poll(&ctx->source, &new_node->pfd);
 
-        /* Update handler with latest information */
-        node->io_read = io_read;
-        node->io_write = io_write;
-        node->io_poll = io_poll;
-        node->opaque = opaque;
-        node->is_external = is_external;
+        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
+        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
 
-        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
-        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
+        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
+    }
+    if (node) {
+        deleted = aio_remove_fd_handler(ctx, node);
     }
 
-    aio_epoll_update(ctx, node, is_new);
+    /* No need to order poll_disable_cnt writes against other updates;
+     * the counter is only used to avoid wasting time and latency on
+     * iterated polling when the system call will be ultimately necessary.
+     * Changing handlers is a rare event, and a little wasted polling until
+     * the aio_notify below is not an issue.
+     */
+    atomic_set(&ctx->poll_disable_cnt,
+               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
+
+    if (new_node) {
+        aio_epoll_update(ctx, new_node, is_new);
+    } else if (node) {
+        /* Unregister deleted fd_handler */
+        aio_epoll_update(ctx, node, false);
+    }
     qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 
@@ -486,7 +509,7 @@ static void add_pollfd(AioHandler *node)
     npfd++;
 }
 
-static bool run_poll_handlers_once(AioContext *ctx)
+static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
 {
     bool progress = false;
     AioHandler *node;
@@ -495,7 +518,14 @@ static bool run_poll_handlers_once(AioContext *ctx)
         if (!node->deleted && node->io_poll &&
             aio_node_check(ctx, node->is_external) &&
             node->io_poll(node->opaque)) {
-            progress = true;
+            /*
+             * Polling was successful, exit try_poll_mode immediately
+             * to adjust the next polling time.
+             */
+            *timeout = 0;
+            if (node->opaque != &ctx->notifier) {
+                progress = true;
+            }
         }
 
         /* Caller handles freeing deleted nodes.  Don't do it here. */
@@ -517,31 +547,39 @@ static bool run_poll_handlers_once(AioContext *ctx)
  *
  * Returns: true if progress was made, false otherwise
  */
-static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
+static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
 {
     bool progress;
-    int64_t end_time;
+    int64_t start_time, elapsed_time;
 
     assert(ctx->notify_me);
     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
-    assert(ctx->poll_disable_cnt == 0);
 
-    trace_run_poll_handlers_begin(ctx, max_ns);
-
-    end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
+    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
 
+    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     do {
-        progress = run_poll_handlers_once(ctx);
-    } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
-
-    trace_run_poll_handlers_end(ctx, progress);
+        progress = run_poll_handlers_once(ctx, timeout);
+        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
+        max_ns = qemu_soonest_timeout(*timeout, max_ns);
+        assert(!(max_ns && progress));
+    } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
+
+    /* If time has passed with no successful polling, adjust *timeout to
+     * keep the same ending time.
+     */
+    if (*timeout != -1) {
+        *timeout -= MIN(*timeout, elapsed_time);
+    }
 
+    trace_run_poll_handlers_end(ctx, progress, *timeout);
     return progress;
 }
 
 /* try_poll_mode:
  * @ctx: the AioContext
- * @blocking: busy polling is only attempted when blocking is true
+ * @timeout: timeout for blocking wait, computed by the caller and updated if
+ *    polling succeeds.
  *
  * ctx->notify_me must be non-zero so this function can detect aio_notify().
  *
@@ -549,19 +587,15 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
  *
  * Returns: true if progress was made, false otherwise
  */
-static bool try_poll_mode(AioContext *ctx, bool blocking)
+static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
 {
-    if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
-        /* See qemu_soonest_timeout() uint64_t hack */
-        int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
-                             (uint64_t)ctx->poll_ns);
+    int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
 
-        if (max_ns) {
-            poll_set_started(ctx, true);
+    if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
+        poll_set_started(ctx, true);
 
-            if (run_poll_handlers(ctx, max_ns)) {
-                return true;
-            }
+        if (run_poll_handlers(ctx, max_ns, timeout)) {
+            return true;
         }
     }
 
@@ -570,7 +604,7 @@ static bool try_poll_mode(AioContext *ctx, bool blocking)
     /* Even if we don't run busy polling, try polling once in case it can make
      * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
      */
-    return run_poll_handlers_once(ctx);
+    return run_poll_handlers_once(ctx, timeout);
 }
 
 bool aio_poll(AioContext *ctx, bool blocking)
@@ -582,6 +616,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int64_t timeout;
     int64_t start = 0;
 
+    assert(in_aio_context_home_thread(ctx));
+
     /* aio_notify can avoid the expensive event_notifier_set if
      * everything (file descriptors, bottom halves, timers) will
      * be re-evaluated before the next blocking poll().  This is
@@ -599,8 +635,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     }
 
-    progress = try_poll_mode(ctx, blocking);
-    if (!progress) {
+    timeout = blocking ? aio_compute_timeout(ctx) : 0;
+    progress = try_poll_mode(ctx, &timeout);
+    assert(!(timeout && progress));
+
+    /* If polling is allowed, non-blocking aio_poll does not need the
+     * system call---a single round of run_poll_handlers_once suffices.
+     */
+    if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
         assert(npfd == 0);
 
         /* fill pollfds */
@@ -614,8 +656,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
             }
         }
 
-        timeout = blocking ? aio_compute_timeout(ctx) : 0;
-
         /* wait until next event */
         if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
             AioHandler epoll_handler;
@@ -632,6 +672,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     if (blocking) {
         atomic_sub(&ctx->notify_me, 2);
+        aio_notify_accept(ctx);
     }
 
     /* Adjust polling time */
@@ -675,8 +716,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         }
     }
 
-    aio_notify_accept(ctx);
-
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
@@ -713,6 +752,13 @@ void aio_context_setup(AioContext *ctx)
 #endif
 }
 
+void aio_context_destroy(AioContext *ctx)
+{
+#ifdef CONFIG_EPOLL_CREATE1
+    aio_epoll_disable(ctx);
+#endif
+}
+
 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                  int64_t grow, int64_t shrink, Error **errp)
 {