#include "qemu/osdep.h"
#include "block/block.h"
+#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
QLIST_INSERT_HEAD(ready_list, node, node_ready);
}
+static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
+ AioHandler *node)
+{
+ QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
+ node->poll_ready = true;
+ QLIST_INSERT_HEAD(ready_list, node, node_ready);
+}
+
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
AioHandler *node;
}
node->pfd.revents = 0;
+ node->poll_ready = false;
/* If the fd monitor has already marked it deleted, leave it alone */
if (QLIST_IS_INSERTED(node, node_deleted)) {
void aio_set_fd_handler(AioContext *ctx,
int fd,
- bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
+ IOHandler *io_poll_ready,
void *opaque)
{
AioHandler *node;
bool deleted = false;
int poll_disable_change;
+ if (io_poll && !io_poll_ready) {
+ io_poll = NULL; /* polling only makes sense if there is a handler */
+ }
+
qemu_lockcnt_lock(&ctx->list_lock);
node = find_aio_handler(ctx, fd);
new_node->io_read = io_read;
new_node->io_write = io_write;
new_node->io_poll = io_poll;
+ new_node->io_poll_ready = io_poll_ready;
new_node->opaque = opaque;
- new_node->is_external = is_external;
if (is_new) {
new_node->pfd.fd = fd;
}
}
-void aio_set_fd_poll(AioContext *ctx, int fd,
- IOHandler *io_poll_begin,
- IOHandler *io_poll_end)
+static void aio_set_fd_poll(AioContext *ctx, int fd,
+ IOHandler *io_poll_begin,
+ IOHandler *io_poll_end)
{
AioHandler *node = find_aio_handler(ctx, fd);
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
- bool is_external,
EventNotifierHandler *io_read,
- AioPollFn *io_poll)
+ AioPollFn *io_poll,
+ EventNotifierHandler *io_poll_ready)
{
- aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
- (IOHandler *)io_read, NULL, io_poll, notifier);
+ aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
+ (IOHandler *)io_read, NULL, io_poll,
+ (IOHandler *)io_poll_ready, notifier);
}
void aio_set_event_notifier_poll(AioContext *ctx,
(IOHandler *)io_poll_end);
}
-static bool poll_set_started(AioContext *ctx, bool started)
+static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
+ bool started)
{
AioHandler *node;
bool progress = false;
}
/* Poll one last time in case ->io_poll_end() raced with the event */
- if (!started) {
- progress = node->io_poll(node->opaque) || progress;
+ if (!started && node->io_poll(node->opaque)) {
+ aio_add_poll_ready_handler(ready_list, node);
+ progress = true;
}
}
qemu_lockcnt_dec(&ctx->list_lock);
bool aio_prepare(AioContext *ctx)
{
+ AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
+
/* Poll mode cannot be used with glib's event loop, disable it. */
- poll_set_started(ctx, false);
+ poll_set_started(ctx, &ready_list, false);
+ /* TODO what to do with this list? */
return false;
}
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int revents;
+ /* TODO should this check poll ready? */
revents = node->pfd.revents & node->pfd.events;
- if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
- aio_node_check(ctx, node->is_external)) {
+ if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
result = true;
break;
}
- if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
- aio_node_check(ctx, node->is_external)) {
+ if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
result = true;
break;
}
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
bool progress = false;
+ bool poll_ready;
int revents;
revents = node->pfd.revents & node->pfd.events;
node->pfd.revents = 0;
+ poll_ready = node->poll_ready;
+ node->poll_ready = false;
+
/*
* Start polling AioHandlers when they become ready because activity is
* likely to continue. Note that starvation is theoretically possible when
}
QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
}
+ if (!QLIST_IS_INSERTED(node, node_deleted) &&
+ poll_ready && revents == 0 && node->io_poll_ready) {
+ /*
+ * Remove temporarily to avoid infinite loops when ->io_poll_ready()
+ * calls aio_poll() before clearing the condition that made the poll
+ * handler become ready.
+ */
+ QLIST_SAFE_REMOVE(node, node_poll);
+
+ node->io_poll_ready(node->opaque);
+
+ if (!QLIST_IS_INSERTED(node, node_poll)) {
+ QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
+ }
+
+ /*
+ * Return early since revents was zero. aio_notify() does not count as
+ * progress.
+ */
+ return node->opaque != &ctx->notifier;
+ }
if (!QLIST_IS_INSERTED(node, node_deleted) &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
- aio_node_check(ctx, node->is_external) &&
node->io_read) {
node->io_read(node->opaque);
}
if (!QLIST_IS_INSERTED(node, node_deleted) &&
(revents & (G_IO_OUT | G_IO_ERR)) &&
- aio_node_check(ctx, node->is_external) &&
node->io_write) {
node->io_write(node->opaque);
progress = true;
}
static bool run_poll_handlers_once(AioContext *ctx,
+ AioHandlerList *ready_list,
int64_t now,
int64_t *timeout)
{
AioHandler *tmp;
QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
- if (aio_node_check(ctx, node->is_external) &&
- node->io_poll(node->opaque)) {
+ if (node->io_poll(node->opaque)) {
+ aio_add_poll_ready_handler(ready_list, node);
+
node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
/*
return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}
-static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
+static bool remove_idle_poll_handlers(AioContext *ctx,
+ AioHandlerList *ready_list,
+ int64_t now)
{
AioHandler *node;
AioHandler *tmp;
* Nevermind about re-adding the handler in the rare case where
* this causes progress.
*/
- progress = node->io_poll(node->opaque) || progress;
+ if (node->io_poll(node->opaque)) {
+ aio_add_poll_ready_handler(ready_list, node);
+ progress = true;
+ }
}
}
}
/* run_poll_handlers:
* @ctx: the AioContext
+ * @ready_list: the list to place ready handlers on
* @max_ns: maximum time to poll for, in nanoseconds
*
* Polls for a given time.
*
* Returns: true if progress was made, false otherwise
*/
-static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
+static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
+ int64_t max_ns, int64_t *timeout)
{
bool progress;
int64_t start_time, elapsed_time;
start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
do {
- progress = run_poll_handlers_once(ctx, start_time, timeout);
+ progress = run_poll_handlers_once(ctx, ready_list,
+ start_time, timeout);
elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
max_ns = qemu_soonest_timeout(*timeout, max_ns);
assert(!(max_ns && progress));
} while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
- if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
+ if (remove_idle_poll_handlers(ctx, ready_list,
+ start_time + elapsed_time)) {
*timeout = 0;
progress = true;
}
/* try_poll_mode:
* @ctx: the AioContext
+ * @ready_list: list to add handlers that need to be run
* @timeout: timeout for blocking wait, computed by the caller and updated if
* polling succeeds.
*
*
* Returns: true if progress was made, false otherwise
*/
-static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
+static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
+ int64_t *timeout)
{
int64_t max_ns;
max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
- poll_set_started(ctx, true);
+ /*
+ * Enable poll mode. It pairs with the poll_set_started() in
+ * aio_poll() which disables poll mode.
+ */
+ poll_set_started(ctx, ready_list, true);
- if (run_poll_handlers(ctx, max_ns, timeout)) {
+ if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
return true;
}
}
-
- if (poll_set_started(ctx, false)) {
- *timeout = 0;
- return true;
- }
-
return false;
}
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
- int ret = 0;
bool progress;
bool use_notify_me;
int64_t timeout;
}
timeout = blocking ? aio_compute_timeout(ctx) : 0;
- progress = try_poll_mode(ctx, &timeout);
+ progress = try_poll_mode(ctx, &ready_list, &timeout);
assert(!(timeout && progress));
/*
* system call---a single round of run_poll_handlers_once suffices.
*/
if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
- ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
+ /*
+ * Disable poll mode. poll mode should be disabled before the call
+ * of ctx->fdmon_ops->wait() so that guest's notification can wake
+ * up IO threads when some work becomes pending. It is essential to
+ * avoid hangs or unnecessary latency.
+ */
+ if (poll_set_started(ctx, &ready_list, false)) {
+ timeout = 0;
+ progress = true;
+ }
+
+ ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
}
if (use_notify_me) {
}
progress |= aio_bh_poll(ctx);
-
- if (ret > 0) {
- progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
- }
+ progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
aio_free_deleted_handlers(ctx);
aio_notify(ctx);
}
+
+void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
+{
+ /*
+ * No thread synchronization here, it doesn't matter if an incorrect value
+ * is used once.
+ */
+ ctx->aio_max_batch = max_batch;
+
+ aio_notify(ctx);
+}