*/
#include "qemu/osdep.h"
#include <liburing.h>
-#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "qapi/error.h"
#include "trace.h"
+/* Only used for assertions. */
+#include "qemu/coroutine_int.h"
+
/* io_uring ring size */
#define MAX_ENTRIES 128
struct io_uring ring;
- /* io queue for submit at batch. Protected by AioContext lock. */
+ /* No locking required, only accessed from AioContext home thread */
LuringQueue io_q;
- /* I/O completion processing. Only runs in I/O thread. */
QEMUBH *completion_bh;
} LuringState;
/**
* luring_resubmit_short_read:
*
- * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
- * context") a buffered I/O request with the start of the file range in the
- * page cache could result in a short read. Applications need to resubmit the
- * remaining read request.
- *
- * This is a slow path but recent kernels never take it.
+ * Short reads are rare but may occur. The remaining read request needs to be
+ * resubmitted.
*/
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
int nread)
trace_luring_resubmit_short_read(s, luringcb, nread);
/* Update read position */
- luringcb->total_read = nread;
+ luringcb->total_read += nread;
remaining = luringcb->qiov->size - luringcb->total_read;
/* Shorten qiov */
remaining);
/* Update sqe */
- luringcb->sqeq.off = nread;
+ luringcb->sqeq.off += nread;
luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
luringcb->sqeq.len = luringcb->resubmit_qiov.niov;
total_bytes = ret + luringcb->total_read;
if (ret < 0) {
- if (ret == -EINTR) {
+ /*
+ * Only writev/readv/fsync requests on regular files or host block
+ * devices are submitted. Therefore -EAGAIN is not expected but it's
+ * known to happen sometimes with Linux SCSI. Submit again and hope
+ * the request completes successfully.
+ *
+ * For more information, see:
+ * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
+ *
+ * If the code is changed to submit other types of requests in the
+ * future, then this workaround may need to be extended to deal with
+ * genuine -EAGAIN results that should not be resubmitted
+ * immediately.
+ */
+ if (ret == -EINTR || ret == -EAGAIN) {
luring_resubmit(s, luringcb);
continue;
}
ret = 0;
}
} else {
- ret = -ENOSPC;;
+ ret = -ENOSPC;
}
}
end:
* eventually runs later. Coroutines cannot be entered recursively
* so avoid doing that!
*/
+ assert(luringcb->co->ctx == s->aio_context);
if (!qemu_coroutine_entered(luringcb->co)) {
aio_co_wake(luringcb->co);
}
trace_luring_io_uring_submit(s, ret);
/* Prevent infinite loop if submission is refused */
if (ret <= 0) {
- if (ret == -EAGAIN) {
+ if (ret == -EAGAIN || ret == -EINTR) {
continue;
}
break;
/**
 * luring_process_completions_and_submit:
 * @s: io_uring state to service
 *
 * Runs luring_process_completions() on @s and then, if the queue is not
 * plugged and still holds queued requests, calls ioq_submit().
 *
 * The '-' lines in this patch excerpt drop the
 * aio_context_acquire()/aio_context_release() pair that used to bracket
 * this work.
 */
static void luring_process_completions_and_submit(LuringState *s)
{
- aio_context_acquire(s->aio_context);
luring_process_completions(s);
/* Submit only when unplugged and at least one request is queued. */
if (!s->io_q.plugged && s->io_q.in_queue > 0) {
ioq_submit(s);
}
- aio_context_release(s->aio_context);
}
static void qemu_luring_completion_bh(void *opaque)
luring_process_completions_and_submit(s);
}
/**
 * qemu_luring_poll_cb:
 * @opaque: the LuringState passed at handler registration
 *
 * Returns true when io_uring_cq_ready() reports a non-zero value for the
 * ring in @opaque, false otherwise. Passed to aio_set_fd_handler()
 * together with qemu_luring_poll_ready() when the state is attached to an
 * AioContext.
 */
+static bool qemu_luring_poll_cb(void *opaque)
+{
+ LuringState *s = opaque;
+
+ return io_uring_cq_ready(&s->ring);
+}
+
/**
 * qemu_luring_poll_ready:
 * @opaque: the LuringState passed at handler registration
 *
 * Forwards to luring_process_completions_and_submit() for the state in
 * @opaque. Passed to aio_set_fd_handler() right after
 * qemu_luring_poll_cb() when the state is attached to an AioContext.
 */
+static void qemu_luring_poll_ready(void *opaque)
+{
+ LuringState *s = opaque;
+
+ luring_process_completions_and_submit(s);
+}
+
/**
 * ioq_init:
 * @io_q: queue to initialise
 *
 * Initialises the submit_queue list head and clears the blocked flag.
 *
 * NOTE(review): this is a patch excerpt; further statements may have been
 * elided between the two visible ones -- confirm against the full file.
 */
static void ioq_init(LuringQueue *io_q)
{
QSIMPLEQ_INIT(&io_q->submit_queue);
io_q->blocked = false;
}
/**
 * luring_io_plug:
 *
 * Increments the plugged counter of the io_uring queue. While the counter
 * is non-zero, luring_process_completions_and_submit() does not call
 * ioq_submit().
 *
 * The patch replaces the (bs, s) parameters with a lookup: the state is
 * obtained via aio_get_linux_io_uring() from the AioContext returned by
 * qemu_get_current_aio_context().
 */
-void luring_io_plug(BlockDriverState *bs, LuringState *s)
+void luring_io_plug(void)
{
+ AioContext *ctx = qemu_get_current_aio_context();
+ LuringState *s = aio_get_linux_io_uring(ctx);
trace_luring_io_plug(s);
s->io_q.plugged++;
}
-void luring_io_unplug(BlockDriverState *bs, LuringState *s)
+void luring_io_unplug(void)
{
+ AioContext *ctx = qemu_get_current_aio_context();
+ LuringState *s = aio_get_linux_io_uring(ctx);
assert(s->io_q.plugged);
trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
s->io_q.in_queue, s->io_q.in_flight);
io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
luringcb->qiov->niov, offset);
break;
+ case QEMU_AIO_ZONE_APPEND:
+ io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
+ luringcb->qiov->niov, offset);
+ break;
case QEMU_AIO_READ:
io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
luringcb->qiov->niov, offset);
return 0;
}
-int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
- uint64_t offset, QEMUIOVector *qiov, int type)
+int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
+ QEMUIOVector *qiov, int type)
{
int ret;
+ AioContext *ctx = qemu_get_current_aio_context();
+ LuringState *s = aio_get_linux_io_uring(ctx);
LuringAIOCB luringcb = {
.co = qemu_coroutine_self(),
.ret = -EINPROGRESS,
/**
 * luring_detach_aio_context:
 * @s: io_uring state to detach
 * @old_context: AioContext the ring fd handler was registered with
 *
 * Calls aio_set_fd_handler() for the ring fd on @old_context with every
 * callback set to NULL, deletes the completion bottom half, and clears
 * s->aio_context.
 *
 * The patch only changes the aio_set_fd_handler() argument list: the
 * former 'false' argument is dropped and a fourth NULL callback is added.
 */
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
- aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
- s);
+ aio_set_fd_handler(old_context, s->ring.ring_fd,
+ NULL, NULL, NULL, NULL, s);
qemu_bh_delete(s->completion_bh);
s->aio_context = NULL;
}
{
s->aio_context = new_context;
s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
- aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
- qemu_luring_completion_cb, NULL, NULL, s);
+ aio_set_fd_handler(s->aio_context, s->ring.ring_fd,
+ qemu_luring_completion_cb, NULL,
+ qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}
LuringState *luring_init(Error **errp)
/**
 * luring_cleanup:
 * @s: io_uring state to destroy
 *
 * Tears down the ring with io_uring_queue_exit(), emits the cleanup trace
 * event, and releases @s with g_free().
 *
 * The patch moves g_free() after the trace call so that @s is no longer
 * passed to trace_luring_cleanup_state() after it has been freed.
 */
void luring_cleanup(LuringState *s)
{
io_uring_queue_exit(&s->ring);
- g_free(s);
trace_luring_cleanup_state(s);
+ g_free(s);
}