#include <unistd.h>
#include <errno.h>
#include <time.h>
-#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "qemu-queue.h"
#include "osdep.h"
+#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
#include "block/raw-posix-aio.h"
+static void do_spawn_thread(void);
struct qemu_paiocb {
BlockDriverAIOCB common;
ssize_t ret;
int active;
struct qemu_paiocb *next;
-
- int async_context_id;
};
typedef struct PosixAioState {
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
+static int new_threads = 0; /* backlog of threads we need to create */
+static int pending_threads = 0; /* threads created but not running yet */
+static QEMUBH *new_thread_bh;
static QTAILQ_HEAD(, qemu_paiocb) request_list;
#ifdef CONFIG_PREADV
/*
* This looks weird, but the aio code only consideres a request
- * successfull if it has written the number full number of bytes.
+ * successful if it has written the full number of bytes.
*
* Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
* so in fact we return the ioctl command here to make posix_aio_read()
return len;
}
+/*
+ * Reads/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handled or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
ssize_t offset = 0;
pid = getpid();
+ mutex_lock(&lock);
+ pending_threads--;
+ mutex_unlock(&lock);
+ do_spawn_thread();
+
while (1) {
struct qemu_paiocb *aiocb;
ssize_t ret = 0;
while (QTAILQ_EMPTY(&request_list) &&
!(ret == ETIMEDOUT)) {
+ idle_threads++;
ret = cond_timedwait(&cond, &lock, &ts);
+ idle_threads--;
}
if (QTAILQ_EMPTY(&request_list))
aiocb = QTAILQ_FIRST(&request_list);
QTAILQ_REMOVE(&request_list, aiocb, node);
aiocb->active = 1;
- idle_threads--;
mutex_unlock(&lock);
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
case QEMU_AIO_READ:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+ /* A short read means that we have reached EOF. Pad the buffer
+ * with zeros for bytes after EOF. */
+ QEMUIOVector qiov;
+
+ qemu_iovec_init_external(&qiov, aiocb->aio_iov,
+ aiocb->aio_niov);
+ qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
+
+ ret = aiocb->aio_nbytes;
+ }
+ break;
case QEMU_AIO_WRITE:
ret = handle_aiocb_rw(aiocb);
break;
mutex_lock(&lock);
aiocb->ret = ret;
- idle_threads++;
mutex_unlock(&lock);
if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
- idle_threads--;
cur_threads--;
mutex_unlock(&lock);
return NULL;
}
-static void spawn_thread(void)
+static void do_spawn_thread(void)
{
sigset_t set, oldset;
- cur_threads++;
- idle_threads++;
+ mutex_lock(&lock);
+ if (!new_threads) {
+ mutex_unlock(&lock);
+ return;
+ }
+
+ new_threads--;
+ pending_threads++;
+
+ mutex_unlock(&lock);
/* block all signals */
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
+static void spawn_thread_bh_fn(void *opaque) /* handler registered for new_thread_bh */
+{ /* 'opaque' is ignored; the handler only forwards to do_spawn_thread() */
+ do_spawn_thread(); /* returns without creating anything when new_threads is 0 */
+}
+
+static void spawn_thread(void) /* account for one more worker and request its creation */
+{
+ cur_threads++; /* counted immediately, before the thread actually exists */
+ new_threads++; /* backlog entry, consumed later by do_spawn_thread() */
+ /* If there are threads being created, they will spawn new workers, so
+ * we don't spend time creating many threads in a loop holding a mutex or
+ * starving the current vcpu.
+ *
+ * Otherwise (pending_threads is zero), ask the main thread to create one
+ * by scheduling new_thread_bh, so we inherit the correct affinity instead
+ * of the vcpu affinity.
+ *
+ * NOTE(review): the counters are modified here without taking 'lock';
+ * presumably the caller already holds it -- confirm against the caller.
+ */
+ if (!pending_threads) {
+ qemu_bh_schedule(new_thread_bh);
+ }
+}
+
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
struct qemu_paiocb *acb, **pacb;
int ret;
int result = 0;
- int async_context_id = get_async_context_id();
for(;;) {
pacb = &s->first_aio;
if (!acb)
return result;
- /* we're only interested in requests in the right context */
- if (acb->async_context_id != async_context_id) {
- pacb = &acb->next;
- continue;
- }
-
ret = qemu_paio_error(acb);
if (ret == ECANCELED) {
/* remove the request */
} else {
ret = -ret;
}
+
+ trace_paio_complete(acb, acb->common.opaque, ret);
+
/* remove the request */
*pacb = acb->next;
/* call the callback */
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
+ trace_paio_cancel(acb, acb->common.opaque);
+
mutex_lock(&lock);
if (!acb->active) {
QTAILQ_REMOVE(&request_list, acb, node);
acb->aio_type = type;
acb->aio_fildes = fd;
acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
if (qiov) {
acb->aio_iov = qiov->iov;
acb->aio_type = QEMU_AIO_IOCTL;
acb->aio_fildes = fd;
acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
acb->aio_offset = 0;
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
if (posix_aio_state)
return 0;
- s = qemu_malloc(sizeof(PosixAioState));
+ s = g_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
die2(ret, "pthread_attr_setdetachstate");
QTAILQ_INIT(&request_list);
+ new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
posix_aio_state = s;
return 0;