*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "block/block.h"
+#include "qemu/main-loop.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
+#include "qemu/error-report.h"
struct AioHandler {
EventNotifier *e;
GPollFD pfd;
int deleted;
void *opaque;
- bool is_external;
QLIST_ENTRY(AioHandler) node;
};
static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
+ /*
+ * If the GSource is in the process of being destroyed then
+ * g_source_remove_poll() causes an assertion failure. Skip
+ * removal in that case, because glib cleans up its state during
+ * destruction anyway.
+ */
+ if (!g_source_is_destroyed(&ctx->source)) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+ }
+
/* If aio_poll is in progress, just mark the node as deleted */
if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
void aio_set_fd_handler(AioContext *ctx,
int fd,
- bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
+ IOHandler *io_poll_ready,
void *opaque)
{
- /* fd is a SOCKET in our case */
AioHandler *old_node;
AioHandler *node = NULL;
+ SOCKET s;
+
+ if (!fd_is_socket(fd)) {
+ error_report("fd=%d is not a socket, AIO implementation is missing", fd);
+ return;
+ }
+
+ s = _get_osfhandle(fd);
qemu_lockcnt_lock(&ctx->list_lock);
QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
- if (old_node->pfd.fd == fd && !old_node->deleted) {
+ if (old_node->pfd.fd == s && !old_node->deleted) {
break;
}
}
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
- node->pfd.fd = fd;
+ node->pfd.fd = s;
node->pfd.events = 0;
if (node->io_read) {
node->opaque = opaque;
node->io_read = io_read;
node->io_write = io_write;
- node->is_external = is_external;
if (io_read) {
bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
event = event_notifier_get_handle(&ctx->notifier);
- WSAEventSelect(node->pfd.fd, event, bitmask);
+ qemu_socket_select(fd, event, bitmask, NULL);
}
if (old_node) {
aio_remove_fd_handler(ctx, old_node);
aio_notify(ctx);
}
-void aio_set_fd_poll(AioContext *ctx, int fd,
- IOHandler *io_poll_begin,
- IOHandler *io_poll_end)
-{
- /* Not implemented */
-}
-
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
- bool is_external,
EventNotifierHandler *io_notify,
- AioPollFn *io_poll)
+ AioPollFn *io_poll,
+ EventNotifierHandler *io_poll_ready)
{
AioHandler *node;
/* Are we deleting the fd handler? */
if (!io_notify) {
if (node) {
- g_source_remove_poll(&ctx->source, &node->pfd);
-
aio_remove_fd_handler(ctx, node);
}
} else {
node->e = e;
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
- node->is_external = is_external;
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
- HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
+ HANDLE events[MAXIMUM_WAIT_OBJECTS];
bool progress, have_select_revents, first;
- int count;
+ unsigned count;
int timeout;
/*
* There cannot be two concurrent aio_poll calls for the same AioContext (or
* an aio_poll concurrent with a GSource prepare/check/dispatch callback).
* We rely on this below to avoid slow locked accesses to ctx->notify_me.
+ *
+ * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
+ * is special in that it runs in the main thread, but that thread's context
+ * is qemu_aio_context.
*/
- assert(in_aio_context_home_thread(ctx));
+ assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
+ qemu_get_aio_context() : ctx));
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
* so disable the optimization now.
*/
if (blocking) {
- atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
+ qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
/*
* Write ctx->notify_me before computing the timeout
* (reading bottom half flags, etc.). Pairs with
/* fill fd sets */
count = 0;
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
- if (!node->deleted && node->io_notify
- && aio_node_check(ctx, node->is_external)) {
+ if (!node->deleted && node->io_notify) {
+ assert(count < MAXIMUM_WAIT_OBJECTS);
events[count++] = event_notifier_get_handle(node->e);
}
}
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
- atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
+ qatomic_store_release(&ctx->notify_me,
+ qatomic_read(&ctx->notify_me) - 2);
aio_notify_accept(ctx);
}
{
}
+void aio_context_use_g_source(AioContext *ctx)
+{
+}
+
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink, Error **errp)
{
error_setg(errp, "AioContext polling is not implemented on Windows");
}
}
+
+void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
+ Error **errp)
+{
+}