*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "block/block.h"
+#include "qemu/main-loop.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
+#include "qemu/error-report.h"
struct AioHandler {
EventNotifier *e;
GPollFD pfd;
int deleted;
void *opaque;
- bool is_external;
QLIST_ENTRY(AioHandler) node;
};
+static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+{
+ /*
+ * If the GSource is in the process of being destroyed then
+ * g_source_remove_poll() causes an assertion failure. Skip
+ * removal in that case, because glib cleans up its state during
+ * destruction anyway.
+ */
+ if (!g_source_is_destroyed(&ctx->source)) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+ }
+
+ /* If aio_poll is in progress, just mark the node as deleted */
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ } else {
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up after
+ * releasing the list_lock.
+ */
+ QLIST_REMOVE(node, node);
+ g_free(node);
+ }
+}
+
void aio_set_fd_handler(AioContext *ctx,
int fd,
- bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
+ IOHandler *io_poll_ready,
void *opaque)
{
- /* fd is a SOCKET in our case */
- AioHandler *node;
+ AioHandler *old_node;
+ AioHandler *node = NULL;
+ SOCKET s;
+
+ if (!fd_is_socket(fd)) {
+ error_report("fd=%d is not a socket, AIO implementation is missing", fd);
+ return;
+ }
+
+ s = _get_osfhandle(fd);
qemu_lockcnt_lock(&ctx->list_lock);
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (node->pfd.fd == fd && !node->deleted) {
+ QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
+ if (old_node->pfd.fd == s && !old_node->deleted) {
break;
}
}
- /* Are we deleting the fd handler? */
- if (!io_read && !io_write) {
- if (node) {
- /* If aio_poll is in progress, just mark the node as deleted */
- if (qemu_lockcnt_count(&ctx->list_lock)) {
- node->deleted = 1;
- node->pfd.revents = 0;
- } else {
- /* Otherwise, delete it for real. We can't just mark it as
- * deleted because deleted nodes are only cleaned up after
- * releasing the list_lock.
- */
- QLIST_REMOVE(node, node);
- g_free(node);
- }
- }
- } else {
+ if (io_read || io_write) {
HANDLE event;
long bitmask = 0;
- if (node == NULL) {
- /* Alloc and insert if it's not already there */
- node = g_new0(AioHandler, 1);
- node->pfd.fd = fd;
- QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
- }
+ /* Alloc and insert if it's not already there */
+ node = g_new0(AioHandler, 1);
+ node->pfd.fd = s;
node->pfd.events = 0;
if (node->io_read) {
node->opaque = opaque;
node->io_read = io_read;
node->io_write = io_write;
- node->is_external = is_external;
if (io_read) {
bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
bitmask |= FD_WRITE | FD_CONNECT;
}
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
event = event_notifier_get_handle(&ctx->notifier);
- WSAEventSelect(node->pfd.fd, event, bitmask);
+ qemu_socket_select(fd, event, bitmask, NULL);
+ }
+ if (old_node) {
+ aio_remove_fd_handler(ctx, old_node);
}
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
}
-void aio_set_fd_poll(AioContext *ctx, int fd,
- IOHandler *io_poll_begin,
- IOHandler *io_poll_end)
-{
- /* Not implemented */
-}
-
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
- bool is_external,
EventNotifierHandler *io_notify,
- AioPollFn *io_poll)
+ AioPollFn *io_poll,
+ EventNotifierHandler *io_poll_ready)
{
AioHandler *node;
/* Are we deleting the fd handler? */
if (!io_notify) {
if (node) {
- g_source_remove_poll(&ctx->source, &node->pfd);
-
- /* aio_poll is in progress, just mark the node as deleted */
- if (qemu_lockcnt_count(&ctx->list_lock)) {
- node->deleted = 1;
- node->pfd.revents = 0;
- } else {
- /* Otherwise, delete it for real. We can't just mark it as
- * deleted because deleted nodes are only cleaned up after
- * releasing the list_lock.
- */
- QLIST_REMOVE(node, node);
- g_free(node);
- }
+ aio_remove_fd_handler(ctx, node);
}
} else {
if (node == NULL) {
node->e = e;
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
- node->is_external = is_external;
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
- HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
+ HANDLE events[MAXIMUM_WAIT_OBJECTS];
bool progress, have_select_revents, first;
- int count;
+ unsigned count;
int timeout;
+ /*
+ * There cannot be two concurrent aio_poll calls for the same AioContext (or
+ * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
+ * We rely on this below to avoid slow locked accesses to ctx->notify_me.
+ *
+ * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
+ * is special in that it runs in the main thread, but that thread's context
+ * is qemu_aio_context.
+ */
+ assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
+ qemu_get_aio_context() : ctx));
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
* so disable the optimization now.
*/
if (blocking) {
- atomic_add(&ctx->notify_me, 2);
+ qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
+ /*
+ * Write ctx->notify_me before computing the timeout
+ * (reading bottom half flags, etc.). Pairs with
+ * smp_mb in aio_notify().
+ */
+ smp_mb();
}
qemu_lockcnt_inc(&ctx->list_lock);
/* fill fd sets */
count = 0;
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
- if (!node->deleted && node->io_notify
- && aio_node_check(ctx, node->is_external)) {
+ if (!node->deleted && node->io_notify) {
+ assert(count < MAXIMUM_WAIT_OBJECTS);
events[count++] = event_notifier_get_handle(node->e);
}
}
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
- atomic_sub(&ctx->notify_me, 2);
+ qatomic_store_release(&ctx->notify_me,
+ qatomic_read(&ctx->notify_me) - 2);
+ aio_notify_accept(ctx);
}
if (first) {
- aio_notify_accept(ctx);
progress |= aio_bh_poll(ctx);
first = false;
}
{
}
+void aio_context_destroy(AioContext *ctx)
+{
+}
+
+void aio_context_use_g_source(AioContext *ctx)
+{
+}
+
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink, Error **errp)
{
- error_setg(errp, "AioContext polling is not implemented on Windows");
+ if (max_ns) {
+ error_setg(errp, "AioContext polling is not implemented on Windows");
+ }
+}
+
+void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
+ Error **errp)
+{
}