*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "block/block.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
return NULL;
}
+static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+{
+ /* If the GSource is in the process of being destroyed then
+ * g_source_remove_poll() causes an assertion failure. Skip
+ * removal in that case, because glib cleans up its state during
+ * destruction anyway.
+ */
+ if (!g_source_is_destroyed(&ctx->source)) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+ }
+
+ /* If a read is in progress, just mark the node as deleted */
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ return false;
+ }
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up while
+ * no one is walking the handlers list.
+ */
+ QLIST_REMOVE(node, node);
+ return true;
+}
+
void aio_set_fd_handler(AioContext *ctx,
int fd,
bool is_external,
void *opaque)
{
AioHandler *node;
+ AioHandler *new_node = NULL;
bool is_new = false;
bool deleted = false;
int poll_disable_change;
qemu_lockcnt_unlock(&ctx->list_lock);
return;
}
+ /* Clean events in order to unregister fd from the ctx epoll. */
+ node->pfd.events = 0;
- /* If the GSource is in the process of being destroyed then
- * g_source_remove_poll() causes an assertion failure. Skip
- * removal in that case, because glib cleans up its state during
- * destruction anyway.
- */
- if (!g_source_is_destroyed(&ctx->source)) {
- g_source_remove_poll(&ctx->source, &node->pfd);
- }
-
- /* If a read is in progress, just mark the node as deleted */
- if (qemu_lockcnt_count(&ctx->list_lock)) {
- node->deleted = 1;
- node->pfd.revents = 0;
- } else {
- /* Otherwise, delete it for real. We can't just mark it as
- * deleted because deleted nodes are only cleaned up while
- * no one is walking the handlers list.
- */
- QLIST_REMOVE(node, node);
- deleted = true;
- }
poll_disable_change = -!node->io_poll;
} else {
poll_disable_change = !io_poll - (node && !node->io_poll);
if (node == NULL) {
- /* Alloc and insert if it's not already there */
- node = g_new0(AioHandler, 1);
- node->pfd.fd = fd;
- QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
-
- g_source_add_poll(&ctx->source, &node->pfd);
is_new = true;
}
+ /* Alloc and insert if it's not already there */
+ new_node = g_new0(AioHandler, 1);
/* Update handler with latest information */
- node->io_read = io_read;
- node->io_write = io_write;
- node->io_poll = io_poll;
- node->opaque = opaque;
- node->is_external = is_external;
+ new_node->io_read = io_read;
+ new_node->io_write = io_write;
+ new_node->io_poll = io_poll;
+ new_node->opaque = opaque;
+ new_node->is_external = is_external;
+
+ if (is_new) {
+ new_node->pfd.fd = fd;
+ } else {
+ new_node->pfd = node->pfd;
+ }
+ g_source_add_poll(&ctx->source, &new_node->pfd);
+
+ new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
+ new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
- node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
- node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
+ }
+ if (node) {
+ deleted = aio_remove_fd_handler(ctx, node);
}
/* No need to order poll_disable_cnt writes against other updates;
atomic_set(&ctx->poll_disable_cnt,
atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
- aio_epoll_update(ctx, node, is_new);
+ if (new_node) {
+ aio_epoll_update(ctx, new_node, is_new);
+ } else if (node) {
+ /* Unregister deleted fd_handler */
+ aio_epoll_update(ctx, node, false);
+ }
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
if (!node->deleted && node->io_poll &&
aio_node_check(ctx, node->is_external) &&
node->io_poll(node->opaque)) {
+ /*
+ * Polling was successful, exit try_poll_mode immediately
+ * to adjust the next polling time.
+ */
*timeout = 0;
if (node->opaque != &ctx->notifier) {
progress = true;
do {
progress = run_poll_handlers_once(ctx, timeout);
elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
- } while (!progress && elapsed_time < max_ns
- && !atomic_read(&ctx->poll_disable_cnt));
+ max_ns = qemu_soonest_timeout(*timeout, max_ns);
+ assert(!(max_ns && progress));
+ } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
/* If time has passed with no successful polling, adjust *timeout to
* keep the same ending time.
*/
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
- /* See qemu_soonest_timeout() uint64_t hack */
- int64_t max_ns = MIN((uint64_t)*timeout, (uint64_t)ctx->poll_ns);
+ int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
poll_set_started(ctx, true);
int64_t timeout;
int64_t start = 0;
+ assert(in_aio_context_home_thread(ctx));
+
/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
* so disable the optimization now.
*/
if (blocking) {
- assert(in_aio_context_home_thread(ctx));
atomic_add(&ctx->notify_me, 2);
}